Linted worlds/grinch files through python Black

Moved Item name references to a class-based Enum to improve future QoL
Moved categories to an Enum to improve future QoL
Replace bit_size with byte_size to match what the variable is actually measuring
Moved Sleigh Parts regions to match where you actually get the checks
Updated the RAM Handler with comments, renamed bit_size to byte_size, replaced update_value with update_method that now takes one of several methods, and created an __init__ function

NOTE: DOES NOT CURRENLTY FUNCTION
This commit is contained in:
2025-10-27 13:17:04 -06:00
parent 6f552949fa
commit d0a8df25f6
7 changed files with 2876 additions and 902 deletions

View File

@@ -6,7 +6,13 @@ import copy
import uuid
import Utils
from .Locations import grinch_locations, GrinchLocation
from .Items import ALL_ITEMS_TABLE, MISSION_ITEMS_TABLE, GADGETS_TABLE, KEYS_TABLE, GrinchItemData #, SLEIGH_PARTS_TABLE
from .Items import (
ALL_ITEMS_TABLE,
MISSION_ITEMS_TABLE,
GADGETS_TABLE,
KEYS_TABLE,
GrinchItemData,
) # , SLEIGH_PARTS_TABLE
import worlds._bizhawk as bizhawk
from worlds._bizhawk.client import BizHawkClient
@@ -30,6 +36,7 @@ MAX_EGGS: int = 200
EGG_COUNT_ADDR: int = 0x010058
EGG_ADDR_BYTESIZE: int = 2
class GrinchClient(BizHawkClient):
game = "The Grinch"
system = "PSX"
@@ -52,27 +59,44 @@ class GrinchClient(BizHawkClient):
async def validate_rom(self, ctx: "BizHawkClientContext") -> bool:
from CommonClient import logger
# TODO Check the ROM data to see if it matches against bytes expected
grinch_identifier_ram_address: int = 0x00928C
bios_identifier_ram_address: int = 0x097F30
try:
bytes_actual: bytes = (await bizhawk.read(ctx.bizhawk_ctx, [(
grinch_identifier_ram_address, 11, "MainRAM")]))[0]
bytes_actual: bytes = (
await bizhawk.read(
ctx.bizhawk_ctx, [(grinch_identifier_ram_address, 11, "MainRAM")]
)
)[0]
psx_rom_name = bytes_actual.decode("ascii")
if psx_rom_name != "SLUS_011.97":
bios_bytes_check: bytes = (await bizhawk.read(ctx.bizhawk_ctx, [(
bios_identifier_ram_address, 24, "MainRAM")]))[0]
bios_bytes_check: bytes = (
await bizhawk.read(
ctx.bizhawk_ctx, [(bios_identifier_ram_address, 24, "MainRAM")]
)
)[0]
if "System ROM Version" in bios_bytes_check.decode("ascii"):
if not self.loading_bios_msg:
self.loading_bios_msg = True
logger.error("BIOS is currently loading. Will wait up to 5 seconds before retrying.")
logger.error(
"BIOS is currently loading. Will wait up to 5 seconds before retrying."
)
return False
logger.error("Invalid rom detected. You are not playing Grinch USA Version.")
raise Exception("Invalid rom detected. You are not playing Grinch USA Version.")
logger.error(
"Invalid rom detected. You are not playing Grinch USA Version."
)
raise Exception(
"Invalid rom detected. You are not playing Grinch USA Version."
)
ctx.command_processor.commands["ringlink"] = _cmd_ringlink
except Exception:
return False
@@ -86,38 +110,54 @@ class GrinchClient(BizHawkClient):
def on_package(self, ctx: "BizHawkClientContext", cmd: str, args: dict) -> None:
from CommonClient import logger
super().on_package(ctx, cmd, args)
match cmd:
case "Connected": # On Connect
self.loc_unlimited_eggs = bool(ctx.slot_data["give_unlimited_eggs"])
self.unique_client_id = self._get_uuid()
logger.info("You are now connected to the client. "+
"There may be a slight delay to check you are not in demo mode before locations start to send.")
logger.info(
"You are now connected to the client. "
+ "There may be a slight delay to check you are not in demo mode before locations start to send."
)
self.ring_link_enabled = bool(ctx.slot_data["ring_link"])
tags = copy.deepcopy(ctx.tags)
if self.ring_link_enabled:
ctx.tags.add("RingLink")
else:
ctx.tags -= { "RingLink" }
ctx.tags -= {"RingLink"}
if tags != ctx.tags:
Utils.async_start(ctx.send_msgs([{"cmd": "ConnectUpdate", "tags": ctx.tags}]), "Update RingLink Tags")
Utils.async_start(
ctx.send_msgs([{"cmd": "ConnectUpdate", "tags": ctx.tags}]),
"Update RingLink Tags",
)
case "Bounced":
if "tags" not in args:
return
if "RingLink" in ctx.tags and "RingLink" in args["tags"] and args["data"]["source"] != self.unique_client_id:
Utils.async_start(self.ring_link_input(args["data"]["amount"], ctx), "SyncEggs")
if (
"RingLink" in ctx.tags
and "RingLink" in args["tags"]
and args["data"]["source"] != self.unique_client_id
):
Utils.async_start(
self.ring_link_input(args["data"]["amount"], ctx), "SyncEggs"
)
async def set_auth(self, ctx: "BizHawkClientContext") -> None:
await ctx.get_username()
async def game_watcher(self, ctx: "BizHawkClientContext") -> None:
from CommonClient import logger
#If the player is not connected to an AP Server, or their connection was disconnected.
# If the player is not connected to an AP Server, or their connection was disconnected.
if not ctx.slot:
return
@@ -125,7 +165,9 @@ class GrinchClient(BizHawkClient):
if not await self.ingame_checker(ctx):
return
if not any(task.get_name() == "Grinch EggLink" for task in asyncio.all_tasks()):
if not any(
task.get_name() == "Grinch EggLink" for task in asyncio.all_tasks()
):
print("EggLink")
self.send_ring_link = True
Utils.async_start(self.ring_link_output(ctx), name="Grinch EggLink")
@@ -138,16 +180,24 @@ class GrinchClient(BizHawkClient):
except bizhawk.RequestFailedError as ex:
# The connector didn't respond. Exit handler and return to main loop to reconnect
logger.error("Failure to connect / authenticate the grinch. Error details: " + str(ex))
logger.error(
"Failure to connect / authenticate the grinch. Error details: "
+ str(ex)
)
pass
except Exception as genericEx:
# For all other errors, catch this and let the client gracefully disconnect
logger.error("Unknown error occurred while playing the grinch. Error details: " + str(genericEx))
logger.error(
"Unknown error occurred while playing the grinch. Error details: "
+ str(genericEx)
)
await ctx.disconnect(False)
pass
async def location_checker(self, ctx: "BizHawkClientContext"):
from CommonClient import logger
# Update the AP Server to know what locations are not checked yet.
local_locations_checked: list[int] = []
addr_list_to_read: list[tuple[int, int, str]] = []
@@ -157,11 +207,15 @@ class GrinchClient(BizHawkClient):
for missing_location in local_ap_locations:
grinch_loc_name = ctx.location_names.lookup_in_game(missing_location)
grinch_loc_ram_data = grinch_locations[grinch_loc_name]
missing_addr_list: list[tuple[int, int, str]] = [(read_addr.ram_address, read_addr.bit_size, "MainRAM") for
read_addr in grinch_loc_ram_data.update_ram_addr]
missing_addr_list: list[tuple[int, int, str]] = [
(read_addr.ram_address, read_addr.byte_size, "MainRAM")
for read_addr in grinch_loc_ram_data.update_ram_addr
]
addr_list_to_read = [*addr_list_to_read, *missing_addr_list]
returned_bytes: list[bytes] = await bizhawk.read(ctx.bizhawk_ctx, addr_list_to_read)
returned_bytes: list[bytes] = await bizhawk.read(
ctx.bizhawk_ctx, addr_list_to_read
)
# Now loop through everything again and this time get the byte value from the above read, convert to int,
# and check to see if that ram address has our expected value.
@@ -174,22 +228,41 @@ class GrinchClient(BizHawkClient):
# Grinch ram data may have more than one address to update, so we are going to loop through all addresses in a location
# We use a list here to keep track of all our checks. If they are all true, then and only then do we mark that location as checked.
ram_checked_list: list[bool] = []
for addr_to_update in grinch_loc_ram_data.update_ram_addr:
is_binary = True if not addr_to_update.binary_bit_pos is None else False
orig_index: int = addr_list_to_read.index((addr_to_update.ram_address, addr_to_update.bit_size, "MainRAM"))
value_read_from_bizhawk: int = int.from_bytes(returned_bytes[orig_index], "little")
orig_index: int = addr_list_to_read.index(
(addr_to_update.ram_address, addr_to_update.byte_size, "MainRAM")
)
value_read_from_bizhawk: int = int.from_bytes(
returned_bytes[orig_index], "little"
)
if is_binary:
ram_checked_list.append((value_read_from_bizhawk & (1 << addr_to_update.binary_bit_pos)) > 0)
ram_checked_list.append(
(value_read_from_bizhawk & (1 << addr_to_update.binary_bit_pos))
> 0
)
else:
expected_int_value = addr_to_update.value
ram_checked_list.append(expected_int_value == value_read_from_bizhawk)
ram_checked_list.append(
expected_int_value == value_read_from_bizhawk
)
if all(ram_checked_list):
local_locations_checked.append(GrinchLocation.get_apid(grinch_loc_ram_data.id))
local_locations_checked.append(
GrinchLocation.get_apid(grinch_loc_ram_data.id)
)
# Update the AP server with the locally checked list of locations (In other words, locations I found in Grinch)
locations_sent_to_ap: set[int] = await ctx.check_locations(local_locations_checked)
locations_sent_to_ap: set[int] = await ctx.check_locations(
local_locations_checked
)
if len(locations_sent_to_ap) > 0:
await self.remove_physical_items(ctx)
ctx.locations_checked = set(local_locations_checked)
async def receiving_items_handler(self, ctx: "BizHawkClientContext"):
@@ -197,12 +270,20 @@ class GrinchClient(BizHawkClient):
# If the list says that we have 3 items and we already received items, we will ignore and continue.
# Otherwise, we will get the new items and give them to the player.
self.last_received_index = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [(
RECV_ITEM_ADDR, RECV_ITEM_BITSIZE, "MainRAM")]))[0], "little")
self.last_received_index = int.from_bytes(
(
await bizhawk.read(
ctx.bizhawk_ctx, [(RECV_ITEM_ADDR, RECV_ITEM_BITSIZE, "MainRAM")]
)
)[0],
"little",
)
if len(ctx.items_received) == self.last_received_index:
return
# Ensures we only get the new items that we want to give the player
new_items_only = ctx.items_received[self.last_received_index:]
new_items_only = ctx.items_received[self.last_received_index :]
ram_addr_dict: dict[int, list[int]] = {}
for item_received in new_items_only:
@@ -211,83 +292,164 @@ class GrinchClient(BizHawkClient):
for addr_to_update in grinch_item_ram_data.update_ram_addr:
is_binary = True if not addr_to_update.binary_bit_pos is None else False
if addr_to_update.ram_address in ram_addr_dict.keys():
current_ram_address_value = ram_addr_dict[addr_to_update.ram_address][0]
current_ram_address_value = ram_addr_dict[
addr_to_update.ram_address
][0]
else:
current_ram_address_value = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [(
addr_to_update.ram_address, addr_to_update.bit_size, "MainRAM")]))[0], "little")
current_ram_address_value = int.from_bytes(
(
await bizhawk.read(
ctx.bizhawk_ctx,
[
(
addr_to_update.ram_address,
addr_to_update.byte_size,
"MainRAM",
)
],
)
)[0],
"little",
)
if is_binary:
current_ram_address_value = (current_ram_address_value | (1 << addr_to_update.binary_bit_pos))
current_ram_address_value = current_ram_address_value | (
1 << addr_to_update.binary_bit_pos
)
elif addr_to_update.update_existing_value:
# Grabs minimum value of a list of numbers and makes sure it does not go above max count possible
current_ram_address_value += addr_to_update.value
current_ram_address_value = min(current_ram_address_value, addr_to_update.max_count)
current_ram_address_value = min(
current_ram_address_value, addr_to_update.max_count
)
else:
current_ram_address_value = addr_to_update.value
# Write the updated value back into RAM
ram_addr_dict[addr_to_update.ram_address] = [current_ram_address_value, addr_to_update.bit_size]
ram_addr_dict[addr_to_update.ram_address] = [
current_ram_address_value,
addr_to_update.byte_size,
]
self.last_received_index += 1
# Update the latest received item index to ram as well.
ram_addr_dict[RECV_ITEM_ADDR] = [self.last_received_index, RECV_ITEM_BITSIZE]
await bizhawk.write(ctx.bizhawk_ctx, self.convert_dict_to_ram_list(ram_addr_dict))
await bizhawk.write(
ctx.bizhawk_ctx, self.convert_dict_to_ram_list(ram_addr_dict)
)
async def goal_checker(self, ctx: "BizHawkClientContext"):
if not ctx.finished_game:
goal_loc = grinch_locations["MC - Sleigh Ride - Neutralizing Santa"]
goal_ram_address = goal_loc.update_ram_addr[0]
current_ram_address_value = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [(
goal_ram_address.ram_address, goal_ram_address.bit_size, "MainRAM")]))[0], "little")
current_ram_address_value = int.from_bytes(
(
await bizhawk.read(
ctx.bizhawk_ctx,
[
(
goal_ram_address.ram_address,
goal_ram_address.byte_size,
"MainRAM",
)
],
)
)[0],
"little",
)
if (current_ram_address_value & (1 << goal_ram_address.binary_bit_pos)) > 0:
# if current_ram_address_value == goal_ram_address.value:
# if current_ram_address_value == goal_ram_address.value:
ctx.finished_game = True
await ctx.send_msgs([{
"cmd": "StatusUpdate",
"status": NetUtils.ClientStatus.CLIENT_GOAL,
}])
await ctx.send_msgs(
[
{
"cmd": "StatusUpdate",
"status": NetUtils.ClientStatus.CLIENT_GOAL,
}
]
)
# This function's entire purpose is to take away items we physically received ingame, but have not received from AP
async def remove_physical_items(self, ctx: "BizHawkClientContext"):
ram_addr_dict: dict[int, list[int]] = {}
list_recv_itemids: list[int] = [netItem.item for netItem in ctx.items_received]
items_to_check: dict[str, GrinchItemData] = {**GADGETS_TABLE} #, **SLEIGH_PARTS_TABLE
heart_count = len(list(item_id for item_id in list_recv_itemids if item_id == 42570))
items_to_check: dict[str, GrinchItemData] = {
**GADGETS_TABLE
} # , **SLEIGH_PARTS_TABLE
heart_count = len(
list(item_id for item_id in list_recv_itemids if item_id == 42570)
)
heart_item_data = ALL_ITEMS_TABLE["Heart of Stone"]
ram_addr_dict[heart_item_data.update_ram_addr[0].ram_address] = [min(heart_count, 4), 1]
ram_addr_dict[heart_item_data.update_ram_addr[0].ram_address] = [
min(heart_count, 4),
1,
]
# Setting mission count for all accesses back to 0 to prevent warping/unlocking after completing 3 missions
ram_addr_dict[0x0100F0] = [0, 4]
for (item_name, item_data) in items_to_check.items():
for item_name, item_data in items_to_check.items():
# If item is an event or already been received, ignore.
if item_data.id is None or GrinchLocation.get_apid(item_data.id) in list_recv_itemids:
if (
item_data.id is None
or GrinchLocation.get_apid(item_data.id) in list_recv_itemids
):
continue
# This assumes we don't have the item so we must set all the data to 0
for addr_to_update in item_data.update_ram_addr:
is_binary = True if not addr_to_update.binary_bit_pos is None else False
if is_binary:
if addr_to_update.ram_address in ram_addr_dict.keys():
current_bin_value = ram_addr_dict[addr_to_update.ram_address][0]
else:
current_bin_value = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [(
addr_to_update.ram_address, addr_to_update.bit_size, "MainRAM")]))[0], "little")
current_bin_value = int.from_bytes(
(
await bizhawk.read(
ctx.bizhawk_ctx,
[
(
addr_to_update.ram_address,
addr_to_update.byte_size,
"MainRAM",
)
],
)
)[0],
"little",
)
current_bin_value &= ~(1 << addr_to_update.binary_bit_pos)
ram_addr_dict[addr_to_update.ram_address] = [current_bin_value, 1]
else:
ram_addr_dict[addr_to_update.ram_address] = [0, addr_to_update.bit_size]
ram_addr_dict[addr_to_update.ram_address] = [
0,
addr_to_update.byte_size,
]
await bizhawk.write(ctx.bizhawk_ctx, self.convert_dict_to_ram_list(ram_addr_dict))
await bizhawk.write(
ctx.bizhawk_ctx, self.convert_dict_to_ram_list(ram_addr_dict)
)
def convert_dict_to_ram_list(self, addr_dict: dict[int, list[int]]) -> list[tuple[int, Sequence[int], str]]:
def convert_dict_to_ram_list(
self, addr_dict: dict[int, list[int]]
) -> list[tuple[int, Sequence[int], str]]:
addr_list_to_update: list[tuple[int, Sequence[int], str]] = []
for (key, val) in addr_dict.items():
addr_list_to_update.append((key, val[0].to_bytes(val[1], "little"), "MainRAM"))
for key, val in addr_dict.items():
addr_list_to_update.append(
(key, val[0].to_bytes(val[1], "little"), "MainRAM")
)
return addr_list_to_update
@@ -296,56 +458,93 @@ class GrinchClient(BizHawkClient):
ram_addr_dict: dict[int, list[int]] = {}
list_recv_itemids: list[int] = [netItem.item for netItem in ctx.items_received]
items_to_check: dict[str, GrinchItemData] = {**KEYS_TABLE, **MISSION_ITEMS_TABLE}
items_to_check: dict[str, GrinchItemData] = {
**KEYS_TABLE,
**MISSION_ITEMS_TABLE,
}
for (item_name, item_data) in items_to_check.items():
for item_name, item_data in items_to_check.items():
# If item is an event or already been received, ignore.
if item_data.id is None: # or GrinchLocation.get_apid(item_data.id) in list_recv_itemids:
if (
item_data.id is None
): # or GrinchLocation.get_apid(item_data.id) in list_recv_itemids:
continue
# This will either constantly update the item to ensure you still have it or take it away if you don't deserve it
for addr_to_update in item_data.update_ram_addr:
is_binary = True if not addr_to_update.binary_bit_pos is None else False
if is_binary:
if addr_to_update.ram_address in ram_addr_dict.keys():
current_bin_value = ram_addr_dict[addr_to_update.ram_address][0]
else:
current_bin_value = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [(
addr_to_update.ram_address, addr_to_update.bit_size, "MainRAM")]))[0], "little")
current_bin_value = int.from_bytes(
(
await bizhawk.read(
ctx.bizhawk_ctx,
[
(
addr_to_update.ram_address,
addr_to_update.byte_size,
"MainRAM",
)
],
)
)[0],
"little",
)
if GrinchLocation.get_apid(item_data.id) in list_recv_itemids:
current_bin_value |= (1 << addr_to_update.binary_bit_pos)
current_bin_value |= 1 << addr_to_update.binary_bit_pos
else:
current_bin_value &= ~(1 << addr_to_update.binary_bit_pos)
ram_addr_dict[addr_to_update.ram_address] = [current_bin_value, 1]
else:
if GrinchLocation.get_apid(item_data.id) in list_recv_itemids:
ram_addr_dict[addr_to_update.ram_address] = [addr_to_update.value, addr_to_update.bit_size]
else:
ram_addr_dict[addr_to_update.ram_address] = [0, addr_to_update.bit_size]
ram_addr_dict[addr_to_update.ram_address] = [
addr_to_update.value,
addr_to_update.byte_size,
]
await bizhawk.write(ctx.bizhawk_ctx, self.convert_dict_to_ram_list(ram_addr_dict))
else:
ram_addr_dict[addr_to_update.ram_address] = [
0,
addr_to_update.byte_size,
]
await bizhawk.write(
ctx.bizhawk_ctx, self.convert_dict_to_ram_list(ram_addr_dict)
)
async def ingame_checker(self, ctx: "BizHawkClientContext"):
from CommonClient import logger
ingame_map_id = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [(
0x010000, 1, "MainRAM")]))[0], "little")
initial_cutscene_checker = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [(
0x010094, 1, "MainRAM")]))[0], "little")
ingame_map_id = int.from_bytes(
(await bizhawk.read(ctx.bizhawk_ctx, [(0x010000, 1, "MainRAM")]))[0],
"little",
)
initial_cutscene_checker = int.from_bytes(
(await bizhawk.read(ctx.bizhawk_ctx, [(0x010094, 1, "MainRAM")]))[0],
"little",
)
#If not in game or at a menu, or loading the publisher logos
# If not in game or at a menu, or loading the publisher logos
if ingame_map_id <= 0x04 or ingame_map_id >= 0x35:
self.ingame_log = False
return False
#If grinch has changed maps
# If grinch has changed maps
if not ingame_map_id == self.last_map_location:
# If the last "map" we were on was a menu or a publisher logo
if self.last_map_location in MENU_MAP_IDS:
# Reset our demo mode checker just in case the game is in demo mode.
self.demo_mode_buffer = 0
self.ingame_log = False
if initial_cutscene_checker != 1:
return False
@@ -358,27 +557,42 @@ class GrinchClient(BizHawkClient):
self.demo_mode_buffer += 1
return False
demo_mode = int.from_bytes((await bizhawk.read(ctx.bizhawk_ctx, [(
0x01008A, 1, "MainRAM")]))[0], "little")
demo_mode = int.from_bytes(
(await bizhawk.read(ctx.bizhawk_ctx, [(0x01008A, 1, "MainRAM")]))[0],
"little",
)
if demo_mode == 1:
return False
if not self.ingame_log:
logger.info("You can now start sending locations from the Grinch!")
self.ingame_log = True
return True
async def option_handler(self, ctx: "BizHawkClientContext"):
if self.loc_unlimited_eggs:
await bizhawk.write(ctx.bizhawk_ctx, [(EGG_COUNT_ADDR, MAX_EGGS.to_bytes(2,"little"), "MainRAM")])
await bizhawk.write(
ctx.bizhawk_ctx,
[(EGG_COUNT_ADDR, MAX_EGGS.to_bytes(2, "little"), "MainRAM")],
)
async def ring_link_output(self, ctx: "BizHawkClientContext"):
from CommonClient import logger
while self.send_ring_link and ctx.slot:
try:
current_egg_count = int.from_bytes(
(await bizhawk.read(ctx.bizhawk_ctx, [(EGG_COUNT_ADDR, EGG_ADDR_BYTESIZE, "MainRAM")]))[0], "little")
(
await bizhawk.read(
ctx.bizhawk_ctx,
[(EGG_COUNT_ADDR, EGG_ADDR_BYTESIZE, "MainRAM")],
)
)[0],
"little",
)
if (current_egg_count - self.previous_egg_count) != 0:
msg = {
@@ -386,29 +600,55 @@ class GrinchClient(BizHawkClient):
"data": {
"time": time.time(),
"source": self.unique_client_id,
"amount": current_egg_count - self.previous_egg_count
"amount": current_egg_count - self.previous_egg_count,
},
"tags": ["RingLink"]
"tags": ["RingLink"],
}
await ctx.send_msgs([msg])
self.previous_egg_count = current_egg_count
# logger.info(f"RingLink: You sent {str(current_egg_count - self.previous_egg_count)} rotten eggs.")
await asyncio.sleep(0.1)
except Exception as ex:
logger.error("While monitoring grinch's egg count ingame, an error occurred. Details:"+ str(ex))
logger.error(
"While monitoring grinch's egg count ingame, an error occurred. Details:"
+ str(ex)
)
self.send_ring_link = False
if not ctx.slot:
logger.info("You must be connected to the multi-world in order for RingLink to work properly.")
logger.info(
"You must be connected to the multi-world in order for RingLink to work properly."
)
async def ring_link_input(self, egg_amount: int, ctx: "BizHawkClientContext"):
from CommonClient import logger
game_egg_count = int.from_bytes(
(await bizhawk.read(ctx.bizhawk_ctx, [(EGG_COUNT_ADDR, EGG_ADDR_BYTESIZE, "MainRAM")]))[0], "little")
non_neg_eggs = game_egg_count + egg_amount if game_egg_count + egg_amount > 0 else 0
(
await bizhawk.read(
ctx.bizhawk_ctx, [(EGG_COUNT_ADDR, EGG_ADDR_BYTESIZE, "MainRAM")]
)
)[0],
"little",
)
non_neg_eggs = (
game_egg_count + egg_amount if game_egg_count + egg_amount > 0 else 0
)
current_egg_count = min(non_neg_eggs, MAX_EGGS)
await bizhawk.write(ctx.bizhawk_ctx, [(EGG_COUNT_ADDR,
int(current_egg_count).to_bytes(EGG_ADDR_BYTESIZE, "little"), "MainRAM")])
await bizhawk.write(
ctx.bizhawk_ctx,
[
(
EGG_COUNT_ADDR,
int(current_egg_count).to_bytes(EGG_ADDR_BYTESIZE, "little"),
"MainRAM",
)
],
)
self.previous_egg_count = current_egg_count
# logger.info(f"RingLink: You received {str(egg_amount)} rotten eggs.")
@@ -419,18 +659,27 @@ class GrinchClient(BizHawkClient):
uid += ord(char)
return uid
def _cmd_ringlink(self):
"""Toggle ringling from client. Overrides default setting."""
if not self.ctx.slot:
return
Utils.async_start(_update_ring_link(self.ctx, not "RingLink" in self.ctx.tags), name="Update RingLink")
Utils.async_start(
_update_ring_link(self.ctx, not "RingLink" in self.ctx.tags),
name="Update RingLink",
)
async def _update_ring_link(ctx: "BizHawkClientContext", ring_link: bool):
"""Helper function to set Ring Link connection tag on/off and update the connection if already connected."""
old_tags = copy.deepcopy(ctx.tags)
if ring_link:
ctx.tags.add("RingLink")
else:
ctx.tags -= {"RingLink"}
if old_tags != ctx.tags and ctx.server and not ctx.server.socket.closed:
await ctx.send_msgs([{"cmd": "ConnectUpdate", "tags": ctx.tags}])
await ctx.send_msgs([{"cmd": "ConnectUpdate", "tags": ctx.tags}])