278 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			278 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | import base64 | ||
|  | import logging | ||
|  | import time | ||
|  | 
 | ||
|  | from NetUtils import ClientStatus | ||
|  | from worlds._bizhawk.client import BizHawkClient | ||
|  | from worlds._bizhawk import read, write, guarded_write | ||
|  | 
 | ||
|  | from worlds.pokemon_rb.locations import location_data | ||
|  | 
 | ||
|  | logger = logging.getLogger("Client") | ||
|  | 
 | ||
|  | BANK_EXCHANGE_RATE = 100000000 | ||
|  | 
 | ||
|  | DATA_LOCATIONS = { | ||
|  |     "ItemIndex": (0x1A6E, 0x02), | ||
|  |     "Deathlink": (0x00FD, 0x01), | ||
|  |     "APItem": (0x00FF, 0x01), | ||
|  |     "EventFlag": (0x1735, 0x140), | ||
|  |     "Missable": (0x161A, 0x20), | ||
|  |     "Hidden": (0x16DE, 0x0E), | ||
|  |     "Rod": (0x1716, 0x01), | ||
|  |     "DexSanityFlag": (0x1A71, 19), | ||
|  |     "GameStatus": (0x1A84, 0x01), | ||
|  |     "Money": (0x141F, 3), | ||
|  |     "ResetCheck": (0x0100, 4), | ||
|  |     # First and second Vermilion Gym trash can selection. Second is not used, so should always be 0. | ||
|  |     # First should never be above 0x0F. This is just before Event Flags. | ||
|  |     "CrashCheck1": (0x1731, 2), | ||
|  |     # Unused, should always be 0. This is just before Missables flags. | ||
|  |     "CrashCheck2": (0x1617, 1), | ||
|  |     # Progressive keys, should never be above 10. Just before Dexsanity flags. | ||
|  |     "CrashCheck3": (0x1A70, 1), | ||
|  |     # Route 18 script value. Should never be above 2. Just before Hidden items flags. | ||
|  |     "CrashCheck4": (0x16DD, 1), | ||
|  | } | ||
|  | 
 | ||
|  | location_map = {"Rod": {}, "EventFlag": {}, "Missable": {}, "Hidden": {}, "list": {}, "DexSanityFlag": {}} | ||
|  | location_bytes_bits = {} | ||
|  | for location in location_data: | ||
|  |     if location.ram_address is not None: | ||
|  |         if type(location.ram_address) == list: | ||
|  |             location_map[type(location.ram_address).__name__][(location.ram_address[0].flag, location.ram_address[1].flag)] = location.address | ||
|  |             location_bytes_bits[location.address] = [{'byte': location.ram_address[0].byte, 'bit': location.ram_address[0].bit}, | ||
|  |                                                      {'byte': location.ram_address[1].byte, 'bit': location.ram_address[1].bit}] | ||
|  |         else: | ||
|  |             location_map[type(location.ram_address).__name__][location.ram_address.flag] = location.address | ||
|  |             location_bytes_bits[location.address] = {'byte': location.ram_address.byte, 'bit': location.ram_address.bit} | ||
|  | 
 | ||
|  | location_name_to_id = {location.name: location.address for location in location_data if location.type == "Item" | ||
|  |                        and location.address is not None} | ||
|  | 
 | ||
|  | 
 | ||
|  | class PokemonRBClient(BizHawkClient): | ||
|  |     system = ("GB", "SGB") | ||
|  |     patch_suffix = (".apred", ".apblue") | ||
|  |     game = "Pokemon Red and Blue" | ||
|  | 
 | ||
|  |     def __init__(self): | ||
|  |         super().__init__() | ||
|  |         self.auto_hints = set() | ||
|  |         self.locations_array = None | ||
|  |         self.disconnect_pending = False | ||
|  |         self.set_deathlink = False | ||
|  |         self.banking_command = None | ||
|  |         self.game_state = False | ||
|  |         self.last_death_link = 0 | ||
|  | 
 | ||
