 1698c17caa
			
		
	
	1698c17caa
	
	
	
		
			
			* Fix links to TASVideos.org using HTTP * Revise all docs mentioning Lua in EmuHawk which are in English resolves TASEmulators/BizHawk#3650 * Correct capitalisation of "BizHawk" in strings and camelCase identifiers * Use the term "EmuHawk" when referring to the app, in English docs --------- Co-authored-by: black-sliver <59490463+black-sliver@users.noreply.github.com>
		
			
				
	
	
		
			394 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			394 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Based (read: copied almost wholesale and edited) off the FF1 Client.
 | |
| 
 | |
| import asyncio
 | |
| import copy
 | |
| import json
 | |
| import logging
 | |
| import os
 | |
| import subprocess
 | |
| import time
 | |
| import typing
 | |
| from asyncio import StreamReader, StreamWriter
 | |
| from typing import List
 | |
| 
 | |
| import Utils
 | |
| from Utils import async_start
 | |
| from worlds import lookup_any_location_id_to_name
 | |
| from CommonClient import CommonContext, server_loop, gui_enabled, console_loop, ClientCommandProcessor, logger, \
 | |
|     get_base_parser
 | |
| 
 | |
| from worlds.tloz.Items import item_game_ids
 | |
| from worlds.tloz.Locations import location_ids
 | |
| from worlds.tloz import Items, Locations, Rom
 | |
| 
 | |
| SYSTEM_MESSAGE_ID = 0
 | |
| 
 | |
| CONNECTION_TIMING_OUT_STATUS = "Connection timing out. Please restart your emulator, then restart connector_tloz.lua"
 | |
| CONNECTION_REFUSED_STATUS = "Connection Refused. Please start your emulator and make sure connector_tloz.lua is running"
 | |
| CONNECTION_RESET_STATUS = "Connection was reset. Please restart your emulator, then restart connector_tloz.lua"
 | |
| CONNECTION_TENTATIVE_STATUS = "Initial Connection Made"
 | |
| CONNECTION_CONNECTED_STATUS = "Connected"
 | |
| CONNECTION_INITIAL_STATUS = "Connection has not been initiated"
 | |
| 
 | |
| DISPLAY_MSGS = True
 | |
| 
 | |
| item_ids = item_game_ids
 | |
| location_ids = location_ids
 | |
| items_by_id = {id: item for item, id in item_ids.items()}
 | |
| locations_by_id = {id: location for location, id in location_ids.items()}
 | |
| 
 | |
| 
 | |
| class ZeldaCommandProcessor(ClientCommandProcessor):
 | |
| 
 | |
|     def _cmd_nes(self):
 | |
|         """Check NES Connection State"""
 | |
|         if isinstance(self.ctx, ZeldaContext):
 | |
|             logger.info(f"NES Status: {self.ctx.nes_status}")
 | |
| 
 | |
|     def _cmd_toggle_msgs(self):
 | |
|         """Toggle displaying messages in EmuHawk"""
 | |
|         global DISPLAY_MSGS
 | |
|         DISPLAY_MSGS = not DISPLAY_MSGS
 | |
|         logger.info(f"Messages are now {'enabled' if DISPLAY_MSGS else 'disabled'}")
 | |
| 
 | |
| 
 | |
| class ZeldaContext(CommonContext):
 | |
|     command_processor = ZeldaCommandProcessor
 | |
|     items_handling = 0b101  # get sent remote and starting items
 | |
|     # Infinite Hyrule compatibility
 | |
|     overworld_item = 0x5F
 | |
|     armos_item = 0x24
 | |
| 
 | |
|     def __init__(self, server_address, password):
 | |
|         super().__init__(server_address, password)
 | |
|         self.bonus_items = []
 | |
|         self.nes_streams: (StreamReader, StreamWriter) = None
 | |
|         self.nes_sync_task = None
 | |
|         self.messages = {}
 | |
|         self.locations_array = None
 | |
|         self.nes_status = CONNECTION_INITIAL_STATUS
 | |
|         self.game = 'The Legend of Zelda'
 | |
|         self.awaiting_rom = False
 | |
|         self.shop_slots_left = 0
 | |
