279 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			279 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import base64
 | |
| import logging
 | |
| import time
 | |
| 
 | |
| from NetUtils import ClientStatus
 | |
| from worlds._bizhawk.client import BizHawkClient
 | |
| from worlds._bizhawk import read, write, guarded_write
 | |
| 
 | |
| from .locations import location_data
 | |
| 
 | |
| logger = logging.getLogger("Client")
 | |
| 
 | |
| BANK_EXCHANGE_RATE = 50000000
 | |
| 
 | |
| DATA_LOCATIONS = {
 | |
|     "ItemIndex": (0x1A6E, 0x02),
 | |
|     "Deathlink": (0x00FD, 0x01),
 | |
|     "APItem": (0x00FF, 0x01),
 | |
|     "EventFlag": (0x1735, 0x140),
 | |
|     "Missable": (0x161A, 0x20),
 | |
|     "Hidden": (0x16DE, 0x0E),
 | |
|     "Rod": (0x1716, 0x01),
 | |
|     "DexSanityFlag": (0x1A71, 19),
 | |
|     "GameStatus": (0x1A84, 0x01),
 | |
|     "Money": (0x141F, 3),
 | |
|     "ResetCheck": (0x0100, 4),
 | |
|     # First and second Vermilion Gym trash can selection. Second is not used, so should always be 0.
 | |
|     # First should never be above 0x0F. This is just before Event Flags.
 | |
|     "CrashCheck1": (0x1731, 2),
 | |
|     # Unused, should always be 0. This is just before Missables flags.
 | |
|     "CrashCheck2": (0x1617, 1),
 | |
|     # Progressive keys, should never be above 10. Just before Dexsanity flags.
 | |
|     "CrashCheck3": (0x1A70, 1),
 | |
|     # Route 18 Gate script value. Should never be above 3. Just before Hidden items flags.
 | |
|     "CrashCheck4": (0x16DD, 1),
 | |
| }
 | |
| 
 | |
| location_map = {"Rod": {}, "EventFlag": {}, "Missable": {}, "Hidden": {}, "list": {}, "DexSanityFlag": {}}
 | |
| location_bytes_bits = {}
 | |
| for location in location_data:
 | |
|     if location.ram_address is not None:
 | |
|         if type(location.ram_address) == list:
 | |
|             location_map[type(location.ram_address).__name__][(location.ram_address[0].flag, location.ram_address[1].flag)] = location.address
 | |
|             location_bytes_bits[location.address] = [{'byte': location.ram_address[0].byte, 'bit': location.ram_address[0].bit},
 | |
|                                                      {'byte': location.ram_address[1].byte, 'bit': location.ram_address[1].bit}]
 | |
|         else:
 | |
|             location_map[type(location.ram_address).__name__][location.ram_address.flag] = location.address
 | |
|             location_bytes_bits[location.address] = {'byte': location.ram_address.byte, 'bit': location.ram_address.bit}
 | |
| 
 | |
| location_name_to_id = {location.name: location.address for location in location_data if location.type == "Item"
 | |
|                        and location.address is not None}
 | |
| 
 | |
| 
 | |
| class PokemonRBClient(BizHawkClient):
 | |
|     system = ("GB", "SGB")
 | |
|     patch_suffix = (".apred", ".apblue")
 | |
|     game = "Pokemon Red and Blue"
 | |
| 
 | |
|     def __init__(self):
 | |
|         super().__init__()
 | |
|         self.auto_hints = set()
 | |
|         self.locations_array = None
 | |
|         self.disconnect_pending = False
 | |
|         self.set_deathlink = False
 | |
|         self.banking_command = None
 | |
|         self.game_state = False
 | |
|         self.last_death_link = 0
 | |
| 
 | |
|     async def validate_rom(self, ctx):
 | |
|         game_name = await read(ctx.bizhawk_ctx, [(0x134, 12, "ROM")])
 | |
|         game_name = game_name[0].decode("ascii")
 | |
|         if game_name in ("POKEMON RED\00", "POKEMON BLUE"):
 | |
