2025-08-14 00:23:40 -04:00
|
|
|
import asyncio
|
2025-08-17 21:22:12 -04:00
|
|
|
from os import write
|
2025-08-04 23:03:31 -04:00
|
|
|
from typing import TYPE_CHECKING
|
2025-08-16 02:24:19 -04:00
|
|
|
|
|
|
|
import NetUtils
|
2025-08-17 21:22:12 -04:00
|
|
|
from .Locations import grinch_locations, GrinchLocation
|
2025-08-25 18:27:19 -04:00
|
|
|
from .Items import ALL_ITEMS_TABLE, SLEIGH_PARTS_TABLE, MISSION_ITEMS_TABLE, GADGETS_TABLE, KEYS_TABLE, GrinchItemData
|
2025-08-04 23:03:31 -04:00
|
|
|
import worlds._bizhawk as bizhawk
|
|
|
|
from worlds._bizhawk.client import BizHawkClient
|
|
|
|
from worlds.Files import APDeltaPatch
|
2025-08-14 00:23:40 -04:00
|
|
|
|
2025-08-04 23:03:31 -04:00
|
|
|
if TYPE_CHECKING:
|
|
|
|
from worlds._bizhawk.context import BizHawkClientContext
|
2025-08-06 23:34:52 -04:00
|
|
|
from CommonClient import logger
|
2025-08-04 23:03:31 -04:00
|
|
|
|
2025-08-19 01:59:18 -04:00
|
|
|
# MainRAM address holding the index of the last item handed to the player.
# The value is kept in data that is written to the PS1 memory card save, so it
# survives reloading a save. Remembering the index lets the client resume from
# the last delivered item and prevents item duplication loops.
RECV_ITEM_ADDR: int = 0x010064