|  |     async def validate_rom(self, ctx): | ||
|  |         game_name = await read(ctx.bizhawk_ctx, [(0x134, 12, "ROM")]) | ||
|  |         game_name = game_name[0].decode("ascii") | ||
|  |         if game_name in ("POKEMON RED\00", "POKEMON BLUE"): | ||
|  |             ctx.game = self.game | ||
|  |             ctx.items_handling = 0b001 | ||
|  |             ctx.command_processor.commands["bank"] = cmd_bank | ||
|  |             seed_name = await read(ctx.bizhawk_ctx, [(0xFFDB, 21, "ROM")]) | ||
|  |             ctx.seed_name = seed_name[0].split(b"\0")[0].decode("ascii") | ||
|  |             self.set_deathlink = False | ||
|  |             self.banking_command = None | ||
|  |             self.locations_array = None | ||
|  |             self.disconnect_pending = False | ||
|  |             return True | ||
|  |         return False | ||
|  | 
 | ||
|  |     async def set_auth(self, ctx): | ||
|  |         auth_name = await read(ctx.bizhawk_ctx, [(0xFFC6, 21, "ROM")]) | ||
|  |         if auth_name[0] == bytes([0] * 21): | ||
|  |             # rom was patched before rom names implemented, use player name | ||
|  |             auth_name = await read(ctx.bizhawk_ctx, [(0xFFF0, 16, "ROM")]) | ||
|  |             auth_name = auth_name[0].decode("ascii").split("\x00")[0] | ||
|  |         else: | ||
|  |             auth_name = base64.b64encode(auth_name[0]).decode() | ||
|  |         ctx.auth = auth_name | ||
|  | 
 | ||
|  |     async def game_watcher(self, ctx): | ||
|  |         if not ctx.server or not ctx.server.socket.open or ctx.server.socket.closed: | ||
|  |             return | ||
|  | 
 | ||
|  |         data = await read(ctx.bizhawk_ctx, [(loc_data[0], loc_data[1], "WRAM") | ||
|  |                                             for loc_data in DATA_LOCATIONS.values()]) | ||
|  |         data = {data_set_name: data_name for data_set_name, data_name in zip(DATA_LOCATIONS.keys(), data)} | ||
|  | 
 | ||
|  |         if self.set_deathlink: | ||
|  |             self.set_deathlink = False | ||
|  |             await ctx.update_death_link(True) | ||
|  | 
 | ||
|  |         if self.disconnect_pending: | ||
|  |             self.disconnect_pending = False | ||
|  |             await ctx.disconnect() | ||
|  | 
 | ||
|  |         if data["GameStatus"][0] == 0 or data["ResetCheck"] == b'\xff\xff\xff\x7f': | ||
|  |             # Do not handle anything before game save is loaded | ||
|  |             self.game_state = False | ||
|  |             return | ||
|  |         elif (data["GameStatus"][0] not in (0x2A, 0xAC) | ||
|  |               or data["CrashCheck1"][0] & 0xF0 or data["CrashCheck1"][1] & 0xFF | ||
|  |               or data["CrashCheck2"][0] | ||
|  |               or data["CrashCheck3"][0] > 10 | ||
|  |               or data["CrashCheck4"][0] > 2): | ||
|  |             # Should mean game crashed | ||
|  |             logger.warning("Pokémon Red/Blue game may have crashed. Disconnecting from server.") | ||
|  |             self.game_state = False | ||
|  |             await ctx.disconnect() | ||
|  |             return | ||
|  |         self.game_state = True | ||
|  | 
 | ||
|  |         # SEND ITEMS TO CLIENT | ||
|  | 
 | ||
|  |         if data["APItem"][0] == 0: | ||
|  |             item_index = int.from_bytes(data["ItemIndex"], "little") | ||
|  |             if len(ctx.items_received) > item_index: | ||
|  |                 item_code = ctx.items_received[item_index].item - 172000000 | ||
|  |                 if item_code > 255: | ||
|  |                     item_code -= 256 | ||
|  |                 await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["APItem"][0], | ||
|  |                                                [item_code], "WRAM")]) | ||
|  | 
 | ||
|  |         # LOCATION CHECKS | ||
|  | 
 | ||
|  |         locations = set() | ||
|  | 
 | ||