|             ctx.game = self.game
 | |
|             ctx.items_handling = 0b001
 | |
|             ctx.command_processor.commands["bank"] = cmd_bank
 | |
|             seed_name = await read(ctx.bizhawk_ctx, [(0xFFDB, 21, "ROM")])
 | |
|             ctx.seed_name = seed_name[0].split(b"\0")[0].decode("ascii")
 | |
|             self.set_deathlink = False
 | |
|             self.banking_command = None
 | |
|             self.locations_array = None
 | |
|             self.disconnect_pending = False
 | |
|             return True
 | |
|         return False
 | |
| 
 | |
|     async def set_auth(self, ctx):
 | |
|         auth_name = await read(ctx.bizhawk_ctx, [(0xFFC6, 21, "ROM")])
 | |
|         if auth_name[0] == bytes([0] * 21):
 | |
|             # rom was patched before rom names implemented, use player name
 | |
|             auth_name = await read(ctx.bizhawk_ctx, [(0xFFF0, 16, "ROM")])
 | |
|             auth_name = auth_name[0].decode("ascii").split("\x00")[0]
 | |
|         else:
 | |
|             auth_name = base64.b64encode(auth_name[0]).decode()
 | |
|         ctx.auth = auth_name
 | |
| 
 | |
|     async def game_watcher(self, ctx):
 | |
|         if not ctx.server or not ctx.server.socket.open or ctx.server.socket.closed:
 | |
|             return
 | |
| 
 | |
|         data = await read(ctx.bizhawk_ctx, [(loc_data[0], loc_data[1], "WRAM")
 | |
|                                             for loc_data in DATA_LOCATIONS.values()])
 | |
|         data = {data_set_name: data_name for data_set_name, data_name in zip(DATA_LOCATIONS.keys(), data)}
 | |
| 
 | |
|         if self.set_deathlink:
 | |
|             self.set_deathlink = False
 | |
|             await ctx.update_death_link(True)
 | |
| 
 | |
|         if self.disconnect_pending:
 | |
|             self.disconnect_pending = False
 | |
|             await ctx.disconnect()
 | |
| 
 | |
|         if data["GameStatus"][0] == 0 or data["ResetCheck"] == b'\xff\xff\xff\x7f':
 | |
|             # Do not handle anything before game save is loaded
 | |
|             self.game_state = False
 | |
|             return
 | |
|         elif (data["GameStatus"][0] not in (0x2A, 0xAC)
 | |
|               or data["CrashCheck1"][0] & 0xF0 or data["CrashCheck1"][1] & 0xFF
 | |
|               or data["CrashCheck2"][0]
 | |
|               or data["CrashCheck3"][0] > 10
 | |
|               or data["CrashCheck4"][0] > 3):
 | |
|             # Should mean game crashed
 | |
|             logger.warning("Pokémon Red/Blue game may have crashed. Disconnecting from server.")
 | |
|             self.game_state = False
 | |
|             await ctx.disconnect()
 | |
|             return
 | |
|         self.game_state = True
 | |
| 
 | |
|         # SEND ITEMS TO CLIENT
 | |
| 
 | |
|         if data["APItem"][0] == 0:
 | |
|             item_index = int.from_bytes(data["ItemIndex"], "little")
 | |
|             if len(ctx.items_received) > item_index:
 | |
|                 item_code = ctx.items_received[item_index].item - 172000000
 | |
|                 if item_code > 255:
 | |
|                     item_code -= 256
 | |
|                 await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["APItem"][0],
 | |
|                                                [item_code], "WRAM")])
 | |
| 
 | |
|         # LOCATION CHECKS
 | |
| 
 | |
|         locations = set()
 | |
| 
 | |
|         for flag_type, loc_map in location_map.items():
 | |
|             for flag, loc_id in loc_map.items():
 | |
|                 if flag_type == "list":
 | |