|         self.shop_slots_middle = 0
 | |
|         self.shop_slots_right = 0
 | |
|         self.shop_slots = [self.shop_slots_left, self.shop_slots_middle, self.shop_slots_right]
 | |
|         self.slot_data = dict()
 | |
| 
 | |
|     async def server_auth(self, password_requested: bool = False):
 | |
|         if password_requested and not self.password:
 | |
|             await super(ZeldaContext, self).server_auth(password_requested)
 | |
|         if not self.auth:
 | |
|             self.awaiting_rom = True
 | |
|             logger.info('Awaiting connection to NES to get Player information')
 | |
|             return
 | |
| 
 | |
|         await self.send_connect()
 | |
| 
 | |
|     def _set_message(self, msg: str, msg_id: int):
 | |
|         if DISPLAY_MSGS:
 | |
|             self.messages[(time.time(), msg_id)] = msg
 | |
| 
 | |
|     def on_package(self, cmd: str, args: dict):
 | |
|         if cmd == 'Connected':
 | |
|             self.slot_data = args.get("slot_data", {})
 | |
|             asyncio.create_task(parse_locations(self.locations_array, self, True))
 | |
|         elif cmd == 'Print':
 | |
|             msg = args['text']
 | |
|             if ': !' not in msg:
 | |
|                 self._set_message(msg, SYSTEM_MESSAGE_ID)
 | |
| 
 | |
|     def on_print_json(self, args: dict):
 | |
|         if self.ui:
 | |
|             self.ui.print_json(copy.deepcopy(args["data"]))
 | |
|         else:
 | |
|             text = self.jsontotextparser(copy.deepcopy(args["data"]))
 | |
|             logger.info(text)
 | |
|         relevant = args.get("type", None) in {"Hint", "ItemSend"}
 | |
|         if relevant:
 | |
|             item = args["item"]
 | |
|             # goes to this world
 | |
|             if self.slot_concerns_self(args["receiving"]):
 | |
|                 relevant = True
 | |
|             # found in this world
 | |
|             elif self.slot_concerns_self(item.player):
 | |
|                 relevant = True
 | |
|             # not related
 | |
|             else:
 | |
|                 relevant = False
 | |
|             if relevant:
 | |
|                 item = args["item"]
 | |
|                 msg = self.raw_text_parser(copy.deepcopy(args["data"]))
 | |
|                 self._set_message(msg, item.item)
 | |
| 
 | |
|     def run_gui(self):
 | |
|         from kvui import GameManager
 | |
| 
 | |
|         class ZeldaManager(GameManager):
 | |
|             logging_pairs = [
 | |
|                 ("Client", "Archipelago")
 | |
|             ]
 | |
|             base_title = "Archipelago Zelda 1 Client"
 | |
| 
 | |
|         self.ui = ZeldaManager(self)
 | |
|         self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI")
 | |
| 
 | |
| 
 | |
| def get_payload(ctx: ZeldaContext):
 | |
|     current_time = time.time()
 | |
|     bonus_items = [item for item in ctx.bonus_items]
 | |
|     return json.dumps(
 | |
|         {
 | |
|             "items": [item.item for item in ctx.items_received],
 | |
|             "messages": {f'{key[0]}:{key[1]}': value for key, value in ctx.messages.items()
 | |
|                          if key[0] > current_time - 10},
 | |
|             "shops": {
 | |
|                 "left": ctx.shop_slots_left,
 | |
|                 "middle": ctx.shop_slots_middle,
 | |
|                 "right": ctx.shop_slots_right
 | |
|             },
 | |
|             "bonusItems": bonus_items
 | |
|         }
 | |
|     )
 | |
| 
 | |
| 
 | |
| def reconcile_shops(ctx: ZeldaContext):
 | |
|     checked_location_names = [lookup_any_location_id_to_name[location] for location in ctx.checked_locations]
 | |
|     shops = [location for location in checked_location_names if "Shop" in location]
 | |
|     left_slots = [shop for shop in shops if "Left" in shop]
 | |
|     middle_slots = [shop for shop in shops if "Middle" in shop]
 | |
|     right_slots = [shop for shop in shops if "Right" in shop]
 | |
