mirror of
https://github.com/MarioSpore/Grinch-AP.git
synced 2025-10-21 12:11:33 -06:00
TLoZ: Implementing The Legend of Zelda (#1354)
Co-authored-by: t3hf1gm3nt <59876300+t3hf1gm3nt@users.noreply.github.com> Co-authored-by: Alchav <59858495+Alchav@users.noreply.github.com>
This commit is contained in:
393
Zelda1Client.py
Normal file
393
Zelda1Client.py
Normal file
@@ -0,0 +1,393 @@
|
||||
# Based (read: copied almost wholesale and edited) off the FF1 Client.
|
||||
|
||||
import asyncio
|
||||
import copy
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
import typing
|
||||
from asyncio import StreamReader, StreamWriter
|
||||
from typing import List
|
||||
|
||||
import Utils
|
||||
from Utils import async_start
|
||||
from worlds import lookup_any_location_id_to_name
|
||||
from CommonClient import CommonContext, server_loop, gui_enabled, console_loop, ClientCommandProcessor, logger, \
|
||||
get_base_parser
|
||||
|
||||
from worlds.tloz.Items import item_game_ids
|
||||
from worlds.tloz.Locations import location_ids
|
||||
from worlds.tloz import Items, Locations, Rom
|
||||
|
||||
SYSTEM_MESSAGE_ID = 0
|
||||
|
||||
CONNECTION_TIMING_OUT_STATUS = "Connection timing out. Please restart your emulator, then restart Zelda_connector.lua"
|
||||
CONNECTION_REFUSED_STATUS = "Connection Refused. Please start your emulator and make sure Zelda_connector.lua is running"
|
||||
CONNECTION_RESET_STATUS = "Connection was reset. Please restart your emulator, then restart Zelda_connector.lua"
|
||||
CONNECTION_TENTATIVE_STATUS = "Initial Connection Made"
|
||||
CONNECTION_CONNECTED_STATUS = "Connected"
|
||||
CONNECTION_INITIAL_STATUS = "Connection has not been initiated"
|
||||
|
||||
DISPLAY_MSGS = True
|
||||
|
||||
item_ids = item_game_ids
|
||||
location_ids = location_ids
|
||||
items_by_id = {id: item for item, id in item_ids.items()}
|
||||
locations_by_id = {id: location for location, id in location_ids.items()}
|
||||
|
||||
|
||||
class ZeldaCommandProcessor(ClientCommandProcessor):
|
||||
|
||||
def _cmd_nes(self):
|
||||
"""Check NES Connection State"""
|
||||
if isinstance(self.ctx, ZeldaContext):
|
||||
logger.info(f"NES Status: {self.ctx.nes_status}")
|
||||
|
||||
def _cmd_toggle_msgs(self):
|
||||
"""Toggle displaying messages in bizhawk"""
|
||||
global DISPLAY_MSGS
|
||||
DISPLAY_MSGS = not DISPLAY_MSGS
|
||||
logger.info(f"Messages are now {'enabled' if DISPLAY_MSGS else 'disabled'}")
|
||||
|
||||
|
||||
class ZeldaContext(CommonContext):
|
||||
command_processor = ZeldaCommandProcessor
|
||||
items_handling = 0b101 # get sent remote and starting items
|
||||
# Infinite Hyrule compatibility
|
||||
overworld_item = 0x5F
|
||||
armos_item = 0x24
|
||||
|
||||
def __init__(self, server_address, password):
|
||||
super().__init__(server_address, password)
|
||||
self.bonus_items = []
|
||||
self.nes_streams: (StreamReader, StreamWriter) = None
|
||||
self.nes_sync_task = None
|
||||
self.messages = {}
|
||||
self.locations_array = None
|
||||
self.nes_status = CONNECTION_INITIAL_STATUS
|
||||
self.game = 'The Legend of Zelda'
|
||||
self.awaiting_rom = False
|
||||
self.shop_slots_left = 0
|
||||
self.shop_slots_middle = 0
|
||||
self.shop_slots_right = 0
|
||||
self.shop_slots = [self.shop_slots_left, self.shop_slots_middle, self.shop_slots_right]
|
||||
self.slot_data = dict()
|
||||
|
||||
async def server_auth(self, password_requested: bool = False):
|
||||
if password_requested and not self.password:
|
||||
await super(ZeldaContext, self).server_auth(password_requested)
|
||||
if not self.auth:
|
||||
self.awaiting_rom = True
|
||||
logger.info('Awaiting connection to NES to get Player information')
|
||||
return
|
||||
|
||||
await self.send_connect()
|
||||
|
||||
def _set_message(self, msg: str, msg_id: int):
|
||||
if DISPLAY_MSGS:
|
||||
self.messages[(time.time(), msg_id)] = msg
|
||||
|
||||
def on_package(self, cmd: str, args: dict):
|
||||
if cmd == 'Connected':
|
||||
self.slot_data = args.get("slot_data", {})
|
||||
asyncio.create_task(parse_locations(self.locations_array, self, True))
|
||||
elif cmd == 'Print':
|
||||
msg = args['text']
|
||||
if ': !' not in msg:
|
||||
self._set_message(msg, SYSTEM_MESSAGE_ID)
|
||||
|
||||
def on_print_json(self, args: dict):
|
||||
if self.ui:
|
||||
self.ui.print_json(copy.deepcopy(args["data"]))
|
||||
else:
|
||||
text = self.jsontotextparser(copy.deepcopy(args["data"]))
|
||||
logger.info(text)
|
||||
relevant = args.get("type", None) in {"Hint", "ItemSend"}
|
||||
if relevant:
|
||||
item = args["item"]
|
||||
# goes to this world
|
||||
if self.slot_concerns_self(args["receiving"]):
|
||||
relevant = True
|
||||
# found in this world
|
||||
elif self.slot_concerns_self(item.player):
|
||||
relevant = True
|
||||
# not related
|
||||
else:
|
||||
relevant = False
|
||||
if relevant:
|
||||
item = args["item"]
|
||||
msg = self.raw_text_parser(copy.deepcopy(args["data"]))
|
||||
self._set_message(msg, item.item)
|
||||
|
||||
def run_gui(self):
|
||||
from kvui import GameManager
|
||||
|
||||
class ZeldaManager(GameManager):
|
||||
logging_pairs = [
|
||||
("Client", "Archipelago")
|
||||
]
|
||||
base_title = "Archipelago Zelda 1 Client"
|
||||
|
||||
self.ui = ZeldaManager(self)
|
||||
self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI")
|
||||
|
||||
|
||||
def get_payload(ctx: ZeldaContext):
|
||||
current_time = time.time()
|
||||
bonus_items = [item for item in ctx.bonus_items]
|
||||
return json.dumps(
|
||||
{
|
||||
"items": [item.item for item in ctx.items_received],
|
||||
"messages": {f'{key[0]}:{key[1]}': value for key, value in ctx.messages.items()
|
||||
if key[0] > current_time - 10},
|
||||
"shops": {
|
||||
"left": ctx.shop_slots_left,
|
||||
"middle": ctx.shop_slots_middle,
|
||||
"right": ctx.shop_slots_right
|
||||
},
|
||||
"bonusItems": bonus_items
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def reconcile_shops(ctx: ZeldaContext):
|
||||
checked_location_names = [lookup_any_location_id_to_name[location] for location in ctx.checked_locations]
|
||||
shops = [location for location in checked_location_names if "Shop" in location]
|
||||
left_slots = [shop for shop in shops if "Left" in shop]
|
||||
middle_slots = [shop for shop in shops if "Middle" in shop]
|
||||
right_slots = [shop for shop in shops if "Right" in shop]
|
||||
for shop in left_slots:
|
||||
ctx.shop_slots_left |= get_shop_bit_from_name(shop)
|
||||
for shop in middle_slots:
|
||||
ctx.shop_slots_middle |= get_shop_bit_from_name(shop)
|
||||
for shop in right_slots:
|
||||
ctx.shop_slots_right |= get_shop_bit_from_name(shop)
|
||||
|
||||
|
||||
def get_shop_bit_from_name(location_name):
|
||||
if "Potion" in location_name:
|
||||
return Rom.potion_shop
|
||||
elif "Arrow" in location_name:
|
||||
return Rom.arrow_shop
|
||||
elif "Shield" in location_name:
|
||||
return Rom.shield_shop
|
||||
elif "Ring" in location_name:
|
||||
return Rom.ring_shop
|
||||
elif "Candle" in location_name:
|
||||
return Rom.candle_shop
|
||||
elif "Take" in location_name:
|
||||
return Rom.take_any
|
||||
return 0 # this should never be hit
|
||||
|
||||
|
||||
async def parse_locations(locations_array, ctx: ZeldaContext, force: bool, zone="None"):
|
||||
if locations_array == ctx.locations_array and not force:
|
||||
return
|
||||
else:
|
||||
# print("New values")
|
||||
ctx.locations_array = locations_array
|
||||
locations_checked = []
|
||||
location = None
|
||||
for location in ctx.missing_locations:
|
||||
location_name = lookup_any_location_id_to_name[location]
|
||||
|
||||
if location_name in Locations.overworld_locations and zone == "overworld":
|
||||
status = locations_array[Locations.major_location_offsets[location_name]]
|
||||
if location_name == "Ocean Heart Container":
|
||||
status = locations_array[ctx.overworld_item]
|
||||
if location_name == "Armos Knights":
|
||||
status = locations_array[ctx.armos_item]
|
||||
if status & 0x10:
|
||||
ctx.locations_checked.add(location)
|
||||
locations_checked.append(location)
|
||||
elif location_name in Locations.underworld1_locations and zone == "underworld1":
|
||||
status = locations_array[Locations.floor_location_game_offsets_early[location_name]]
|
||||
if status & 0x10:
|
||||
ctx.locations_checked.add(location)
|
||||
locations_checked.append(location)
|
||||
elif location_name in Locations.underworld2_locations and zone == "underworld2":
|
||||
status = locations_array[Locations.floor_location_game_offsets_late[location_name]]
|
||||
if status & 0x10:
|
||||
ctx.locations_checked.add(location)
|
||||
locations_checked.append(location)
|
||||
elif (location_name in Locations.shop_locations or "Take" in location_name) and zone == "caves":
|
||||
shop_bit = get_shop_bit_from_name(location_name)
|
||||
slot = 0
|
||||
context_slot = 0
|
||||
if "Left" in location_name:
|
||||
slot = "slot1"
|
||||
context_slot = 0
|
||||
elif "Middle" in location_name:
|
||||
slot = "slot2"
|
||||
context_slot = 1
|
||||
elif "Right" in location_name:
|
||||
slot = "slot3"
|
||||
context_slot = 2
|
||||
if locations_array[slot] & shop_bit > 0:
|
||||
locations_checked.append(location)
|
||||
ctx.shop_slots[context_slot] |= shop_bit
|
||||
if locations_array["takeAnys"] and locations_array["takeAnys"] >= 4:
|
||||
if "Take Any" in location_name:
|
||||
short_name = None
|
||||
if "Left" in location_name:
|
||||
short_name = "TakeAnyLeft"
|
||||
elif "Middle" in location_name:
|
||||
short_name = "TakeAnyMiddle"
|
||||
elif "Right" in location_name:
|
||||
short_name = "TakeAnyRight"
|
||||
if short_name is not None:
|
||||
item_code = ctx.slot_data[short_name]
|
||||
if item_code > 0:
|
||||
ctx.bonus_items.append(item_code)
|
||||
locations_checked.append(location)
|
||||
if locations_checked:
|
||||
await ctx.send_msgs([
|
||||
{"cmd": "LocationChecks",
|
||||
"locations": locations_checked}
|
||||
])
|
||||
|
||||
|
||||
async def nes_sync_task(ctx: ZeldaContext):
|
||||
logger.info("Starting nes connector. Use /nes for status information")
|
||||
while not ctx.exit_event.is_set():
|
||||
error_status = None
|
||||
if ctx.nes_streams:
|
||||
(reader, writer) = ctx.nes_streams
|
||||
msg = get_payload(ctx).encode()
|
||||
writer.write(msg)
|
||||
writer.write(b'\n')
|
||||
try:
|
||||
await asyncio.wait_for(writer.drain(), timeout=1.5)
|
||||
try:
|
||||
# Data will return a dict with up to two fields:
|
||||
# 1. A keepalive response of the Players Name (always)
|
||||
# 2. An array representing the memory values of the locations area (if in game)
|
||||
data = await asyncio.wait_for(reader.readline(), timeout=5)
|
||||
data_decoded = json.loads(data.decode())
|
||||
if data_decoded["overworldHC"] is not None:
|
||||
ctx.overworld_item = data_decoded["overworldHC"]
|
||||
if data_decoded["overworldPB"] is not None:
|
||||
ctx.armos_item = data_decoded["overworldPB"]
|
||||
if data_decoded['gameMode'] == 19 and ctx.finished_game == False:
|
||||
await ctx.send_msgs([
|
||||
{"cmd": "StatusUpdate",
|
||||
"status": 30}
|
||||
])
|
||||
ctx.finished_game = True
|
||||
if ctx.game is not None and 'overworld' in data_decoded:
|
||||
# Not just a keep alive ping, parse
|
||||
asyncio.create_task(parse_locations(data_decoded['overworld'], ctx, False, "overworld"))
|
||||
if ctx.game is not None and 'underworld1' in data_decoded:
|
||||
asyncio.create_task(parse_locations(data_decoded['underworld1'], ctx, False, "underworld1"))
|
||||
if ctx.game is not None and 'underworld2' in data_decoded:
|
||||
asyncio.create_task(parse_locations(data_decoded['underworld2'], ctx, False, "underworld2"))
|
||||
if ctx.game is not None and 'caves' in data_decoded:
|
||||
asyncio.create_task(parse_locations(data_decoded['caves'], ctx, False, "caves"))
|
||||
if not ctx.auth:
|
||||
ctx.auth = ''.join([chr(i) for i in data_decoded['playerName'] if i != 0])
|
||||
if ctx.auth == '':
|
||||
logger.info("Invalid ROM detected. No player name built into the ROM. Please regenerate"
|
||||
"the ROM using the same link but adding your slot name")
|
||||
if ctx.awaiting_rom:
|
||||
await ctx.server_auth(False)
|
||||
reconcile_shops(ctx)
|
||||
except asyncio.TimeoutError:
|
||||
logger.debug("Read Timed Out, Reconnecting")
|
||||
error_status = CONNECTION_TIMING_OUT_STATUS
|
||||
writer.close()
|
||||
ctx.nes_streams = None
|
||||
except ConnectionResetError as e:
|
||||
logger.debug("Read failed due to Connection Lost, Reconnecting")
|
||||
error_status = CONNECTION_RESET_STATUS
|
||||
writer.close()
|
||||
ctx.nes_streams = None
|
||||
except TimeoutError:
|
||||
logger.debug("Connection Timed Out, Reconnecting")
|
||||
error_status = CONNECTION_TIMING_OUT_STATUS
|
||||
writer.close()
|
||||
ctx.nes_streams = None
|
||||
except ConnectionResetError:
|
||||
logger.debug("Connection Lost, Reconnecting")
|
||||
error_status = CONNECTION_RESET_STATUS
|
||||
writer.close()
|
||||
ctx.nes_streams = None
|
||||
if ctx.nes_status == CONNECTION_TENTATIVE_STATUS:
|
||||
if not error_status:
|
||||
logger.info("Successfully Connected to NES")
|
||||
ctx.nes_status = CONNECTION_CONNECTED_STATUS
|
||||
else:
|
||||
ctx.nes_status = f"Was tentatively connected but error occured: {error_status}"
|
||||
elif error_status:
|
||||
ctx.nes_status = error_status
|
||||
logger.info("Lost connection to nes and attempting to reconnect. Use /nes for status updates")
|
||||
else:
|
||||
try:
|
||||
logger.debug("Attempting to connect to NES")
|
||||
ctx.nes_streams = await asyncio.wait_for(asyncio.open_connection("localhost", 52980), timeout=10)
|
||||
ctx.nes_status = CONNECTION_TENTATIVE_STATUS
|
||||
except TimeoutError:
|
||||
logger.debug("Connection Timed Out, Trying Again")
|
||||
ctx.nes_status = CONNECTION_TIMING_OUT_STATUS
|
||||
continue
|
||||
except ConnectionRefusedError:
|
||||
logger.debug("Connection Refused, Trying Again")
|
||||
ctx.nes_status = CONNECTION_REFUSED_STATUS
|
||||
continue
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Text Mode to use !hint and such with games that have no text entry
|
||||
Utils.init_logging("ZeldaClient")
|
||||
|
||||
options = Utils.get_options()
|
||||
DISPLAY_MSGS = options["tloz_options"]["display_msgs"]
|
||||
|
||||
|
||||
async def run_game(romfile: str) -> None:
|
||||
auto_start = typing.cast(typing.Union[bool, str],
|
||||
Utils.get_options()["tloz_options"].get("rom_start", True))
|
||||
if auto_start is True:
|
||||
import webbrowser
|
||||
webbrowser.open(romfile)
|
||||
elif isinstance(auto_start, str) and os.path.isfile(auto_start):
|
||||
subprocess.Popen([auto_start, romfile],
|
||||
stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||
|
||||
|
||||
async def main(args):
|
||||
if args.diff_file:
|
||||
import Patch
|
||||
logging.info("Patch file was supplied. Creating nes rom..")
|
||||
meta, romfile = Patch.create_rom_file(args.diff_file)
|
||||
if "server" in meta:
|
||||
args.connect = meta["server"]
|
||||
logging.info(f"Wrote rom file to {romfile}")
|
||||
async_start(run_game(romfile))
|
||||
ctx = ZeldaContext(args.connect, args.password)
|
||||
ctx.server_task = asyncio.create_task(server_loop(ctx), name="ServerLoop")
|
||||
if gui_enabled:
|
||||
ctx.run_gui()
|
||||
ctx.run_cli()
|
||||
ctx.nes_sync_task = asyncio.create_task(nes_sync_task(ctx), name="NES Sync")
|
||||
|
||||
await ctx.exit_event.wait()
|
||||
ctx.server_address = None
|
||||
|
||||
await ctx.shutdown()
|
||||
|
||||
if ctx.nes_sync_task:
|
||||
await ctx.nes_sync_task
|
||||
|
||||
|
||||
import colorama
|
||||
|
||||
parser = get_base_parser()
|
||||
parser.add_argument('diff_file', default="", type=str, nargs="?",
|
||||
help='Path to a Archipelago Binary Patch file')
|
||||
args = parser.parse_args()
|
||||
colorama.init()
|
||||
|
||||
asyncio.run(main(args))
|
||||
colorama.deinit()
|
Reference in New Issue
Block a user