|                     if (data["EventFlag"][location_bytes_bits[loc_id][0]['byte']] & 1 <<
 | |
|                             location_bytes_bits[loc_id][0]['bit']
 | |
|                             and data["Missable"][location_bytes_bits[loc_id][1]['byte']] & 1 <<
 | |
|                             location_bytes_bits[loc_id][1]['bit']):
 | |
|                         locations.add(loc_id)
 | |
|                 elif data[flag_type][location_bytes_bits[loc_id]['byte']] & 1 << location_bytes_bits[loc_id]['bit']:
 | |
|                     locations.add(loc_id)
 | |
| 
 | |
|         if locations != self.locations_array:
 | |
|             if locations:
 | |
|                 self.locations_array = locations
 | |
|                 await ctx.send_msgs([{"cmd": "LocationChecks", "locations": list(locations)}])
 | |
| 
 | |
|         # AUTO HINTS
 | |
| 
 | |
|         hints = []
 | |
|         if data["EventFlag"][280] & 16:
 | |
|             hints.append("Cerulean Bicycle Shop")
 | |
|         if data["EventFlag"][280] & 32:
 | |
|             hints.append("Route 2 Gate - Oak's Aide")
 | |
|         if data["EventFlag"][280] & 64:
 | |
|             hints.append("Route 11 Gate 2F - Oak's Aide")
 | |
|         if data["EventFlag"][280] & 128:
 | |
|             hints.append("Route 15 Gate 2F - Oak's Aide")
 | |
|         if data["EventFlag"][281] & 1:
 | |
|             hints += ["Celadon Prize Corner - Item Prize 1", "Celadon Prize Corner - Item Prize 2",
 | |
|                       "Celadon Prize Corner - Item Prize 3"]
 | |
|         if (location_name_to_id["Fossil - Choice A"] in ctx.checked_locations and location_name_to_id[
 | |
|             "Fossil - Choice B"]
 | |
|                 not in ctx.checked_locations):
 | |
|             hints.append("Fossil - Choice B")
 | |
|         elif (location_name_to_id["Fossil - Choice B"] in ctx.checked_locations and location_name_to_id[
 | |
|             "Fossil - Choice A"]
 | |
|               not in ctx.checked_locations):
 | |
|             hints.append("Fossil - Choice A")
 | |
|         hints = [
 | |
|             location_name_to_id[loc] for loc in hints if location_name_to_id[loc] not in self.auto_hints and
 | |
|                                                          location_name_to_id[loc] in ctx.missing_locations and
 | |
|                                                          location_name_to_id[loc] not in ctx.locations_checked
 | |
|         ]
 | |
|         if hints:
 | |
|             await ctx.send_msgs([{"cmd": "LocationScouts", "locations": hints, "create_as_hint": 2}])
 | |
|         self.auto_hints.update(hints)
 | |
| 
 | |
|         # DEATHLINK
 | |
| 
 | |
|         if "DeathLink" in ctx.tags:
 | |
|             if data["Deathlink"][0] == 3:
 | |
|                 await ctx.send_death(ctx.player_names[ctx.slot] + " is out of usable Pokémon! "
 | |
|                                      + ctx.player_names[ctx.slot] + " blacked out!")
 | |
|                 await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["Deathlink"][0], [0], "WRAM")])
 | |
|                 self.last_death_link = ctx.last_death_link
 | |
|             elif ctx.last_death_link > self.last_death_link:
 | |
|                 self.last_death_link = ctx.last_death_link
 | |
|                 await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["Deathlink"][0], [1], "WRAM")])
 | |
| 
 | |
|         # BANK
 | |
| 
 | |
|         if self.banking_command:
 | |
|             original_money = data["Money"]
 | |
|             # Money is stored as binary-coded decimal.
 | |
|             money = int(original_money.hex())
 | |
|             if self.banking_command > money:
 | |
|                 logger.warning(f"You do not have ${self.banking_command} to deposit!")
 | |
|             elif (-self.banking_command * BANK_EXCHANGE_RATE) > (ctx.stored_data[f"EnergyLink{ctx.team}"] or 0):
 | |
