Files
Grinch-AP/worlds/ff1/Client.py

333 lines
17 KiB
Python

import logging
from collections import deque
from typing import TYPE_CHECKING
from NetUtils import ClientStatus
import worlds._bizhawk as bizhawk
from worlds._bizhawk.client import BizHawkClient
if TYPE_CHECKING:
from worlds._bizhawk.context import BizHawkClientContext
base_id = 7000
logger = logging.getLogger("Client")
rom_name_location = 0x07FFE3
locations_array_start = 0x200
locations_array_length = 0x100
items_obtained = 0x03
gp_location_low = 0x1C
gp_location_middle = 0x1D
gp_location_high = 0x1E
weapons_arrays_starts = [0x118, 0x158, 0x198, 0x1D8]
armors_arrays_starts = [0x11C, 0x15C, 0x19C, 0x1DC]
status_a_location = 0x102
status_b_location = 0x0FC
status_c_location = 0x0A3
key_items = ["Lute", "Crown", "Crystal", "Herb", "Key", "Tnt", "Adamant", "Slab", "Ruby", "Rod",
"Floater", "Chime", "Tail", "Cube", "Bottle", "Oxyale", "EarthOrb", "FireOrb", "WaterOrb", "AirOrb"]
consumables = ["Shard", "Tent", "Cabin", "House", "Heal", "Pure", "Soft"]
weapons = ["WoodenNunchucks", "SmallKnife", "WoodenRod", "Rapier", "IronHammer", "ShortSword", "HandAxe", "Scimitar",
"IronNunchucks", "LargeKnife", "IronStaff", "Sabre", "LongSword", "GreatAxe", "Falchon", "SilverKnife",
"SilverSword", "SilverHammer", "SilverAxe", "FlameSword", "IceSword", "DragonSword", "GiantSword",
"SunSword", "CoralSword", "WereSword", "RuneSword", "PowerRod", "LightAxe", "HealRod", "MageRod", "Defense",
"WizardRod", "Vorpal", "CatClaw", "ThorHammer", "BaneSword", "Katana", "Xcalber", "Masamune"]
armor = ["Cloth", "WoodenArmor", "ChainArmor", "IronArmor", "SteelArmor", "SilverArmor", "FlameArmor", "IceArmor",
"OpalArmor", "DragonArmor", "Copper", "Silver", "Gold", "Opal", "WhiteShirt", "BlackShirt", "WoodenShield",
"IronShield", "SilverShield", "FlameShield", "IceShield", "OpalShield", "AegisShield", "Buckler", "ProCape",
"Cap", "WoodenHelm", "IronHelm", "SilverHelm", "OpalHelm", "HealHelm", "Ribbon", "Gloves", "CopperGauntlets",
"IronGauntlets", "SilverGauntlets", "ZeusGauntlets", "PowerGauntlets", "OpalGauntlets", "ProRing"]
gold_items = ["Gold10", "Gold20", "Gold25", "Gold30", "Gold55", "Gold70", "Gold85", "Gold110", "Gold135", "Gold155",
"Gold160", "Gold180", "Gold240", "Gold255", "Gold260", "Gold295", "Gold300", "Gold315", "Gold330",
"Gold350", "Gold385", "Gold400", "Gold450", "Gold500", "Gold530", "Gold575", "Gold620", "Gold680",
"Gold750", "Gold795", "Gold880", "Gold1020", "Gold1250", "Gold1455", "Gold1520", "Gold1760", "Gold1975",
"Gold2000", "Gold2750", "Gold3400", "Gold4150", "Gold5000", "Gold5450", "Gold6400", "Gold6720",
"Gold7340", "Gold7690", "Gold7900", "Gold8135", "Gold9000", "Gold9300", "Gold9500", "Gold9900",
"Gold10000", "Gold12350", "Gold13000", "Gold13450", "Gold14050", "Gold14720", "Gold15000", "Gold17490",
"Gold18010", "Gold19990", "Gold20000", "Gold20010", "Gold26000", "Gold45000", "Gold65000"]
extended_consumables = ["FullCure", "Phoenix", "Blast", "Smoke",
"Refresh", "Flare", "Black", "Guard",
"Quick", "HighPotion", "Wizard", "Cloak"]
ext_consumables_lookup = {"FullCure": "Ext1", "Phoenix": "Ext2", "Blast": "Ext3", "Smoke": "Ext4",
"Refresh": "Ext1", "Flare": "Ext2", "Black": "Ext3", "Guard": "Ext4",
"Quick": "Ext1", "HighPotion": "Ext2", "Wizard": "Ext3", "Cloak": "Ext4"}
ext_consumables_locations = {"Ext1": 0x3C, "Ext2": 0x3D, "Ext3": 0x3E, "Ext4": 0x3F}
movement_items = ["Ship", "Bridge", "Canal", "Canoe"]
no_overworld_items = ["Sigil", "Mark"]
class FF1Client(BizHawkClient):
game = "Final Fantasy"
system = "NES"
weapons_queue: deque[int]
armor_queue: deque[int]
consumable_stack_amounts: dict[str, int] | None
def __init__(self) -> None:
self.wram = "RAM"
self.sram = "WRAM"
self.rom = "PRG ROM"
self.consumable_stack_amounts = None
self.weapons_queue = deque()
self.armor_queue = deque()
self.guard_character = 0x00
async def validate_rom(self, ctx: "BizHawkClientContext") -> bool:
try:
if (await bizhawk.get_memory_size(ctx.bizhawk_ctx, self.rom)) < rom_name_location + 0x0D:
return False # ROM is not large enough to be a Final Fantasy 1 ROM
# Check ROM name/patch version
rom_name = ((await bizhawk.read(ctx.bizhawk_ctx, [(rom_name_location, 0x0D, self.rom)]))[0])
rom_name = rom_name.decode("ascii")
if rom_name != "FINAL FANTASY":
return False # Not a Final Fantasy 1 ROM
except UnicodeDecodeError:
return False # rom_name returned invalid text
except bizhawk.RequestFailedError:
return False # Not able to get a response, say no for now
ctx.game = self.game
ctx.items_handling = 0b111
ctx.want_slot_data = True
# Resetting these in case of switching ROMs
self.consumable_stack_amounts = None
self.weapons_queue = deque()
self.armor_queue = deque()
return True
async def game_watcher(self, ctx: "BizHawkClientContext") -> None:
if ctx.server is None:
return
if ctx.slot is None:
return
try:
self.guard_character = await self.read_sram_value(ctx, status_a_location)
# If the first character's name starts with a 0 value, we're at the title screen/character creation.
# In that case, don't allow any read/writes.
# We do this by setting the guard to 1 because that's neither a valid character nor the initial value.
if self.guard_character == 0:
self.guard_character = 0x01
if self.consumable_stack_amounts is None:
self.consumable_stack_amounts = {}
self.consumable_stack_amounts["Shard"] = 1
other_consumable_amounts = await self.read_rom(ctx, 0x47400, 10)
self.consumable_stack_amounts["Tent"] = other_consumable_amounts[0] + 1
self.consumable_stack_amounts["Cabin"] = other_consumable_amounts[1] + 1
self.consumable_stack_amounts["House"] = other_consumable_amounts[2] + 1
self.consumable_stack_amounts["Heal"] = other_consumable_amounts[3] + 1
self.consumable_stack_amounts["Pure"] = other_consumable_amounts[4] + 1
self.consumable_stack_amounts["Soft"] = other_consumable_amounts[5] + 1
self.consumable_stack_amounts["Ext1"] = other_consumable_amounts[6] + 1
self.consumable_stack_amounts["Ext2"] = other_consumable_amounts[7] + 1
self.consumable_stack_amounts["Ext3"] = other_consumable_amounts[8] + 1
self.consumable_stack_amounts["Ext4"] = other_consumable_amounts[9] + 1
await self.location_check(ctx)
await self.received_items_check(ctx)
await self.process_weapons_queue(ctx)
await self.process_armor_queue(ctx)
except bizhawk.RequestFailedError:
# The connector didn't respond. Exit handler and return to main loop to reconnect
pass
async def location_check(self, ctx: "BizHawkClientContext"):
locations_data = await self.read_sram_values_guarded(ctx, locations_array_start, locations_array_length)
if locations_data is None:
return
locations_checked = []
if len(locations_data) > 0xFE and locations_data[0xFE] & 0x02 != 0 and not ctx.finished_game:
await ctx.send_msgs([
{"cmd": "StatusUpdate",
"status": ClientStatus.CLIENT_GOAL}
])
ctx.finished_game = True
for location in ctx.missing_locations:
# index will be - 0x100 or 0x200
index = location
if location < 0x200:
# Location is a chest
index -= 0x100
flag = 0x04
else:
# Location is an NPC
index -= 0x200
flag = 0x02
if locations_data[index] & flag != 0:
locations_checked.append(location)
found_locations = await ctx.check_locations(locations_checked)
for location in found_locations:
ctx.locations_checked.add(location)
location_name = ctx.location_names.lookup_in_game(location)
logger.info(
f'New Check: {location_name} ({len(ctx.locations_checked)}/'
f'{len(ctx.missing_locations) + len(ctx.checked_locations)})')
async def received_items_check(self, ctx: "BizHawkClientContext") -> None:
assert self.consumable_stack_amounts, "shouldn't call this function without reading consumable_stack_amounts"
write_list: list[tuple[int, list[int], str]] = []
items_received_count = await self.read_sram_value_guarded(ctx, items_obtained)
if items_received_count is None:
return
if items_received_count < len(ctx.items_received):
current_item = ctx.items_received[items_received_count]
current_item_id = current_item.item
current_item_name = ctx.item_names.lookup_in_game(current_item_id, ctx.game)
if current_item_name in key_items:
location = current_item_id - 0xE0
write_list.append((location, [1], self.sram))
elif current_item_name in movement_items:
location = current_item_id - 0x1E0
if current_item_name != "Canal":
write_list.append((location, [1], self.sram))
else:
write_list.append((location, [0], self.sram))
elif current_item_name in no_overworld_items:
if current_item_name == "Sigil":
location = 0x28
else:
location = 0x12
write_list.append((location, [1], self.sram))
elif current_item_name in gold_items:
gold_amount = int(current_item_name[4:])
current_gold_value = await self.read_sram_values_guarded(ctx, gp_location_low, 3)
if current_gold_value is None:
return
current_gold = int.from_bytes(current_gold_value, "little")
new_gold = min(gold_amount + current_gold, 999999)
lower_byte = new_gold % (2 ** 8)
middle_byte = (new_gold // (2 ** 8)) % (2 ** 8)
upper_byte = new_gold // (2 ** 16)
write_list.append((gp_location_low, [lower_byte], self.sram))
write_list.append((gp_location_middle, [middle_byte], self.sram))
write_list.append((gp_location_high, [upper_byte], self.sram))
elif current_item_name in consumables:
location = current_item_id - 0xE0
current_value = await self.read_sram_value_guarded(ctx, location)
if current_value is None:
return
amount_to_add = self.consumable_stack_amounts[current_item_name]
new_value = min(current_value + amount_to_add, 99)
write_list.append((location, [new_value], self.sram))
elif current_item_name in extended_consumables:
ext_name = ext_consumables_lookup[current_item_name]
location = ext_consumables_locations[ext_name]
current_value = await self.read_sram_value_guarded(ctx, location)
if current_value is None:
return
amount_to_add = self.consumable_stack_amounts[ext_name]
new_value = min(current_value + amount_to_add, 99)
write_list.append((location, [new_value], self.sram))
elif current_item_name in weapons:
self.weapons_queue.appendleft(current_item_id - 0x11B)
elif current_item_name in armor:
self.armor_queue.appendleft(current_item_id - 0x143)
write_list.append((items_obtained, [items_received_count + 1], self.sram))
write_successful = await self.write_sram_values_guarded(ctx, write_list)
if write_successful:
await bizhawk.display_message(ctx.bizhawk_ctx, f"Received {current_item_name}")
async def process_weapons_queue(self, ctx: "BizHawkClientContext"):
empty_slots = deque()
char1_slots = await self.read_sram_values_guarded(ctx, weapons_arrays_starts[0], 4)
char2_slots = await self.read_sram_values_guarded(ctx, weapons_arrays_starts[1], 4)
char3_slots = await self.read_sram_values_guarded(ctx, weapons_arrays_starts[2], 4)
char4_slots = await self.read_sram_values_guarded(ctx, weapons_arrays_starts[3], 4)
if char1_slots is None or char2_slots is None or char3_slots is None or char4_slots is None:
return
for i, slot in enumerate(char1_slots):
if slot == 0:
empty_slots.appendleft(weapons_arrays_starts[0] + i)
for i, slot in enumerate(char2_slots):
if slot == 0:
empty_slots.appendleft(weapons_arrays_starts[1] + i)
for i, slot in enumerate(char3_slots):
if slot == 0:
empty_slots.appendleft(weapons_arrays_starts[2] + i)
for i, slot in enumerate(char4_slots):
if slot == 0:
empty_slots.appendleft(weapons_arrays_starts[3] + i)
while len(empty_slots) > 0 and len(self.weapons_queue) > 0:
current_slot = empty_slots.pop()
current_weapon = self.weapons_queue.pop()
await self.write_sram_guarded(ctx, current_slot, current_weapon)
async def process_armor_queue(self, ctx: "BizHawkClientContext"):
empty_slots = deque()
char1_slots = await self.read_sram_values_guarded(ctx, armors_arrays_starts[0], 4)
char2_slots = await self.read_sram_values_guarded(ctx, armors_arrays_starts[1], 4)
char3_slots = await self.read_sram_values_guarded(ctx, armors_arrays_starts[2], 4)
char4_slots = await self.read_sram_values_guarded(ctx, armors_arrays_starts[3], 4)
if char1_slots is None or char2_slots is None or char3_slots is None or char4_slots is None:
return
for i, slot in enumerate(char1_slots):
if slot == 0:
empty_slots.appendleft(armors_arrays_starts[0] + i)
for i, slot in enumerate(char2_slots):
if slot == 0:
empty_slots.appendleft(armors_arrays_starts[1] + i)
for i, slot in enumerate(char3_slots):
if slot == 0:
empty_slots.appendleft(armors_arrays_starts[2] + i)
for i, slot in enumerate(char4_slots):
if slot == 0:
empty_slots.appendleft(armors_arrays_starts[3] + i)
while len(empty_slots) > 0 and len(self.armor_queue) > 0:
current_slot = empty_slots.pop()
current_armor = self.armor_queue.pop()
await self.write_sram_guarded(ctx, current_slot, current_armor)
async def read_sram_value(self, ctx: "BizHawkClientContext", location: int):
value = ((await bizhawk.read(ctx.bizhawk_ctx, [(location, 1, self.sram)]))[0])
return int.from_bytes(value, "little")
async def read_sram_values_guarded(self, ctx: "BizHawkClientContext", location: int, size: int):
value = await bizhawk.guarded_read(ctx.bizhawk_ctx,
[(location, size, self.sram)],
[(status_a_location, [self.guard_character], self.sram)])
if value is None:
return None
return value[0]
async def read_sram_value_guarded(self, ctx: "BizHawkClientContext", location: int):
value = await bizhawk.guarded_read(ctx.bizhawk_ctx,
[(location, 1, self.sram)],
[(status_a_location, [self.guard_character], self.sram)])
if value is None:
return None
return int.from_bytes(value[0], "little")
async def read_rom(self, ctx: "BizHawkClientContext", location: int, size: int):
return (await bizhawk.read(ctx.bizhawk_ctx, [(location, size, self.rom)]))[0]
async def write_sram_guarded(self, ctx: "BizHawkClientContext", location: int, value: int):
return await bizhawk.guarded_write(ctx.bizhawk_ctx,
[(location, [value], self.sram)],
[(status_a_location, [self.guard_character], self.sram)])
async def write_sram_values_guarded(self, ctx: "BizHawkClientContext", write_list):
return await bizhawk.guarded_write(ctx.bizhawk_ctx,
write_list,
[(status_a_location, [self.guard_character], self.sram)])