|     for shop in left_slots:
 | |
|         ctx.shop_slots_left |= get_shop_bit_from_name(shop)
 | |
|     for shop in middle_slots:
 | |
|         ctx.shop_slots_middle |= get_shop_bit_from_name(shop)
 | |
|     for shop in right_slots:
 | |
|         ctx.shop_slots_right |= get_shop_bit_from_name(shop)
 | |
| 
 | |
| 
 | |
| def get_shop_bit_from_name(location_name):
 | |
|     if "Potion" in location_name:
 | |
|         return Rom.potion_shop
 | |
|     elif "Arrow" in location_name:
 | |
|         return Rom.arrow_shop
 | |
|     elif "Shield" in location_name:
 | |
|         return Rom.shield_shop
 | |
|     elif "Ring" in location_name:
 | |
|         return Rom.ring_shop
 | |
|     elif "Candle" in location_name:
 | |
|         return Rom.candle_shop
 | |
|     elif "Take" in location_name:
 | |
|         return Rom.take_any
 | |
|     return 0  # this should never be hit
 | |
| 
 | |
| 
 | |
| async def parse_locations(locations_array, ctx: ZeldaContext, force: bool, zone="None"):
 | |
|     if locations_array == ctx.locations_array and not force:
 | |
|         return
 | |
|     else:
 | |
|         # print("New values")
 | |
|         ctx.locations_array = locations_array
 | |
|         locations_checked = []
 | |
|         location = None
 | |
|         for location in ctx.missing_locations:
 | |
|             location_name = lookup_any_location_id_to_name[location]
 | |
| 
 | |
|             if location_name in Locations.overworld_locations and zone == "overworld":
 | |
|                 status = locations_array[Locations.major_location_offsets[location_name]]
 | |
|                 if location_name == "Ocean Heart Container":
 | |
|                     status = locations_array[ctx.overworld_item]
 | |
|                 if location_name == "Armos Knights":
 | |
|                     status = locations_array[ctx.armos_item]
 | |
|                 if status & 0x10:
 | |
|                     ctx.locations_checked.add(location)
 | |
|                     locations_checked.append(location)
 | |
|             elif location_name in Locations.underworld1_locations and zone == "underworld1":
 | |
|                 status = locations_array[Locations.floor_location_game_offsets_early[location_name]]
 | |
|                 if status & 0x10:
 | |
|                     ctx.locations_checked.add(location)
 | |
|                     locations_checked.append(location)
 | |
|             elif location_name in Locations.underworld2_locations and zone == "underworld2":
 | |
|                 status = locations_array[Locations.floor_location_game_offsets_late[location_name]]
 | |
|                 if status & 0x10:
 | |
|                     ctx.locations_checked.add(location)
 | |
|                     locations_checked.append(location)
 | |
|             elif (location_name in Locations.shop_locations or "Take" in location_name) and zone == "caves":
 | |
|                 shop_bit = get_shop_bit_from_name(location_name)
 | |
|                 slot = 0
 | |
|                 context_slot = 0
 | |
|                 if "Left" in location_name:
 | |
|                     slot = "slot1"
 | |
|                     context_slot = 0
 | |
|                 elif "Middle" in location_name:
 | |
|                     slot = "slot2"
 | |
|                     context_slot = 1
 | |
|                 elif "Right" in location_name:
 | |
|                     slot = "slot3"
 | |
|                     context_slot = 2
 | |
|                 if locations_array[slot] & shop_bit > 0:
 | |
|                     locations_checked.append(location)
 | |
|                     ctx.shop_slots[context_slot] |= shop_bit
 | |
|                 if locations_array["takeAnys"] and locations_array["takeAnys"] >= 4:
 | |
|                     if "Take Any" in location_name:
 | |
|                         short_name = None
 | |
|                         if "Left" in location_name:
 | |
|                             short_name = "TakeAnyLeft"
 | |
|                         elif "Middle" in location_name:
 | |
|                             short_name = "TakeAnyMiddle"
 | |
|                         elif "Right" in location_name:
 | |
|                             short_name = "TakeAnyRight"
 | |
|                         if short_name is not None:
 | |
