from typing import TYPE_CHECKING import asyncio import NetUtils from .Locations import grinch_locations, GrinchLocation from .Items import ALL_ITEMS_TABLE, SLEIGH_PARTS_TABLE, MISSION_ITEMS_TABLE, GADGETS_TABLE, KEYS_TABLE, GrinchItemData import worlds._bizhawk as bizhawk from worlds._bizhawk.client import BizHawkClient if TYPE_CHECKING: from worlds._bizhawk.context import BizHawkClientContext from CommonClient import logger # Stores received index of last item received in PS1 memory card save data # By storing this index, it will remember the last item received and prevent item duplication loops RECV_ITEM_ADDR = 0x010068 RECV_ITEM_BITSIZE = 4 # Maximum number of times we check if we are in demo mode or not MAX_DEMO_MODE_CHECK = 30 class GrinchClient(BizHawkClient): game = "The Grinch" system = "PSX" patch_suffix = ".apgrinch" items_handling = 0b111 demo_mode_buffer = 0 last_map_location = -1 ingame_log = False def __init__(self): super().__init__() self.last_received_index = 0 self.loading_bios_msg = False self.loc_unlimited_eggs = False async def validate_rom(self, ctx: "BizHawkClientContext") -> bool: from CommonClient import logger # TODO Check the ROM data to see if it matches against bytes expected grinch_identifier_ram_address: int = 0x00928C bios_identifier_ram_address: int = 0x097F30 try: bytes_actual: bytes = (await bizhawk.read(ctx.bizhawk_ctx, [( grinch_identifier_ram_address, 11, "MainRAM")]))[0] psx_rom_name = bytes_actual.decode("ascii") if psx_rom_name != "SLUS_011.97": bios_bytes_check: bytes = (await bizhawk.read(ctx.bizhawk_ctx, [( bios_identifier_ram_address, 24, "MainRAM")]))[0] if "System ROM Version" in bios_bytes_check.decode("ascii"): if not self.loading_bios_msg: self.loading_bios_msg = True logger.error("BIOS is currently loading. Will wait up to 5 seconds before retrying.") return False logger.error("Invalid rom detected. You are not playing Grinch USA Version.") raise Exception("Invalid rom detected. You are not playing Grinch USA Version.") except Exception: return False ctx.game = self.game ctx.items_handling = self.items_handling ctx.want_slot_data = True ctx.watcher_timeout = 0.125 self.loading_bios_msg = False return True def on_package(self, ctx: "BizHawkClientContext", cmd: str, args: dict) -> None: super().on_package(ctx, cmd, args) match cmd: case "Connected": # On Connect self.loc_unlimited_eggs = bool(ctx.slot_data["give_unlimited_eggs"]) async def set_auth(self, ctx: "BizHawkClientContext") -> None: await ctx.get_username() async def game_watcher(self, ctx: "BizHawkClientContext") -> None: #If the player is not connected to an AP Server, or their connection was disconnected. if ctx.server is None or ctx.server.socket.closed or ctx.slot_data is None: return try: if not await self.ingame_checker(ctx): return await self.constant_address_update(ctx) await self.location_checker(ctx) await self.receiving_items_handler(ctx) await self.goal_checker(ctx) await self.option_handler(ctx) except bizhawk.RequestFailedError: # The connector didn't respond. Exit handler and return to main loop to reconnect pass async def location_checker(self, ctx: "BizHawkClientContext"): # Update the AP Server to know what locations are not checked yet. local_locations_checked: list[int] = [] for missing_location in grinch_locations.keys(): # local_location = ctx.location_names.lookup_in_game(missing_location) # Missing location is the AP ID & we need to convert it back to a location name within our game. # Using the location name, we can then get the Grinch ram data from there. grinch_loc_ram_data = grinch_locations[missing_location] if grinch_loc_ram_data.id is None or not GrinchLocation.get_apid(grinch_loc_ram_data.id) in ctx.missing_locations: continue # Grinch ram data may have more than one address to update, so we are going to loop through all addresses in a location # We use a list here to keep track of all our checks. If they are all true, then and only then do we mark that location as checked. ram_checked_list: list[bool] = [] for addr_to_update in grinch_loc_ram_data.update_ram_addr: is_binary = True if not addr_to_update.binary_bit_pos is None else False current_ram_address_value = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [( addr_to_update.ram_address, addr_to_update.bit_size, "MainRAM")]))[0], "little") if is_binary: ram_checked_list.append((current_ram_address_value & (1 << addr_to_update.binary_bit_pos)) > 0) else: expected_int_value = addr_to_update.value ram_checked_list.append(expected_int_value == current_ram_address_value) if all(ram_checked_list): local_locations_checked.append(GrinchLocation.get_apid(grinch_loc_ram_data.id)) # Update the AP server with the locally checked list of locations (In other words, locations I found in Grinch) await ctx.check_locations(local_locations_checked) ctx.locations_checked = set(local_locations_checked) async def receiving_items_handler(self, ctx: "BizHawkClientContext"): # Len will give us the size of the items received list & we will track that against how many items we received already # If the list says that we have 3 items and we already received items, we will ignore and continue. # Otherwise, we will get the new items and give them to the player. self.last_received_index = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [( RECV_ITEM_ADDR, RECV_ITEM_BITSIZE, "MainRAM")]))[0], "little") if len(ctx.items_received) == self.last_received_index: return # Ensures we only get the new items that we want to give the player new_items_only = ctx.items_received[self.last_received_index:] for item_received in new_items_only: local_item = ctx.item_names.lookup_in_game(item_received.item) grinch_item_ram_data = ALL_ITEMS_TABLE[local_item] for addr_to_update in grinch_item_ram_data.update_ram_addr: is_binary = True if not addr_to_update.binary_bit_pos is None else False current_ram_address_value = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [( addr_to_update.ram_address, addr_to_update.bit_size, "MainRAM")]))[0], "little") if is_binary: current_ram_address_value = (current_ram_address_value | (1 << addr_to_update.binary_bit_pos)) elif addr_to_update.update_existing_value: # Grabs minimum value of a list of numbers and makes sure it does not go above max count possible current_ram_address_value += addr_to_update.value current_ram_address_value = min(current_ram_address_value, addr_to_update.max_count) else: current_ram_address_value = addr_to_update.value # Write the updated value back into RAM await self.update_and_validate_address(ctx, addr_to_update.ram_address, current_ram_address_value, addr_to_update.bit_size) # await bizhawk.write(ctx.bizhawk_ctx, [(addr_to_update.ram_address, # current_ram_address_value.to_bytes(addr_to_update.bit_size, "little"), "MainRAM")]) self.last_received_index += 1 await self.update_and_validate_address(ctx, RECV_ITEM_ADDR, self.last_received_index, RECV_ITEM_BITSIZE) async def goal_checker(self, ctx: "BizHawkClientContext"): if not ctx.finished_game: goal_loc = grinch_locations["Neutralizing Santa"] goal_ram_address = goal_loc.update_ram_addr[0] current_ram_address_value = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [( goal_ram_address.ram_address, goal_ram_address.bit_size, "MainRAM")]))[0], "little") if (current_ram_address_value & (1 << goal_ram_address.binary_bit_pos)) > 0: ctx.finished_game = True await ctx.send_msgs([{ "cmd": "StatusUpdate", "status": NetUtils.ClientStatus.CLIENT_GOAL, }]) async def constant_address_update(self, ctx: "BizHawkClientContext"): list_recv_itemids: list[int] = [netItem.item for netItem in ctx.items_received] items_to_check: dict[str, GrinchItemData] = {**SLEIGH_PARTS_TABLE, **MISSION_ITEMS_TABLE, **GADGETS_TABLE, **KEYS_TABLE} heart_count = len(list(item_id for item_id in list_recv_itemids if item_id == 42570)) heart_item_data = ALL_ITEMS_TABLE["Heart of Stone"] await self.update_and_validate_address(ctx, heart_item_data.update_ram_addr[0].ram_address, min(heart_count, 4), 1) for (item_name, item_data) in items_to_check.items(): # If item is an event or already been received, ignore. if item_data.id is None or GrinchLocation.get_apid(item_data.id) in list_recv_itemids: continue # This assumes we don't have the item so we must set all the data to 0 for addr_to_update in item_data.update_ram_addr: is_binary = True if not addr_to_update.binary_bit_pos is None else False if is_binary: await self.update_and_validate_address(ctx, addr_to_update.ram_address, 0, 1) else: await self.update_and_validate_address(ctx, addr_to_update.ram_address, 0, 1) async def ingame_checker(self, ctx: "BizHawkClientContext"): ingame_map_id = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [( 0x010000, 1, "MainRAM")]))[0], "little") if not ingame_map_id == self.last_map_location: self.last_map_location = ingame_map_id self.demo_mode_buffer = 0 self.ingame_log = False return False demo_mode = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [( 0x01008A, 1, "MainRAM")]))[0], "little") if demo_mode == 1: return False if ingame_map_id <= 0x04 or ingame_map_id >= 0x35: return False if not self.demo_mode_buffer == MAX_DEMO_MODE_CHECK: await asyncio.sleep(0.1) self.demo_mode_buffer += 1 return False if not self.ingame_log: print("You can now start sending locations to the Grinch!") self.ingame_log = True return True async def option_handler(self, ctx: "BizHawkClientContext"): if self.loc_unlimited_eggs: max_eggs: int = 200 await bizhawk.write(ctx.bizhawk_ctx, [(0x010058, max_eggs.to_bytes(2,"little"), "MainRAM")]) async def update_and_validate_address(self, ctx: "BizHawkClientContext", address_to_validate: int, expected_value: int, byte_size: int): await bizhawk.write(ctx.bizhawk_ctx, [(address_to_validate, expected_value.to_bytes(byte_size, "little"), "MainRAM")]) current_value = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [(address_to_validate, byte_size, "MainRAM")]))[0], "little") if not current_value == expected_value: if address_to_validate == 0x010000 or address_to_validate == 0x08FB94: # TODO Temporairly skips teleportation addresses; to be changed later on. return raise Exception("Unable to update address as expected. Address: "+ str(address_to_validate)+"; Expected Value: "+str(expected_value))