# Width of the stored index. Despite the name, this is passed as a BYTE count
# to the RAM read/write calls (i.e. the index is 4 bytes wide).
RECV_ITEM_BITSIZE: int = 4
|
2025-08-04 23:03:31 -04:00
|
|
|
|
|
|
|
class GrinchClient(BizHawkClient):
    """BizHawk client handler for "The Grinch" on PSX.

    Each watcher tick the client checks that the player is in game, reports
    checked locations, writes newly received items into RAM, detects the goal,
    clears RAM entries for items that have not been received, and applies
    slot options.
    """

    game = "The Grinch"
    system = "PSX"
    patch_suffix = ".apgrinch"
    items_handling = 0b111

    # AP item ids that get dedicated handling in constant_address_update.
    # Presumably the ids of "Heart of Stone" and "Sleigh Room Key"; they are
    # paired with those ALL_ITEMS_TABLE entries below.
    _HEART_OF_STONE_ITEM_ID = 42570
    _SLEIGH_ROOM_KEY_ITEM_ID = 42479
    # The heart counter written to RAM is clamped to this value.
    _MAX_HEART_COUNT = 4

    # TODO Temporairly skips teleportation addresses; to be changed later on.
    # Writes to these addresses are not required to read back unchanged.
    _UNVALIDATED_ADDRESSES = (0x010000, 0x08FB94)

    # Addresses polled by ingame_checker / written by option_handler.
    _DEMO_MODE_ADDR = 0x01008A
    _INGAME_MARKER_ADDR = 0x010000
    _EGG_COUNT_ADDR = 0x010058  # presumably the egg counter - TODO confirm

    def __init__(self):
        """Initialise per-session state."""
        super().__init__()
        # Index of the last received item, mirrored from RECV_ITEM_ADDR in RAM.
        self.last_received_index = 0
        # True once the "BIOS is loading" message was logged, to avoid repeats.
        self.loading_bios_msg = False
        # Cached "give_unlimited_eggs" slot option (set on "Connected").
        self.loc_unlimited_eggs = False

    async def validate_rom(self, ctx: "BizHawkClientContext") -> bool:
        """Return True when the running game identifies as ``SLUS_011.97``.

        On success the context is configured for this game (game name, item
        handling flags, slot-data request, watcher timeout). While the BIOS
        banner text is still in RAM, a one-time message is logged and False is
        returned. Any error during the check also yields False.
        """
        # TODO Check the ROM data to see if it matches against bytes expected
        grinch_identifier_ram_address: int = 0x00928C
        bios_identifier_ram_address: int = 0x097F30
        try:
            bytes_actual: bytes = (await bizhawk.read(ctx.bizhawk_ctx, [(
                grinch_identifier_ram_address, 11, "MainRAM")]))[0]
            psx_rom_name = bytes_actual.decode("ascii")

            if psx_rom_name != "SLUS_011.97":
                bios_bytes_check: bytes = (await bizhawk.read(ctx.bizhawk_ctx, [(
                    bios_identifier_ram_address, 24, "MainRAM")]))[0]
                if "System ROM Version" in bios_bytes_check.decode("ascii"):
                    if not self.loading_bios_msg:
                        self.loading_bios_msg = True
                        logger.error("BIOS is currently loading. Will wait up to 5 seconds before retrying.")
                    return False

                logger.error("Invalid rom detected. You are not playing Grinch USA Version.")
                return False
        except Exception:
            # Deliberately broad: a failed read or RAM content that is not
            # ASCII both simply mean the game cannot be validated right now.
            return False

        ctx.game = self.game
        ctx.items_handling = self.items_handling
        ctx.want_slot_data = True
        ctx.watcher_timeout = 0.125
        # Allow the BIOS message to be shown again on a later restart.
        self.loading_bios_msg = False

        return True

    def on_package(self, ctx: "BizHawkClientContext", cmd: str, args: dict) -> None:
        """Handle server packets; caches slot options when "Connected" arrives."""
        super().on_package(ctx, cmd, args)
        if cmd == "Connected":  # On Connect
            # Fall back to the packet's own slot data and to a disabled option
            # so missing data cannot raise inside the packet handler.
            slot_data = ctx.slot_data or args.get("slot_data") or {}
            self.loc_unlimited_eggs = bool(slot_data.get("give_unlimited_eggs", False))

    async def set_auth(self, ctx: "BizHawkClientContext") -> None:
        """Obtain the slot name by asking the context for a username."""
        await ctx.get_username()

    async def game_watcher(self, ctx: "BizHawkClientContext") -> None:
        """Run one watcher tick: sync locations, items, goal and options."""
        # If the player is not connected to an AP Server, or their connection was disconnected.
        if ctx.server is None or ctx.server.socket.closed or ctx.slot_data is None:
            return

        try:
            if not await self.ingame_checker(ctx):
                return

            await self.location_checker(ctx)
            await self.receiving_items_handler(ctx)
            await self.goal_checker(ctx)
            await self.constant_address_update(ctx)
            await self.option_handler(ctx)
        except bizhawk.RequestFailedError:
            # The connector didn't respond. Exit handler and return to main loop to reconnect
            pass

    async def location_checker(self, ctx: "BizHawkClientContext"):
        """Report every still-missing location whose RAM conditions all hold.

        A location counts as checked only when every one of its RAM entries
        matches: for bit entries the bit must be set, otherwise the value read
        must equal the entry's expected value.
        """
        local_locations_checked: list[int] = []
        for grinch_loc_ram_data in grinch_locations.values():
            # Entries without an id and locations the server no longer lists
            # as missing are skipped.
            if grinch_loc_ram_data.id is None:
                continue
            ap_location_id = GrinchLocation.get_apid(grinch_loc_ram_data.id)
            if ap_location_id not in ctx.missing_locations:
                continue

            # Grinch ram data may have more than one address, so every address
            # of the location is checked. Stop at the first mismatch: the
            # location cannot be complete and further reads are unnecessary.
            all_conditions_met = True
            for addr_to_check in grinch_loc_ram_data.update_ram_addr:
                current_value = await self._read_int(
                    ctx, addr_to_check.ram_address, addr_to_check.bit_size)
                if addr_to_check.binary_bit_pos is not None:
                    condition_met = (current_value & (1 << addr_to_check.binary_bit_pos)) > 0
                else:
                    condition_met = addr_to_check.value == current_value
                if not condition_met:
                    all_conditions_met = False
                    break

            if all_conditions_met:
                local_locations_checked.append(ap_location_id)

        # Update the AP server with the locally checked list of locations
        # (in other words, locations found in Grinch).
        await ctx.check_locations(local_locations_checked)
        ctx.locations_checked = set(local_locations_checked)

    async def receiving_items_handler(self, ctx: "BizHawkClientContext"):
        """Write items received since the stored index into game RAM.

        The index of the last delivered item is read from RECV_ITEM_ADDR and
        written back after each item, so already delivered items are not
        given again.
        """
        self.last_received_index = await self._read_int(ctx, RECV_ITEM_ADDR, RECV_ITEM_BITSIZE)
        # Nothing new when the stored index already equals the received count.
        if len(ctx.items_received) == self.last_received_index:
            return

        # Ensures we only get the new items that we want to give the player
        new_items_only = ctx.items_received[self.last_received_index:]

        for item_received in new_items_only:
            local_item = ctx.item_names.lookup_in_game(item_received.item)
            grinch_item_ram_data = ALL_ITEMS_TABLE[local_item]

            for addr_to_update in grinch_item_ram_data.update_ram_addr:
                new_value = await self._read_int(
                    ctx, addr_to_update.ram_address, addr_to_update.bit_size)
                if addr_to_update.binary_bit_pos is not None:
                    # Flag item: set its bit, keep the other bits untouched.
                    new_value |= 1 << addr_to_update.binary_bit_pos
                elif addr_to_update.update_existing_value:
                    # Counter item: add to the current value, but never go
                    # above the maximum count possible.
                    new_value = min(new_value + addr_to_update.value, addr_to_update.max_count)
                else:
                    # Plain value: overwrite whatever is in RAM.
                    new_value = addr_to_update.value

                # Write the updated value back into RAM
                await self.update_and_validate_address(
                    ctx, addr_to_update.ram_address, new_value, addr_to_update.bit_size)

            # Record progress after every item so a failure part-way through
            # does not cause earlier items to be delivered again.
            self.last_received_index += 1
            await self.update_and_validate_address(
                ctx, RECV_ITEM_ADDR, self.last_received_index, RECV_ITEM_BITSIZE)

    async def goal_checker(self, ctx: "BizHawkClientContext"):
        """Send CLIENT_GOAL once the "Neutralizing Santa" flag bit is set."""
        if ctx.finished_game:
            return

        goal_loc = grinch_locations["Neutralizing Santa"]
        goal_ram_address = goal_loc.update_ram_addr[0]
        current_ram_address_value = await self._read_int(
            ctx, goal_ram_address.ram_address, goal_ram_address.bit_size)
        if (current_ram_address_value & (1 << goal_ram_address.binary_bit_pos)) > 0:
            ctx.finished_game = True
            await ctx.send_msgs([{
                "cmd": "StatusUpdate",
                "status": NetUtils.ClientStatus.CLIENT_GOAL,
            }])

    async def constant_address_update(self, ctx: "BizHawkClientContext"):
        """Keep RAM in line with the items actually received.

        Writes the (clamped) number of received hearts, zeroes the sleigh key
        value while the key has not been received, and clears the RAM data of
        every sleigh part, mission item, gadget and key not yet received.
        """
        list_recv_itemids: list[int] = [net_item.item for net_item in ctx.items_received]
        received_ids = set(list_recv_itemids)  # O(1) membership tests below

        heart_count = list_recv_itemids.count(self._HEART_OF_STONE_ITEM_ID)
        heart_item_data = ALL_ITEMS_TABLE["Heart of Stone"]
        await self.update_and_validate_address(
            ctx, heart_item_data.update_ram_addr[0].ram_address,
            min(heart_count, self._MAX_HEART_COUNT), 1)

        if self._SLEIGH_ROOM_KEY_ITEM_ID not in received_ids:
            sleigh_key_item_data = ALL_ITEMS_TABLE["Sleigh Room Key"]
            # TODO Game sets value to 3 at some point. If this breaks at some point, go back into this.
            await self.update_and_validate_address(
                ctx, sleigh_key_item_data.update_ram_addr[0].ram_address, 0, 1)

        items_to_check: dict[str, GrinchItemData] = {
            **SLEIGH_PARTS_TABLE, **MISSION_ITEMS_TABLE, **GADGETS_TABLE, **KEYS_TABLE}
        for item_data in items_to_check.values():
            # If item is an event or already been received, ignore.
            if item_data.id is None or GrinchLocation.get_apid(item_data.id) in received_ids:
                continue

            # We don't have the item, so all of its data must be set to 0.
            for addr_to_update in item_data.update_ram_addr:
                if addr_to_update.binary_bit_pos is None:
                    await self.update_and_validate_address(
                        ctx, addr_to_update.ram_address, 0, addr_to_update.bit_size)
                    continue

                current_ram_address_value = await self._read_int(
                    ctx, addr_to_update.ram_address, addr_to_update.bit_size)
                # Clear only this item's bit; other flags sharing the address
                # are left as they are.
                cleared_value = current_ram_address_value & ~(1 << addr_to_update.binary_bit_pos)
                if cleared_value != current_ram_address_value:
                    await self.update_and_validate_address(
                        ctx, addr_to_update.ram_address, cleared_value, addr_to_update.bit_size)

    async def ingame_checker(self, ctx: "BizHawkClientContext"):
        """Return True when RAM indicates the player is in regular gameplay.

        Returns False when the demo-mode byte equals 1, or when the byte at
        0x010000 is at most 0x04 or at least 0x35.
        """
        demo_mode = await self._read_int(ctx, self._DEMO_MODE_ADDR, 1)
        if demo_mode == 1:
            return False

        # Values outside the open range (0x04, 0x35) are treated as
        # "not in game" (presumably menus/cutscenes - TODO confirm).
        ingame_marker = await self._read_int(ctx, self._INGAME_MARKER_ADDR, 1)
        if ingame_marker <= 0x04 or ingame_marker >= 0x35:
            return False
        return True

    async def option_handler(self, ctx: "BizHawkClientContext"):
        """Apply slot options; with unlimited eggs on, rewrite the count to 200."""
        if self.loc_unlimited_eggs:
            max_eggs: int = 200
            await bizhawk.write(
                ctx.bizhawk_ctx, [(self._EGG_COUNT_ADDR, max_eggs.to_bytes(2, "little"), "MainRAM")])

    async def update_and_validate_address(self, ctx: "BizHawkClientContext", address_to_validate: int, expected_value: int, byte_size: int):
        """Write a little-endian value to MainRAM and confirm it reads back.

        Args:
            ctx: The BizHawk client context used for the RAM access.
            address_to_validate: MainRAM address to write to.
            expected_value: Integer to store.
            byte_size: Number of bytes used to encode the value.

        Raises:
            Exception: If the value read back differs from ``expected_value``
                and the address is not one of the addresses exempt from
                validation.
        """
        await bizhawk.write(
            ctx.bizhawk_ctx,
            [(address_to_validate, expected_value.to_bytes(byte_size, "little"), "MainRAM")])
        current_value = await self._read_int(ctx, address_to_validate, byte_size)
        if current_value == expected_value:
            return
        if address_to_validate in self._UNVALIDATED_ADDRESSES:
            return
        raise Exception(
            f"Unable to update address as expected. Address: {address_to_validate}"
            f"; Expected Value: {expected_value}")

    async def _read_int(self, ctx: "BizHawkClientContext", address: int, byte_size: int) -> int:
        """Read ``byte_size`` bytes from MainRAM as a little-endian integer."""
        raw_bytes = (await bizhawk.read(ctx.bizhawk_ctx, [(address, byte_size, "MainRAM")]))[0]
        return int.from_bytes(raw_bytes, "little")