|                             item_code = ctx.slot_data[short_name]
 | |
|                             if item_code > 0:
 | |
|                                 ctx.bonus_items.append(item_code)
 | |
|                             locations_checked.append(location)
 | |
|         if locations_checked:
 | |
|             await ctx.send_msgs([
 | |
|                 {"cmd": "LocationChecks",
 | |
|                  "locations": locations_checked}
 | |
|             ])
 | |
| 
 | |
| 
 | |
| async def nes_sync_task(ctx: ZeldaContext):
 | |
|     logger.info("Starting nes connector. Use /nes for status information")
 | |
|     while not ctx.exit_event.is_set():
 | |
|         error_status = None
 | |
|         if ctx.nes_streams:
 | |
|             (reader, writer) = ctx.nes_streams
 | |
|             msg = get_payload(ctx).encode()
 | |
|             writer.write(msg)
 | |
|             writer.write(b'\n')
 | |
|             try:
 | |
|                 await asyncio.wait_for(writer.drain(), timeout=1.5)
 | |
|                 try:
 | |
|                     # Data will return a dict with up to two fields:
 | |
|                     # 1. A keepalive response of the Players Name (always)
 | |
|                     # 2. An array representing the memory values of the locations area (if in game)
 | |
|                     data = await asyncio.wait_for(reader.readline(), timeout=5)
 | |
|                     data_decoded = json.loads(data.decode())
 | |
|                     if data_decoded["overworldHC"] is not None:
 | |
|                         ctx.overworld_item = data_decoded["overworldHC"]
 | |
|                     if data_decoded["overworldPB"] is not None:
 | |
|                         ctx.armos_item = data_decoded["overworldPB"]
 | |
|                     if data_decoded['gameMode'] == 19 and ctx.finished_game == False:
 | |
|                         await ctx.send_msgs([
 | |
|                             {"cmd": "StatusUpdate",
 | |
|                              "status": 30}
 | |
|                         ])
 | |
|                         ctx.finished_game = True
 | |
|                     if ctx.game is not None and 'overworld' in data_decoded:
 | |
|                         # Not just a keep alive ping, parse
 | |
|                         asyncio.create_task(parse_locations(data_decoded['overworld'], ctx, False, "overworld"))
 | |
|                     if ctx.game is not None and 'underworld1' in data_decoded:
 | |
|                         asyncio.create_task(parse_locations(data_decoded['underworld1'], ctx, False, "underworld1"))
 | |
|                     if ctx.game is not None and 'underworld2' in data_decoded:
 | |
|                         asyncio.create_task(parse_locations(data_decoded['underworld2'], ctx, False, "underworld2"))
 | |
|                     if ctx.game is not None and 'caves' in data_decoded:
 | |
|                         asyncio.create_task(parse_locations(data_decoded['caves'], ctx, False, "caves"))
 | |
|                     if not ctx.auth:
 | |
|                         ctx.auth = ''.join([chr(i) for i in data_decoded['playerName'] if i != 0])
 | |
|                         if ctx.auth == '':
 | |
|                             logger.info("Invalid ROM detected. No player name built into the ROM. Please regenerate"
 | |
|                                         "the ROM using the same link but adding your slot name")
 | |
|                         if ctx.awaiting_rom:
 | |
|                             await ctx.server_auth(False)
 | |
|                     reconcile_shops(ctx)
 | |
|                 except asyncio.TimeoutError:
 | |
|                     logger.debug("Read Timed Out, Reconnecting")
 | |
|                     error_status = CONNECTION_TIMING_OUT_STATUS
 | |
|                     writer.close()
 | |
|                     ctx.nes_streams = None
 | |
|                 except ConnectionResetError as e:
 | |
|                     logger.debug("Read failed due to Connection Lost, Reconnecting")
 | |
|                     error_status = CONNECTION_RESET_STATUS
 | |
|                     writer.close()
 | |
|                     ctx.nes_streams = None
 | |
|             except TimeoutError:
 | |
|                 logger.debug("Connection Timed Out, Reconnecting")
 | |
|                 error_status = CONNECTION_TIMING_OUT_STATUS
 | |
|                 writer.close()
 | |
