| 
									
										
										
										
											2023-11-25 05:57:02 -05:00
										 |  |  | import base64 | 
					
						
							|  |  |  | import logging | 
					
						
							|  |  |  | import time | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from NetUtils import ClientStatus | 
					
						
							|  |  |  | from worlds._bizhawk.client import BizHawkClient | 
					
						
							|  |  |  | from worlds._bizhawk import read, write, guarded_write | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-12-13 17:57:14 -05:00
										 |  |  | from .locations import location_data | 
					
						
							| 
									
										
										
										
											2023-11-25 05:57:02 -05:00
										 |  |  | 
 | 
					
						
							|  |  |  | logger = logging.getLogger("Client") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-02-10 18:59:15 -05:00
										 |  |  | BANK_EXCHANGE_RATE = 50000000 | 
					
						
							| 
									
										
										
										
											2023-11-25 05:57:02 -05:00
										 |  |  | 
 | 
					
						
							|  |  |  | DATA_LOCATIONS = { | 
					
						
							|  |  |  |     "ItemIndex": (0x1A6E, 0x02), | 
					
						
							|  |  |  |     "Deathlink": (0x00FD, 0x01), | 
					
						
							|  |  |  |     "APItem": (0x00FF, 0x01), | 
					
						
							|  |  |  |     "EventFlag": (0x1735, 0x140), | 
					
						
							|  |  |  |     "Missable": (0x161A, 0x20), | 
					
						
							|  |  |  |     "Hidden": (0x16DE, 0x0E), | 
					
						
							|  |  |  |     "Rod": (0x1716, 0x01), | 
					
						
							|  |  |  |     "DexSanityFlag": (0x1A71, 19), | 
					
						
							|  |  |  |     "GameStatus": (0x1A84, 0x01), | 
					
						
							|  |  |  |     "Money": (0x141F, 3), | 
					
						
							|  |  |  |     "ResetCheck": (0x0100, 4), | 
					
						
							|  |  |  |     # First and second Vermilion Gym trash can selection. Second is not used, so should always be 0. | 
					
						
							|  |  |  |     # First should never be above 0x0F. This is just before Event Flags. | 
					
						
							|  |  |  |     "CrashCheck1": (0x1731, 2), | 
					
						
							|  |  |  |     # Unused, should always be 0. This is just before Missables flags. | 
					
						
							|  |  |  |     "CrashCheck2": (0x1617, 1), | 
					
						
							|  |  |  |     # Progressive keys, should never be above 10. Just before Dexsanity flags. | 
					
						
							|  |  |  |     "CrashCheck3": (0x1A70, 1), | 
					
						
							|  |  |  |     # Route 18 script value. Should never be above 2. Just before Hidden items flags. | 
					
						
							|  |  |  |     "CrashCheck4": (0x16DD, 1), | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | location_map = {"Rod": {}, "EventFlag": {}, "Missable": {}, "Hidden": {}, "list": {}, "DexSanityFlag": {}} | 
					
						
							|  |  |  | location_bytes_bits = {} | 
					
						
							|  |  |  | for location in location_data: | 
					
						
							|  |  |  |     if location.ram_address is not None: | 
					
						
							|  |  |  |         if type(location.ram_address) == list: | 
					
						
							|  |  |  |             location_map[type(location.ram_address).__name__][(location.ram_address[0].flag, location.ram_address[1].flag)] = location.address | 
					
						
							|  |  |  |             location_bytes_bits[location.address] = [{'byte': location.ram_address[0].byte, 'bit': location.ram_address[0].bit}, | 
					
						
							|  |  |  |                                                      {'byte': location.ram_address[1].byte, 'bit': location.ram_address[1].bit}] | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             location_map[type(location.ram_address).__name__][location.ram_address.flag] = location.address | 
					
						
							|  |  |  |             location_bytes_bits[location.address] = {'byte': location.ram_address.byte, 'bit': location.ram_address.bit} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | location_name_to_id = {location.name: location.address for location in location_data if location.type == "Item" | 
					
						
							|  |  |  |                        and location.address is not None} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class PokemonRBClient(BizHawkClient): | 
					
						
							|  |  |  |     system = ("GB", "SGB") | 
					
						
							|  |  |  |     patch_suffix = (".apred", ".apblue") | 
					
						
							|  |  |  |     game = "Pokemon Red and Blue" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self): | 
					
						
							|  |  |  |         super().__init__() | 
					
						
							|  |  |  |         self.auto_hints = set() | 
					
						
							|  |  |  |         self.locations_array = None | 
					
						
							|  |  |  |         self.disconnect_pending = False | 
					
						
							|  |  |  |         self.set_deathlink = False | 
					
						
							|  |  |  |         self.banking_command = None | 
					
						
							|  |  |  |         self.game_state = False | 
					
						
							|  |  |  |         self.last_death_link = 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     async def validate_rom(self, ctx): | 
					
						
							|  |  |  |         game_name = await read(ctx.bizhawk_ctx, [(0x134, 12, "ROM")]) | 
					
						
							|  |  |  |         game_name = game_name[0].decode("ascii") | 
					
						
							|  |  |  |         if game_name in ("POKEMON RED\00", "POKEMON BLUE"): | 
					
						
							|  |  |  |             ctx.game = self.game | 
					
						
							|  |  |  |             ctx.items_handling = 0b001 | 
					
						
							|  |  |  |             ctx.command_processor.commands["bank"] = cmd_bank | 
					
						
							|  |  |  |             seed_name = await read(ctx.bizhawk_ctx, [(0xFFDB, 21, "ROM")]) | 
					
						
							|  |  |  |             ctx.seed_name = seed_name[0].split(b"\0")[0].decode("ascii") | 
					
						
							|  |  |  |             self.set_deathlink = False | 
					
						
							|  |  |  |             self.banking_command = None | 
					
						
							|  |  |  |             self.locations_array = None | 
					
						
							|  |  |  |             self.disconnect_pending = False | 
					
						
							|  |  |  |             return True | 
					
						
							|  |  |  |         return False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     async def set_auth(self, ctx): | 
					
						
							|  |  |  |         auth_name = await read(ctx.bizhawk_ctx, [(0xFFC6, 21, "ROM")]) | 
					
						
							|  |  |  |         if auth_name[0] == bytes([0] * 21): | 
					
						
							|  |  |  |             # rom was patched before rom names implemented, use player name | 
					
						
							|  |  |  |             auth_name = await read(ctx.bizhawk_ctx, [(0xFFF0, 16, "ROM")]) | 
					
						
							|  |  |  |             auth_name = auth_name[0].decode("ascii").split("\x00")[0] | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             auth_name = base64.b64encode(auth_name[0]).decode() | 
					
						
							|  |  |  |         ctx.auth = auth_name | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     async def game_watcher(self, ctx): | 
					
						
							|  |  |  |         if not ctx.server or not ctx.server.socket.open or ctx.server.socket.closed: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         data = await read(ctx.bizhawk_ctx, [(loc_data[0], loc_data[1], "WRAM") | 
					
						
							|  |  |  |                                             for loc_data in DATA_LOCATIONS.values()]) | 
					
						
							|  |  |  |         data = {data_set_name: data_name for data_set_name, data_name in zip(DATA_LOCATIONS.keys(), data)} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if self.set_deathlink: | 
					
						
							|  |  |  |             self.set_deathlink = False | 
					
						
							|  |  |  |             await ctx.update_death_link(True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if self.disconnect_pending: | 
					
						
							|  |  |  |             self.disconnect_pending = False | 
					
						
							|  |  |  |             await ctx.disconnect() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if data["GameStatus"][0] == 0 or data["ResetCheck"] == b'\xff\xff\xff\x7f': | 
					
						
							|  |  |  |             # Do not handle anything before game save is loaded | 
					
						
							|  |  |  |             self.game_state = False | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         elif (data["GameStatus"][0] not in (0x2A, 0xAC) | 
					
						
							|  |  |  |               or data["CrashCheck1"][0] & 0xF0 or data["CrashCheck1"][1] & 0xFF | 
					
						
							|  |  |  |               or data["CrashCheck2"][0] | 
					
						
							|  |  |  |               or data["CrashCheck3"][0] > 10 | 
					
						
							|  |  |  |               or data["CrashCheck4"][0] > 2): | 
					
						
							|  |  |  |             # Should mean game crashed | 
					
						
							|  |  |  |             logger.warning("Pokémon Red/Blue game may have crashed. Disconnecting from server.") | 
					
						
							|  |  |  |             self.game_state = False | 
					
						
							|  |  |  |             await ctx.disconnect() | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         self.game_state = True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # SEND ITEMS TO CLIENT | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if data["APItem"][0] == 0: | 
					
						
							|  |  |  |             item_index = int.from_bytes(data["ItemIndex"], "little") | 
					
						
							|  |  |  |             if len(ctx.items_received) > item_index: | 
					
						
							|  |  |  |                 item_code = ctx.items_received[item_index].item - 172000000 | 
					
						
							|  |  |  |                 if item_code > 255: | 
					
						
							|  |  |  |                     item_code -= 256 | 
					
						
							|  |  |  |                 await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["APItem"][0], | 
					
						
							|  |  |  |                                                [item_code], "WRAM")]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # LOCATION CHECKS | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         locations = set() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for flag_type, loc_map in location_map.items(): | 
					
						
							|  |  |  |             for flag, loc_id in loc_map.items(): | 
					
						
							|  |  |  |                 if flag_type == "list": | 
					
						
							|  |  |  |                     if (data["EventFlag"][location_bytes_bits[loc_id][0]['byte']] & 1 << | 
					
						
							|  |  |  |                             location_bytes_bits[loc_id][0]['bit'] | 
					
						
							|  |  |  |                             and data["Missable"][location_bytes_bits[loc_id][1]['byte']] & 1 << | 
					
						
							|  |  |  |                             location_bytes_bits[loc_id][1]['bit']): | 
					
						
							|  |  |  |                         locations.add(loc_id) | 
					
						
							|  |  |  |                 elif data[flag_type][location_bytes_bits[loc_id]['byte']] & 1 << location_bytes_bits[loc_id]['bit']: | 
					
						
							|  |  |  |                     locations.add(loc_id) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if locations != self.locations_array: | 
					
						
							|  |  |  |             if locations: | 
					
						
							|  |  |  |                 self.locations_array = locations | 
					
						
							|  |  |  |                 await ctx.send_msgs([{"cmd": "LocationChecks", "locations": list(locations)}]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # AUTO HINTS | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         hints = [] | 
					
						
							|  |  |  |         if data["EventFlag"][280] & 16: | 
					
						
							|  |  |  |             hints.append("Cerulean Bicycle Shop") | 
					
						
							|  |  |  |         if data["EventFlag"][280] & 32: | 
					
						
							|  |  |  |             hints.append("Route 2 Gate - Oak's Aide") | 
					
						
							|  |  |  |         if data["EventFlag"][280] & 64: | 
					
						
							|  |  |  |             hints.append("Route 11 Gate 2F - Oak's Aide") | 
					
						
							|  |  |  |         if data["EventFlag"][280] & 128: | 
					
						
							|  |  |  |             hints.append("Route 15 Gate 2F - Oak's Aide") | 
					
						
							|  |  |  |         if data["EventFlag"][281] & 1: | 
					
						
							|  |  |  |             hints += ["Celadon Prize Corner - Item Prize 1", "Celadon Prize Corner - Item Prize 2", | 
					
						
							|  |  |  |                       "Celadon Prize Corner - Item Prize 3"] | 
					
						
							|  |  |  |         if (location_name_to_id["Fossil - Choice A"] in ctx.checked_locations and location_name_to_id[ | 
					
						
							|  |  |  |             "Fossil - Choice B"] | 
					
						
							|  |  |  |                 not in ctx.checked_locations): | 
					
						
							|  |  |  |             hints.append("Fossil - Choice B") | 
					
						
							|  |  |  |         elif (location_name_to_id["Fossil - Choice B"] in ctx.checked_locations and location_name_to_id[ | 
					
						
							|  |  |  |             "Fossil - Choice A"] | 
					
						
							|  |  |  |               not in ctx.checked_locations): | 
					
						
							|  |  |  |             hints.append("Fossil - Choice A") | 
					
						
							|  |  |  |         hints = [ | 
					
						
							|  |  |  |             location_name_to_id[loc] for loc in hints if location_name_to_id[loc] not in self.auto_hints and | 
					
						
							|  |  |  |                                                          location_name_to_id[loc] in ctx.missing_locations and | 
					
						
							|  |  |  |                                                          location_name_to_id[loc] not in ctx.locations_checked | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |         if hints: | 
					
						
							|  |  |  |             await ctx.send_msgs([{"cmd": "LocationScouts", "locations": hints, "create_as_hint": 2}]) | 
					
						
							|  |  |  |         self.auto_hints.update(hints) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # DEATHLINK | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if "DeathLink" in ctx.tags: | 
					
						
							|  |  |  |             if data["Deathlink"][0] == 3: | 
					
						
							|  |  |  |                 await ctx.send_death(ctx.player_names[ctx.slot] + " is out of usable Pokémon! " | 
					
						
							|  |  |  |                                      + ctx.player_names[ctx.slot] + " blacked out!") | 
					
						
							|  |  |  |                 await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["Deathlink"][0], [0], "WRAM")]) | 
					
						
							|  |  |  |                 self.last_death_link = ctx.last_death_link | 
					
						
							|  |  |  |             elif ctx.last_death_link > self.last_death_link: | 
					
						
							|  |  |  |                 self.last_death_link = ctx.last_death_link | 
					
						
							|  |  |  |                 await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["Deathlink"][0], [1], "WRAM")]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # BANK | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if self.banking_command: | 
					
						
							|  |  |  |             original_money = data["Money"] | 
					
						
							|  |  |  |             # Money is stored as binary-coded decimal. | 
					
						
							|  |  |  |             money = int(original_money.hex()) | 
					
						
							|  |  |  |             if self.banking_command > money: | 
					
						
							|  |  |  |                 logger.warning(f"You do not have ${self.banking_command} to deposit!") | 
					
						
							|  |  |  |             elif (-self.banking_command * BANK_EXCHANGE_RATE) > ctx.stored_data[f"EnergyLink{ctx.team}"]: | 
					
						
							|  |  |  |                 logger.warning("Not enough money in the EnergyLink storage!") | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 if self.banking_command + money > 999999: | 
					
						
							|  |  |  |                     self.banking_command = 999999 - money | 
					
						
							|  |  |  |                 money = str(money - self.banking_command).zfill(6) | 
					
						
							|  |  |  |                 money = [int(money[:2], 16), int(money[2:4], 16), int(money[4:], 16)] | 
					
						
							|  |  |  |                 money_written = await guarded_write(ctx.bizhawk_ctx, [(0x141F, money, "WRAM")], | 
					
						
							|  |  |  |                                                     [(0x141F, original_money, "WRAM")]) | 
					
						
							|  |  |  |                 if money_written: | 
					
						
							|  |  |  |                     if self.banking_command >= 0: | 
					
						
							|  |  |  |                         deposit = self.banking_command - int(self.banking_command / 4) | 
					
						
							|  |  |  |                         tax = self.banking_command - deposit | 
					
						
							|  |  |  |                         logger.info(f"Deposited ${deposit}, and charged a tax of ${tax}.") | 
					
						
							|  |  |  |                         self.banking_command = deposit | 
					
						
							|  |  |  |                     else: | 
					
						
							|  |  |  |                         logger.info(f"Withdrew ${-self.banking_command}.") | 
					
						
							|  |  |  |                     await ctx.send_msgs([{ | 
					
						
							|  |  |  |                         "cmd": "Set", "key": f"EnergyLink{ctx.team}", "operations": | 
					
						
							|  |  |  |                             [{"operation": "add", "value": self.banking_command * BANK_EXCHANGE_RATE}, | 
					
						
							|  |  |  |                              {"operation": "max", "value": 0}], | 
					
						
							|  |  |  |                     }]) | 
					
						
							|  |  |  |             self.banking_command = None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # VICTORY | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if data["EventFlag"][280] & 1 and not ctx.finished_game: | 
					
						
							|  |  |  |             await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}]) | 
					
						
							|  |  |  |             ctx.finished_game = True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def on_package(self, ctx, cmd, args): | 
					
						
							|  |  |  |         if cmd == 'Connected': | 
					
						
							|  |  |  |             if 'death_link' in args['slot_data'] and args['slot_data']['death_link']: | 
					
						
							|  |  |  |                 self.set_deathlink = True | 
					
						
							|  |  |  |                 self.last_death_link = time.time() | 
					
						
							|  |  |  |             ctx.set_notify(f"EnergyLink{ctx.team}") | 
					
						
							|  |  |  |         elif cmd == 'RoomInfo': | 
					
						
							|  |  |  |             if ctx.seed_name and ctx.seed_name != args["seed_name"]: | 
					
						
							|  |  |  |                 # CommonClient's on_package displays an error to the user in this case, but connection is not cancelled. | 
					
						
							|  |  |  |                 self.game_state = False | 
					
						
							|  |  |  |                 self.disconnect_pending = True | 
					
						
							|  |  |  |         super().on_package(ctx, cmd, args) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def cmd_bank(self, cmd: str = "", amount: str = ""): | 
					
						
							|  |  |  |     """Deposit or withdraw money with the server's EnergyLink storage.
 | 
					
						
							|  |  |  |     /bank - check server balance. | 
					
						
							|  |  |  |     /bank deposit # - deposit money. One quarter of the amount will be lost to taxation. | 
					
						
							|  |  |  |     /bank withdraw # - withdraw money.""" | 
					
						
							|  |  |  |     if self.ctx.game != "Pokemon Red and Blue": | 
					
						
							|  |  |  |         logger.warning("This command can only be used while playing Pokémon Red and Blue") | 
					
						
							|  |  |  |         return | 
					
						
							|  |  |  |     if not cmd: | 
					
						
							|  |  |  |         logger.info(f"Money available: {int(self.ctx.stored_data[f'EnergyLink{self.ctx.team}'] / BANK_EXCHANGE_RATE)}") | 
					
						
							|  |  |  |         return | 
					
						
							|  |  |  |     elif (not self.ctx.server) or self.ctx.server.socket.closed or not self.ctx.client_handler.game_state: | 
					
						
							|  |  |  |         logger.info(f"Must be connected to server and in game.") | 
					
						
							|  |  |  |     elif not amount: | 
					
						
							|  |  |  |         logger.warning("You must specify an amount.") | 
					
						
							|  |  |  |     elif cmd == "withdraw": | 
					
						
							|  |  |  |         self.ctx.client_handler.banking_command = -int(amount) | 
					
						
							|  |  |  |     elif cmd == "deposit": | 
					
						
							|  |  |  |         if int(amount) < 4: | 
					
						
							|  |  |  |             logger.warning("You must deposit at least $4, for tax purposes.") | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         self.ctx.client_handler.banking_command = int(amount) | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         logger.warning(f"Invalid bank command {cmd}") | 
					
						
							|  |  |  |         return |