|  |         for flag_type, loc_map in location_map.items(): | ||
|  |             for flag, loc_id in loc_map.items(): | ||
|  |                 if flag_type == "list": | ||
|  |                     if (data["EventFlag"][location_bytes_bits[loc_id][0]['byte']] & 1 << | ||
|  |                             location_bytes_bits[loc_id][0]['bit'] | ||
|  |                             and data["Missable"][location_bytes_bits[loc_id][1]['byte']] & 1 << | ||
|  |                             location_bytes_bits[loc_id][1]['bit']): | ||
|  |                         locations.add(loc_id) | ||
|  |                 elif data[flag_type][location_bytes_bits[loc_id]['byte']] & 1 << location_bytes_bits[loc_id]['bit']: | ||
|  |                     locations.add(loc_id) | ||
|  | 
 | ||
|  |         if locations != self.locations_array: | ||
|  |             if locations: | ||
|  |                 self.locations_array = locations | ||
|  |                 await ctx.send_msgs([{"cmd": "LocationChecks", "locations": list(locations)}]) | ||
|  | 
 | ||
|  |         # AUTO HINTS | ||
|  | 
 | ||
|  |         hints = [] | ||
|  |         if data["EventFlag"][280] & 16: | ||
|  |             hints.append("Cerulean Bicycle Shop") | ||
|  |         if data["EventFlag"][280] & 32: | ||
|  |             hints.append("Route 2 Gate - Oak's Aide") | ||
|  |         if data["EventFlag"][280] & 64: | ||
|  |             hints.append("Route 11 Gate 2F - Oak's Aide") | ||
|  |         if data["EventFlag"][280] & 128: | ||
|  |             hints.append("Route 15 Gate 2F - Oak's Aide") | ||
|  |         if data["EventFlag"][281] & 1: | ||
|  |             hints += ["Celadon Prize Corner - Item Prize 1", "Celadon Prize Corner - Item Prize 2", | ||
|  |                       "Celadon Prize Corner - Item Prize 3"] | ||
|  |         if (location_name_to_id["Fossil - Choice A"] in ctx.checked_locations and location_name_to_id[ | ||
|  |             "Fossil - Choice B"] | ||
|  |                 not in ctx.checked_locations): | ||
|  |             hints.append("Fossil - Choice B") | ||
|  |         elif (location_name_to_id["Fossil - Choice B"] in ctx.checked_locations and location_name_to_id[ | ||
|  |             "Fossil - Choice A"] | ||
|  |               not in ctx.checked_locations): | ||
|  |             hints.append("Fossil - Choice A") | ||
|  |         hints = [ | ||
|  |             location_name_to_id[loc] for loc in hints if location_name_to_id[loc] not in self.auto_hints and | ||
|  |                                                          location_name_to_id[loc] in ctx.missing_locations and | ||
|  |                                                          location_name_to_id[loc] not in ctx.locations_checked | ||
|  |         ] | ||
|  |         if hints: | ||
|  |             await ctx.send_msgs([{"cmd": "LocationScouts", "locations": hints, "create_as_hint": 2}]) | ||
|  |         self.auto_hints.update(hints) | ||
|  | 
 | ||
|  |         # DEATHLINK | ||
|  | 
 | ||
|  |         if "DeathLink" in ctx.tags: | ||
|  |             if data["Deathlink"][0] == 3: | ||
|  |                 await ctx.send_death(ctx.player_names[ctx.slot] + " is out of usable Pokémon! " | ||
|  |                                      + ctx.player_names[ctx.slot] + " blacked out!") | ||
|  |                 await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["Deathlink"][0], [0], "WRAM")]) | ||
|  |                 self.last_death_link = ctx.last_death_link | ||
|  |             elif ctx.last_death_link > self.last_death_link: | ||
|  |                 self.last_death_link = ctx.last_death_link | ||
|  |                 await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["Deathlink"][0], [1], "WRAM")]) | ||
|  | 
 | ||
|  |         # BANK | ||
|  | 
 | ||
