mirror of
https://github.com/MarioSpore/Grinch-AP.git
synced 2025-10-21 12:11:33 -06:00

Co-authored-by: beauxq <beauxq@yahoo.com> Co-authored-by: Exempt-Medic <60412657+Exempt-Medic@users.noreply.github.com>
329 lines
16 KiB
Python
329 lines
16 KiB
Python
import logging
|
|
from collections import deque
|
|
from typing import TYPE_CHECKING
|
|
|
|
from NetUtils import ClientStatus
|
|
|
|
import worlds._bizhawk as bizhawk
|
|
from worlds._bizhawk.client import BizHawkClient
|
|
|
|
if TYPE_CHECKING:
|
|
from worlds._bizhawk.context import BizHawkClientContext
|
|
|
|
|
|
base_id = 7000
|
|
logger = logging.getLogger("Client")
|
|
|
|
|
|
rom_name_location = 0x07FFE3
|
|
locations_array_start = 0x200
|
|
locations_array_length = 0x100
|
|
items_obtained = 0x03
|
|
gp_location_low = 0x1C
|
|
gp_location_middle = 0x1D
|
|
gp_location_high = 0x1E
|
|
weapons_arrays_starts = [0x118, 0x158, 0x198, 0x1D8]
|
|
armors_arrays_starts = [0x11C, 0x15C, 0x19C, 0x1DC]
|
|
status_a_location = 0x102
|
|
status_b_location = 0x0FC
|
|
status_c_location = 0x0A3
|
|
|
|
key_items = ["Lute", "Crown", "Crystal", "Herb", "Key", "Tnt", "Adamant", "Slab", "Ruby", "Rod",
|
|
"Floater", "Chime", "Tail", "Cube", "Bottle", "Oxyale", "EarthOrb", "FireOrb", "WaterOrb", "AirOrb"]
|
|
|
|
consumables = ["Shard", "Tent", "Cabin", "House", "Heal", "Pure", "Soft"]
|
|
|
|
weapons = ["WoodenNunchucks", "SmallKnife", "WoodenRod", "Rapier", "IronHammer", "ShortSword", "HandAxe", "Scimitar",
|
|
"IronNunchucks", "LargeKnife", "IronStaff", "Sabre", "LongSword", "GreatAxe", "Falchon", "SilverKnife",
|
|
"SilverSword", "SilverHammer", "SilverAxe", "FlameSword", "IceSword", "DragonSword", "GiantSword",
|
|
"SunSword", "CoralSword", "WereSword", "RuneSword", "PowerRod", "LightAxe", "HealRod", "MageRod", "Defense",
|
|
"WizardRod", "Vorpal", "CatClaw", "ThorHammer", "BaneSword", "Katana", "Xcalber", "Masamune"]
|
|
|
|
armor = ["Cloth", "WoodenArmor", "ChainArmor", "IronArmor", "SteelArmor", "SilverArmor", "FlameArmor", "IceArmor",
|
|
"OpalArmor", "DragonArmor", "Copper", "Silver", "Gold", "Opal", "WhiteShirt", "BlackShirt", "WoodenShield",
|
|
"IronShield", "SilverShield", "FlameShield", "IceShield", "OpalShield", "AegisShield", "Buckler", "ProCape",
|
|
"Cap", "WoodenHelm", "IronHelm", "SilverHelm", "OpalHelm", "HealHelm", "Ribbon", "Gloves", "CopperGauntlets",
|
|
"IronGauntlets", "SilverGauntlets", "ZeusGauntlets", "PowerGauntlets", "OpalGauntlets", "ProRing"]
|
|
|
|
gold_items = ["Gold10", "Gold20", "Gold25", "Gold30", "Gold55", "Gold70", "Gold85", "Gold110", "Gold135", "Gold155",
|
|
"Gold160", "Gold180", "Gold240", "Gold255", "Gold260", "Gold295", "Gold300", "Gold315", "Gold330",
|
|
"Gold350", "Gold385", "Gold400", "Gold450", "Gold500", "Gold530", "Gold575", "Gold620", "Gold680",
|
|
"Gold750", "Gold795", "Gold880", "Gold1020", "Gold1250", "Gold1455", "Gold1520", "Gold1760", "Gold1975",
|
|
"Gold2000", "Gold2750", "Gold3400", "Gold4150", "Gold5000", "Gold5450", "Gold6400", "Gold6720",
|
|
"Gold7340", "Gold7690", "Gold7900", "Gold8135", "Gold9000", "Gold9300", "Gold9500", "Gold9900",
|
|
"Gold10000", "Gold12350", "Gold13000", "Gold13450", "Gold14050", "Gold14720", "Gold15000", "Gold17490",
|
|
"Gold18010", "Gold19990", "Gold20000", "Gold20010", "Gold26000", "Gold45000", "Gold65000"]
|
|
|
|
extended_consumables = ["FullCure", "Phoenix", "Blast", "Smoke",
|
|
"Refresh", "Flare", "Black", "Guard",
|
|
"Quick", "HighPotion", "Wizard", "Cloak"]
|
|
|
|
ext_consumables_lookup = {"FullCure": "Ext1", "Phoenix": "Ext2", "Blast": "Ext3", "Smoke": "Ext4",
|
|
"Refresh": "Ext1", "Flare": "Ext2", "Black": "Ext3", "Guard": "Ext4",
|
|
"Quick": "Ext1", "HighPotion": "Ext2", "Wizard": "Ext3", "Cloak": "Ext4"}
|
|
|
|
ext_consumables_locations = {"Ext1": 0x3C, "Ext2": 0x3D, "Ext3": 0x3E, "Ext4": 0x3F}
|
|
|
|
|
|
movement_items = ["Ship", "Bridge", "Canal", "Canoe"]
|
|
|
|
no_overworld_items = ["Sigil", "Mark"]
|
|
|
|
|
|
class FF1Client(BizHawkClient):
|
|
game = "Final Fantasy"
|
|
system = "NES"
|
|
|
|
weapons_queue: deque[int]
|
|
armor_queue: deque[int]
|
|
consumable_stack_amounts: dict[str, int] | None
|
|
|
|
def __init__(self) -> None:
|
|
self.wram = "RAM"
|
|
self.sram = "WRAM"
|
|
self.rom = "PRG ROM"
|
|
self.consumable_stack_amounts = None
|
|
self.weapons_queue = deque()
|
|
self.armor_queue = deque()
|
|
self.guard_character = 0x00
|
|
|
|
async def validate_rom(self, ctx: "BizHawkClientContext") -> bool:
|
|
try:
|
|
# Check ROM name/patch version
|
|
rom_name = ((await bizhawk.read(ctx.bizhawk_ctx, [(rom_name_location, 0x0D, self.rom)]))[0])
|
|
rom_name = rom_name.decode("ascii")
|
|
if rom_name != "FINAL FANTASY":
|
|
return False # Not a Final Fantasy 1 ROM
|
|
except bizhawk.RequestFailedError:
|
|
return False # Not able to get a response, say no for now
|
|
|
|
ctx.game = self.game
|
|
ctx.items_handling = 0b111
|
|
ctx.want_slot_data = True
|
|
# Resetting these in case of switching ROMs
|
|
self.consumable_stack_amounts = None
|
|
self.weapons_queue = deque()
|
|
self.armor_queue = deque()
|
|
|
|
return True
|
|
|
|
async def game_watcher(self, ctx: "BizHawkClientContext") -> None:
|
|
if ctx.server is None:
|
|
return
|
|
|
|
if ctx.slot is None:
|
|
return
|
|
try:
|
|
self.guard_character = await self.read_sram_value(ctx, status_a_location)
|
|
# If the first character's name starts with a 0 value, we're at the title screen/character creation.
|
|
# In that case, don't allow any read/writes.
|
|
# We do this by setting the guard to 1 because that's neither a valid character nor the initial value.
|
|
if self.guard_character == 0:
|
|
self.guard_character = 0x01
|
|
|
|
if self.consumable_stack_amounts is None:
|
|
self.consumable_stack_amounts = {}
|
|
self.consumable_stack_amounts["Shard"] = 1
|
|
other_consumable_amounts = await self.read_rom(ctx, 0x47400, 10)
|
|
self.consumable_stack_amounts["Tent"] = other_consumable_amounts[0] + 1
|
|
self.consumable_stack_amounts["Cabin"] = other_consumable_amounts[1] + 1
|
|
self.consumable_stack_amounts["House"] = other_consumable_amounts[2] + 1
|
|
self.consumable_stack_amounts["Heal"] = other_consumable_amounts[3] + 1
|
|
self.consumable_stack_amounts["Pure"] = other_consumable_amounts[4] + 1
|
|
self.consumable_stack_amounts["Soft"] = other_consumable_amounts[5] + 1
|
|
self.consumable_stack_amounts["Ext1"] = other_consumable_amounts[6] + 1
|
|
self.consumable_stack_amounts["Ext2"] = other_consumable_amounts[7] + 1
|
|
self.consumable_stack_amounts["Ext3"] = other_consumable_amounts[8] + 1
|
|
self.consumable_stack_amounts["Ext4"] = other_consumable_amounts[9] + 1
|
|
|
|
await self.location_check(ctx)
|
|
await self.received_items_check(ctx)
|
|
await self.process_weapons_queue(ctx)
|
|
await self.process_armor_queue(ctx)
|
|
|
|
except bizhawk.RequestFailedError:
|
|
# The connector didn't respond. Exit handler and return to main loop to reconnect
|
|
pass
|
|
|
|
async def location_check(self, ctx: "BizHawkClientContext"):
|
|
locations_data = await self.read_sram_values_guarded(ctx, locations_array_start, locations_array_length)
|
|
if locations_data is None:
|
|
return
|
|
locations_checked = []
|
|
if len(locations_data) > 0xFE and locations_data[0xFE] & 0x02 != 0 and not ctx.finished_game:
|
|
await ctx.send_msgs([
|
|
{"cmd": "StatusUpdate",
|
|
"status": ClientStatus.CLIENT_GOAL}
|
|
])
|
|
ctx.finished_game = True
|
|
for location in ctx.missing_locations:
|
|
# index will be - 0x100 or 0x200
|
|
index = location
|
|
if location < 0x200:
|
|
# Location is a chest
|
|
index -= 0x100
|
|
flag = 0x04
|
|
else:
|
|
# Location is an NPC
|
|
index -= 0x200
|
|
flag = 0x02
|
|
if locations_data[index] & flag != 0:
|
|
locations_checked.append(location)
|
|
|
|
found_locations = await ctx.check_locations(locations_checked)
|
|
for location in found_locations:
|
|
ctx.locations_checked.add(location)
|
|
location_name = ctx.location_names.lookup_in_game(location)
|
|
logger.info(
|
|
f'New Check: {location_name} ({len(ctx.locations_checked)}/'
|
|
f'{len(ctx.missing_locations) + len(ctx.checked_locations)})')
|
|
|
|
|
|
async def received_items_check(self, ctx: "BizHawkClientContext") -> None:
|
|
assert self.consumable_stack_amounts, "shouldn't call this function without reading consumable_stack_amounts"
|
|
write_list: list[tuple[int, list[int], str]] = []
|
|
items_received_count = await self.read_sram_value_guarded(ctx, items_obtained)
|
|
if items_received_count is None:
|
|
return
|
|
if items_received_count < len(ctx.items_received):
|
|
current_item = ctx.items_received[items_received_count]
|
|
current_item_id = current_item.item
|
|
current_item_name = ctx.item_names.lookup_in_game(current_item_id, ctx.game)
|
|
if current_item_name in key_items:
|
|
location = current_item_id - 0xE0
|
|
write_list.append((location, [1], self.sram))
|
|
elif current_item_name in movement_items:
|
|
location = current_item_id - 0x1E0
|
|
if current_item_name != "Canal":
|
|
write_list.append((location, [1], self.sram))
|
|
else:
|
|
write_list.append((location, [0], self.sram))
|
|
elif current_item_name in no_overworld_items:
|
|
if current_item_name == "Sigil":
|
|
location = 0x28
|
|
else:
|
|
location = 0x12
|
|
write_list.append((location, [1], self.sram))
|
|
elif current_item_name in gold_items:
|
|
gold_amount = int(current_item_name[4:])
|
|
current_gold_value = await self.read_sram_values_guarded(ctx, gp_location_low, 3)
|
|
if current_gold_value is None:
|
|
return
|
|
current_gold = int.from_bytes(current_gold_value, "little")
|
|
new_gold = min(gold_amount + current_gold, 999999)
|
|
lower_byte = new_gold % (2 ** 8)
|
|
middle_byte = (new_gold // (2 ** 8)) % (2 ** 8)
|
|
upper_byte = new_gold // (2 ** 16)
|
|
write_list.append((gp_location_low, [lower_byte], self.sram))
|
|
write_list.append((gp_location_middle, [middle_byte], self.sram))
|
|
write_list.append((gp_location_high, [upper_byte], self.sram))
|
|
elif current_item_name in consumables:
|
|
location = current_item_id - 0xE0
|
|
current_value = await self.read_sram_value_guarded(ctx, location)
|
|
if current_value is None:
|
|
return
|
|
amount_to_add = self.consumable_stack_amounts[current_item_name]
|
|
new_value = min(current_value + amount_to_add, 99)
|
|
write_list.append((location, [new_value], self.sram))
|
|
elif current_item_name in extended_consumables:
|
|
ext_name = ext_consumables_lookup[current_item_name]
|
|
location = ext_consumables_locations[ext_name]
|
|
current_value = await self.read_sram_value_guarded(ctx, location)
|
|
if current_value is None:
|
|
return
|
|
amount_to_add = self.consumable_stack_amounts[ext_name]
|
|
new_value = min(current_value + amount_to_add, 99)
|
|
write_list.append((location, [new_value], self.sram))
|
|
elif current_item_name in weapons:
|
|
self.weapons_queue.appendleft(current_item_id - 0x11B)
|
|
elif current_item_name in armor:
|
|
self.armor_queue.appendleft(current_item_id - 0x143)
|
|
write_list.append((items_obtained, [items_received_count + 1], self.sram))
|
|
write_successful = await self.write_sram_values_guarded(ctx, write_list)
|
|
if write_successful:
|
|
await bizhawk.display_message(ctx.bizhawk_ctx, f"Received {current_item_name}")
|
|
|
|
async def process_weapons_queue(self, ctx: "BizHawkClientContext"):
|
|
empty_slots = deque()
|
|
char1_slots = await self.read_sram_values_guarded(ctx, weapons_arrays_starts[0], 4)
|
|
char2_slots = await self.read_sram_values_guarded(ctx, weapons_arrays_starts[1], 4)
|
|
char3_slots = await self.read_sram_values_guarded(ctx, weapons_arrays_starts[2], 4)
|
|
char4_slots = await self.read_sram_values_guarded(ctx, weapons_arrays_starts[3], 4)
|
|
if char1_slots is None or char2_slots is None or char3_slots is None or char4_slots is None:
|
|
return
|
|
for i, slot in enumerate(char1_slots):
|
|
if slot == 0:
|
|
empty_slots.appendleft(weapons_arrays_starts[0] + i)
|
|
for i, slot in enumerate(char2_slots):
|
|
if slot == 0:
|
|
empty_slots.appendleft(weapons_arrays_starts[1] + i)
|
|
for i, slot in enumerate(char3_slots):
|
|
if slot == 0:
|
|
empty_slots.appendleft(weapons_arrays_starts[2] + i)
|
|
for i, slot in enumerate(char4_slots):
|
|
if slot == 0:
|
|
empty_slots.appendleft(weapons_arrays_starts[3] + i)
|
|
while len(empty_slots) > 0 and len(self.weapons_queue) > 0:
|
|
current_slot = empty_slots.pop()
|
|
current_weapon = self.weapons_queue.pop()
|
|
await self.write_sram_guarded(ctx, current_slot, current_weapon)
|
|
|
|
async def process_armor_queue(self, ctx: "BizHawkClientContext"):
|
|
empty_slots = deque()
|
|
char1_slots = await self.read_sram_values_guarded(ctx, armors_arrays_starts[0], 4)
|
|
char2_slots = await self.read_sram_values_guarded(ctx, armors_arrays_starts[1], 4)
|
|
char3_slots = await self.read_sram_values_guarded(ctx, armors_arrays_starts[2], 4)
|
|
char4_slots = await self.read_sram_values_guarded(ctx, armors_arrays_starts[3], 4)
|
|
if char1_slots is None or char2_slots is None or char3_slots is None or char4_slots is None:
|
|
return
|
|
for i, slot in enumerate(char1_slots):
|
|
if slot == 0:
|
|
empty_slots.appendleft(armors_arrays_starts[0] + i)
|
|
for i, slot in enumerate(char2_slots):
|
|
if slot == 0:
|
|
empty_slots.appendleft(armors_arrays_starts[1] + i)
|
|
for i, slot in enumerate(char3_slots):
|
|
if slot == 0:
|
|
empty_slots.appendleft(armors_arrays_starts[2] + i)
|
|
for i, slot in enumerate(char4_slots):
|
|
if slot == 0:
|
|
empty_slots.appendleft(armors_arrays_starts[3] + i)
|
|
while len(empty_slots) > 0 and len(self.armor_queue) > 0:
|
|
current_slot = empty_slots.pop()
|
|
current_armor = self.armor_queue.pop()
|
|
await self.write_sram_guarded(ctx, current_slot, current_armor)
|
|
|
|
|
|
async def read_sram_value(self, ctx: "BizHawkClientContext", location: int):
|
|
value = ((await bizhawk.read(ctx.bizhawk_ctx, [(location, 1, self.sram)]))[0])
|
|
return int.from_bytes(value, "little")
|
|
|
|
async def read_sram_values_guarded(self, ctx: "BizHawkClientContext", location: int, size: int):
|
|
value = await bizhawk.guarded_read(ctx.bizhawk_ctx,
|
|
[(location, size, self.sram)],
|
|
[(status_a_location, [self.guard_character], self.sram)])
|
|
if value is None:
|
|
return None
|
|
return value[0]
|
|
|
|
async def read_sram_value_guarded(self, ctx: "BizHawkClientContext", location: int):
|
|
value = await bizhawk.guarded_read(ctx.bizhawk_ctx,
|
|
[(location, 1, self.sram)],
|
|
[(status_a_location, [self.guard_character], self.sram)])
|
|
if value is None:
|
|
return None
|
|
return int.from_bytes(value[0], "little")
|
|
|
|
async def read_rom(self, ctx: "BizHawkClientContext", location: int, size: int):
|
|
return (await bizhawk.read(ctx.bizhawk_ctx, [(location, size, self.rom)]))[0]
|
|
|
|
async def write_sram_guarded(self, ctx: "BizHawkClientContext", location: int, value: int):
|
|
return await bizhawk.guarded_write(ctx.bizhawk_ctx,
|
|
[(location, [value], self.sram)],
|
|
[(status_a_location, [self.guard_character], self.sram)])
|
|
|
|
async def write_sram_values_guarded(self, ctx: "BizHawkClientContext", write_list):
|
|
return await bizhawk.guarded_write(ctx.bizhawk_ctx,
|
|
write_list,
|
|
[(status_a_location, [self.guard_character], self.sram)])
|