|                 logger.warning("Not enough money in the EnergyLink storage!")
 | |
|             else:
 | |
|                 if self.banking_command + money > 999999:
 | |
|                     self.banking_command = 999999 - money
 | |
|                 money = str(money - self.banking_command).zfill(6)
 | |
|                 money = [int(money[:2], 16), int(money[2:4], 16), int(money[4:], 16)]
 | |
|                 money_written = await guarded_write(ctx.bizhawk_ctx, [(0x141F, money, "WRAM")],
 | |
|                                                     [(0x141F, original_money, "WRAM")])
 | |
|                 if money_written:
 | |
|                     if self.banking_command >= 0:
 | |
|                         deposit = self.banking_command - int(self.banking_command / 4)
 | |
|                         tax = self.banking_command - deposit
 | |
|                         logger.info(f"Deposited ${deposit}, and charged a tax of ${tax}.")
 | |
|                         self.banking_command = deposit
 | |
|                     else:
 | |
|                         logger.info(f"Withdrew ${-self.banking_command}.")
 | |
|                     await ctx.send_msgs([{
 | |
|                         "cmd": "Set", "key": f"EnergyLink{ctx.team}", "operations":
 | |
|                             [{"operation": "add", "value": self.banking_command * BANK_EXCHANGE_RATE},
 | |
|                              {"operation": "max", "value": 0}],
 | |
|                     }])
 | |
|             self.banking_command = None
 | |
| 
 | |
|         # VICTORY
 | |
| 
 | |
|         if data["EventFlag"][280] & 1 and not ctx.finished_game:
 | |
|             await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
 | |
|             ctx.finished_game = True
 | |
| 
 | |
|     def on_package(self, ctx, cmd, args):
 | |
|         if cmd == 'Connected':
 | |
|             if 'death_link' in args['slot_data'] and args['slot_data']['death_link']:
 | |
|                 self.set_deathlink = True
 | |
|                 self.last_death_link = time.time()
 | |
|             ctx.set_notify(f"EnergyLink{ctx.team}")
 | |
|         elif cmd == 'RoomInfo':
 | |
|             if ctx.seed_name and ctx.seed_name != args["seed_name"]:
 | |
|                 # CommonClient's on_package displays an error to the user in this case, but connection is not cancelled.
 | |
|                 self.game_state = False
 | |
|                 self.disconnect_pending = True
 | |
|         super().on_package(ctx, cmd, args)
 | |
| 
 | |
| 
 | |
| def cmd_bank(self, cmd: str = "", amount: str = ""):
 | |
|     """Deposit or withdraw money with the server's EnergyLink storage.
 | |
|     /bank - check server balance.
 | |
|     /bank deposit # - deposit money. One quarter of the amount will be lost to taxation.
 | |
|     /bank withdraw # - withdraw money."""
 | |
|     if self.ctx.game != "Pokemon Red and Blue":
 | |
|         logger.warning("This command can only be used while playing Pokémon Red and Blue")
 | |
|         return
 | |
|     if (not self.ctx.server) or self.ctx.server.socket.closed or not self.ctx.client_handler.game_state:
 | |
|         logger.info(f"Must be connected to server and in game.")
 | |
|         return
 | |
|     elif not cmd:
 | |
|         logger.info(f"Money available: {int((self.ctx.stored_data[f'EnergyLink{self.ctx.team}'] or 0) / BANK_EXCHANGE_RATE)}")
 | |
|         return
 | |
|     elif not amount:
 | |
|         logger.warning("You must specify an amount.")
 | |
|     elif cmd == "withdraw":
 | |
|         self.ctx.client_handler.banking_command = -int(amount)
 | |
|     elif cmd == "deposit":
 | |
|         if int(amount) < 4:
 | |
|             logger.warning("You must deposit at least $4, for tax purposes.")
 | |
|             return
 | |
|         self.ctx.client_handler.banking_command = int(amount)
 | |
|     else:
 | |
|         logger.warning(f"Invalid bank command {cmd}")
 | |
|         return
 | 