|  |         if self.banking_command: | ||
|  |             original_money = data["Money"] | ||
|  |             # Money is stored as binary-coded decimal. | ||
|  |             money = int(original_money.hex()) | ||
|  |             if self.banking_command > money: | ||
|  |                 logger.warning(f"You do not have ${self.banking_command} to deposit!") | ||
|  |             elif (-self.banking_command * BANK_EXCHANGE_RATE) > ctx.stored_data[f"EnergyLink{ctx.team}"]: | ||
|  |                 logger.warning("Not enough money in the EnergyLink storage!") | ||
|  |             else: | ||
|  |                 if self.banking_command + money > 999999: | ||
|  |                     self.banking_command = 999999 - money | ||
|  |                 money = str(money - self.banking_command).zfill(6) | ||
|  |                 money = [int(money[:2], 16), int(money[2:4], 16), int(money[4:], 16)] | ||
|  |                 money_written = await guarded_write(ctx.bizhawk_ctx, [(0x141F, money, "WRAM")], | ||
|  |                                                     [(0x141F, original_money, "WRAM")]) | ||
|  |                 if money_written: | ||
|  |                     if self.banking_command >= 0: | ||
|  |                         deposit = self.banking_command - int(self.banking_command / 4) | ||
|  |                         tax = self.banking_command - deposit | ||
|  |                         logger.info(f"Deposited ${deposit}, and charged a tax of ${tax}.") | ||
|  |                         self.banking_command = deposit | ||
|  |                     else: | ||
|  |                         logger.info(f"Withdrew ${-self.banking_command}.") | ||
|  |                     await ctx.send_msgs([{ | ||
|  |                         "cmd": "Set", "key": f"EnergyLink{ctx.team}", "operations": | ||
|  |                             [{"operation": "add", "value": self.banking_command * BANK_EXCHANGE_RATE}, | ||
|  |                              {"operation": "max", "value": 0}], | ||
|  |                     }]) | ||
|  |             self.banking_command = None | ||
|  | 
 | ||
|  |         # VICTORY | ||
|  | 
 | ||
|  |         if data["EventFlag"][280] & 1 and not ctx.finished_game: | ||
|  |             await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}]) | ||
|  |             ctx.finished_game = True | ||
|  | 
 | ||
|  |     def on_package(self, ctx, cmd, args): | ||
|  |         if cmd == 'Connected': | ||
|  |             if 'death_link' in args['slot_data'] and args['slot_data']['death_link']: | ||
|  |                 self.set_deathlink = True | ||
|  |                 self.last_death_link = time.time() | ||
|  |             ctx.set_notify(f"EnergyLink{ctx.team}") | ||
|  |         elif cmd == 'RoomInfo': | ||
|  |             if ctx.seed_name and ctx.seed_name != args["seed_name"]: | ||
|  |                 # CommonClient's on_package displays an error to the user in this case, but connection is not cancelled. | ||
|  |                 self.game_state = False | ||
|  |                 self.disconnect_pending = True | ||
|  |         super().on_package(ctx, cmd, args) | ||
|  | 
 | ||
|  | 
 | ||
|  | def cmd_bank(self, cmd: str = "", amount: str = ""): | ||
|  |     """Deposit or withdraw money with the server's EnergyLink storage.
 | ||
|  |     /bank - check server balance. | ||
|  |     /bank deposit # - deposit money. One quarter of the amount will be lost to taxation. | ||
|  |     /bank withdraw # - withdraw money.""" | ||
|  |     if self.ctx.game != "Pokemon Red and Blue": | ||
|  |         logger.warning("This command can only be used while playing Pokémon Red and Blue") | ||
|  |         return | ||
|  |     if not cmd: | ||
|  |         logger.info(f"Money available: {int(self.ctx.stored_data[f'EnergyLink{self.ctx.team}'] / BANK_EXCHANGE_RATE)}") | ||
|  |         return | ||
|  |     elif (not self.ctx.server) or self.ctx.server.socket.closed or not self.ctx.client_handler.game_state: | ||
|  |         logger.info(f"Must be connected to server and in game.") | ||
|  |     elif not amount: | ||
|  |         logger.warning("You must specify an amount.") | ||
|  |     elif cmd == "withdraw": | ||
|  |         self.ctx.client_handler.banking_command = -int(amount) | ||
|  |     elif cmd == "deposit": | ||
|  |         if int(amount) < 4: | ||
|  |             logger.warning("You must deposit at least $4, for tax purposes.") | ||
|  |             return | ||
|  |         self.ctx.client_handler.banking_command = int(amount) | ||
|  |     else: | ||
|  |         logger.warning(f"Invalid bank command {cmd}") | ||
|  |         return |