|                 ctx.nes_streams = None
 | |
|             except ConnectionResetError:
 | |
|                 logger.debug("Connection Lost, Reconnecting")
 | |
|                 error_status = CONNECTION_RESET_STATUS
 | |
|                 writer.close()
 | |
|                 ctx.nes_streams = None
 | |
|             if ctx.nes_status == CONNECTION_TENTATIVE_STATUS:
 | |
|                 if not error_status:
 | |
|                     logger.info("Successfully Connected to NES")
 | |
|                     ctx.nes_status = CONNECTION_CONNECTED_STATUS
 | |
|                 else:
 | |
|                     ctx.nes_status = f"Was tentatively connected but error occured: {error_status}"
 | |
|             elif error_status:
 | |
|                 ctx.nes_status = error_status
 | |
|                 logger.info("Lost connection to nes and attempting to reconnect. Use /nes for status updates")
 | |
|         else:
 | |
|             try:
 | |
|                 logger.debug("Attempting to connect to NES")
 | |
|                 ctx.nes_streams = await asyncio.wait_for(asyncio.open_connection("localhost", 52980), timeout=10)
 | |
|                 ctx.nes_status = CONNECTION_TENTATIVE_STATUS
 | |
|             except TimeoutError:
 | |
|                 logger.debug("Connection Timed Out, Trying Again")
 | |
|                 ctx.nes_status = CONNECTION_TIMING_OUT_STATUS
 | |
|                 continue
 | |
|             except ConnectionRefusedError:
 | |
|                 logger.debug("Connection Refused, Trying Again")
 | |
|                 ctx.nes_status = CONNECTION_REFUSED_STATUS
 | |
|                 continue
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     # Text Mode to use !hint and such with games that have no text entry
 | |
|     Utils.init_logging("ZeldaClient")
 | |
| 
 | |
|     options = Utils.get_options()
 | |
|     DISPLAY_MSGS = options["tloz_options"]["display_msgs"]
 | |
| 
 | |
| 
 | |
|     async def run_game(romfile: str) -> None:
 | |
|         auto_start = typing.cast(typing.Union[bool, str],
 | |
|                                  Utils.get_options()["tloz_options"].get("rom_start", True))
 | |
|         if auto_start is True:
 | |
|             import webbrowser
 | |
|             webbrowser.open(romfile)
 | |
|         elif isinstance(auto_start, str) and os.path.isfile(auto_start):
 | |
|             subprocess.Popen([auto_start, romfile],
 | |
|                              stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
 | |
| 
 | |
| 
 | |
|     async def main(args):
 | |
|         if args.diff_file:
 | |
|             import Patch
 | |
|             logging.info("Patch file was supplied. Creating nes rom..")
 | |
|             meta, romfile = Patch.create_rom_file(args.diff_file)
 | |
|             if "server" in meta:
 | |
|                 args.connect = meta["server"]
 | |
|             logging.info(f"Wrote rom file to {romfile}")
 | |
|             async_start(run_game(romfile))
 | |
|         ctx = ZeldaContext(args.connect, args.password)
 | |
|         ctx.server_task = asyncio.create_task(server_loop(ctx), name="ServerLoop")
 | |
|         if gui_enabled:
 | |
|             ctx.run_gui()
 | |
|         ctx.run_cli()
 | |
|         ctx.nes_sync_task = asyncio.create_task(nes_sync_task(ctx), name="NES Sync")
 | |
| 
 | |
|         await ctx.exit_event.wait()
 | |
|         ctx.server_address = None
 | |
| 
 | |
|         await ctx.shutdown()
 | |
| 
 | |
|         if ctx.nes_sync_task:
 | |
|             await ctx.nes_sync_task
 | |
| 
 | |
| 
 | |
|     import colorama
 | |
| 
 | |
|     parser = get_base_parser()
 | |
|     parser.add_argument('diff_file', default="", type=str, nargs="?",
 | |
|                         help='Path to a Archipelago Binary Patch file')
 | |
|     args = parser.parse_args()
 | |
|     colorama.init()
 | |
| 
 | |
|     asyncio.run(main(args))
 | |
|     colorama.deinit()
 |