 4641456ba2
			
		
	
	4641456ba2
	
	
	
		
			
			Co-authored-by: el-u <109771707+el-u@users.noreply.github.com> Co-authored-by: Zach Parks <zach@alliware.com>
		
			
				
	
	
		
			377 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			377 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import asyncio
 | |
| import hashlib
 | |
| import json
 | |
| import os
 | |
| import multiprocessing
 | |
| import subprocess
 | |
| import zipfile
 | |
| 
 | |
| from asyncio import StreamReader, StreamWriter
 | |
| 
 | |
| import bsdiff4
 | |
| 
 | |
| from CommonClient import CommonContext, server_loop, gui_enabled, \
 | |
|     ClientCommandProcessor, logger, get_base_parser
 | |
| import Utils
 | |
| from NetUtils import ClientStatus
 | |
| from worlds.mmbn3.Items import items_by_id
 | |
| from worlds.mmbn3.Rom import get_base_rom_path
 | |
| from worlds.mmbn3.Locations import all_locations, scoutable_locations
 | |
| 
 | |
| SYSTEM_MESSAGE_ID = 0
 | |
| 
 | |
| CONNECTION_TIMING_OUT_STATUS = "Connection timing out. Please restart your emulator, then restart connector_mmbn3.lua"
 | |
| CONNECTION_REFUSED_STATUS = \
 | |
|     "Connection refused. Please start your emulator and make sure connector_mmbn3.lua is running"
 | |
| CONNECTION_RESET_STATUS = "Connection was reset. Please restart your emulator, then restart connector_mmbn3.lua"
 | |
| CONNECTION_TENTATIVE_STATUS = "Initial Connection Made"
 | |
| CONNECTION_CONNECTED_STATUS = "Connected"
 | |
| CONNECTION_INITIAL_STATUS = "Connection has not been initiated"
 | |
| CONNECTION_INCORRECT_ROM = "Supplied Base Rom does not match US GBA Blue Version. Please provide the correct ROM version"
 | |
| 
 | |
| script_version: int = 2
 | |
| 
 | |
| debugEnabled = False
 | |
| locations_checked = []
 | |
| items_sent = []
 | |
| itemIndex = 1
 | |
| 
 | |
| CHECKSUM_BLUE = "6fe31df0144759b34ad666badaacc442"
 | |
| 
 | |
| 
 | |
| class MMBN3CommandProcessor(ClientCommandProcessor):
 | |
|     def __init__(self, ctx):
 | |
|         super().__init__(ctx)
 | |
| 
 | |
|     def _cmd_gba(self):
 | |
|         """Check GBA Connection State"""
 | |
|         if isinstance(self.ctx, MMBN3Context):
 | |
|             logger.info(f"GBA Status: {self.ctx.gba_status}")
 | |
| 
 | |
|     def _cmd_debug(self):
 | |
|         """Toggle the Debug Text overlay in ROM"""
 | |
|         global debugEnabled
 | |
|         debugEnabled = not debugEnabled
 | |
|         logger.info("Debug Overlay Enabled" if debugEnabled else "Debug Overlay Disabled")
 | |
| 
 | |
| 
 | |
| class MMBN3Context(CommonContext):
 | |
|     command_processor = MMBN3CommandProcessor
 | |
|     game = "MegaMan Battle Network 3"
 | |
|     items_handling = 0b101  # full local except starting items
 | |
| 
 | |
|     def __init__(self, server_address, password):
 | |
|         super().__init__(server_address, password)
 | |
|         self.gba_streams: (StreamReader, StreamWriter) = None
 | |
|         self.gba_sync_task = None
 | |
|         self.gba_status = CONNECTION_INITIAL_STATUS
 | |
|         self.awaiting_rom = False
 | |
|         self.location_table = {}
 | |
|         self.version_warning = False
 | |
|         self.auth_name = None
 | |
|         self.slot_data = dict()
 | |
|         self.patching_error = False
 | |
|         self.sent_hints = []
 | |
| 
 | |
|     async def server_auth(self, password_requested: bool = False):
 | |
|         if password_requested and not self.password:
 | |
|             await super(MMBN3Context, self).server_auth(password_requested)
 | |
| 
 | |
|         if self.auth_name is None:
 | |
|             self.awaiting_rom = True
 | |
|             logger.info("No ROM detected, awaiting conection to Bizhawk to authenticate to the multiworld server")
 | |
|             return
 | |
| 
 | |
|         logger.info("Attempting to decode from ROM... ")
 | |
|         self.awaiting_rom = False
 | |
|         self.auth = self.auth_name.decode("utf8").replace('\x00', '')
 | |
|         logger.info("Connecting as "+self.auth)
 | |
|         await self.send_connect(name=self.auth)
 | |
| 
 | |
|     def run_gui(self):
 | |
|         from kvui import GameManager
 | |
| 
 | |
|         class MMBN3Manager(GameManager):
 | |
|             logging_pairs = [
 | |
|                 ("Client", "Archipelago")
 | |
|             ]
 | |
|             base_title = "Archipelago MegaMan Battle Network 3 Client"
 | |
| 
 | |
|         self.ui = MMBN3Manager(self)
 | |
|         self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI")
 | |
| 
 | |
|     def on_package(self, cmd: str, args: dict):
 | |
|         if cmd == 'Connected':
 | |
|             self.slot_data = args.get("slot_data", {})
 | |
|             print(self.slot_data)
 | |
| 
 | |
| class ItemInfo:
 | |
|     id = 0x00
 | |
|     sender = ""
 | |
|     type = ""
 | |
|     count = 1
 | |
|     itemName = "Unknown"
 | |
|     itemID = 0x00  # Item ID, Chip ID, etc.
 | |
|     subItemID = 0x00  # Code for chips, color for programs
 | |
|     itemIndex = 1
 | |
| 
 | |
|     def __init__(self, id, sender, type):
 | |
|         self.id = id
 | |
|         self.sender = sender
 | |
|         self.type = type
 | |
| 
 | |
|     def get_json(self):
 | |
|         json_data = {
 | |
|             "id": self.id,
 | |
|             "sender": self.sender,
 | |
|             "type": self.type,
 | |
|             "itemName": self.itemName,
 | |
|             "itemID": self.itemID,
 | |
|             "subItemID": self.subItemID,
 | |
|             "count": self.count,
 | |
|             "itemIndex": self.itemIndex
 | |
|         }
 | |
|         return json_data
 | |
| 
 | |
| 
 | |
| def get_payload(ctx: MMBN3Context):
 | |
|     global debugEnabled
 | |
| 
 | |
|     items_sent = []
 | |
|     for i, item in enumerate(ctx.items_received):
 | |
|         item_data = items_by_id[item.item]
 | |
|         new_item = ItemInfo(i, ctx.player_names[item.player], item_data.type)
 | |
|         new_item.itemIndex = i+1
 | |
|         new_item.itemName = item_data.itemName
 | |
|         new_item.type = item_data.type
 | |
|         new_item.itemID = item_data.itemID
 | |
|         new_item.subItemID = item_data.subItemID
 | |
|         new_item.count = item_data.count
 | |
|         items_sent.append(new_item)
 | |
| 
 | |
|     return json.dumps({
 | |
|         "items": [item.get_json() for item in items_sent],
 | |
|         "debug": debugEnabled
 | |
|         })
 | |
| 
 | |
| 
 | |
| async def parse_payload(payload: dict, ctx: MMBN3Context, force: bool):
 | |
|     # Game completion handling
 | |
|     if payload["gameComplete"] and not ctx.finished_game:
 | |
|         await ctx.send_msgs([{
 | |
|             "cmd": "StatusUpdate",
 | |
|             "status": ClientStatus.CLIENT_GOAL
 | |
|         }])
 | |
|         ctx.finished_game = True
 | |
| 
 | |
|     # Locations handling
 | |
|     if ctx.location_table != payload["locations"]:
 | |
|         ctx.location_table = payload["locations"]
 | |
|         locs = [loc.id for loc in all_locations
 | |
|                 if check_location_packet(loc, ctx.location_table)]
 | |
|         await ctx.send_msgs([{
 | |
|             "cmd": "LocationChecks",
 | |
|             "locations": locs
 | |
|         }])
 | |
| 
 | |
|     # If trade hinting is enabled, send scout checks
 | |
|     if ctx.slot_data.get("trade_quest_hinting", 0) == 2:
 | |
|         trade_bits = [loc.id for loc in scoutable_locations
 | |
|                         if check_location_scouted(loc, payload["locations"])]
 | |
|         scouted_locs = [loc for loc in trade_bits if loc not in ctx.sent_hints]
 | |
|         if len(scouted_locs) > 0:
 | |
|             ctx.sent_hints.extend(scouted_locs)
 | |
|             await ctx.send_msgs([{
 | |
|                 "cmd": "LocationScouts",
 | |
|                 "locations": scouted_locs,
 | |
|                 "create_as_hint": 2
 | |
|             }])
 | |
| 
 | |
| 
 | |
| def check_location_packet(location, memory):
 | |
|     if len(memory) == 0:
 | |
|         return False
 | |
|     # Our keys have to be strings to come through the JSON lua plugin so we have to turn our memory address into a string as well
 | |
|     location_key = hex(location.flag_byte)[2:]
 | |
|     byte = memory.get(location_key)
 | |
|     if byte is not None:
 | |
|         return byte & location.flag_mask
 | |
| 
 | |
| 
 | |
| def check_location_scouted(location, memory):
 | |
|     if len(memory) == 0:
 | |
|         return False
 | |
|     location_key = hex(location.hint_flag)[2:]
 | |
|     byte = memory.get(location_key)
 | |
|     if byte is not None:
 | |
|         return byte & location.hint_flag_mask
 | |
| 
 | |
| 
 | |
| async def gba_sync_task(ctx: MMBN3Context):
 | |
|     logger.info("Starting GBA connector. Use /gba for status information.")
 | |
|     if ctx.patching_error:
 | |
|         logger.error('Unable to Patch ROM. No ROM provided or ROM does not match US GBA Blue Version.')
 | |
|     while not ctx.exit_event.is_set():
 | |
|         error_status = None
 | |
|         if ctx.gba_streams:
 | |
|             (reader, writer) = ctx.gba_streams
 | |
|             msg = get_payload(ctx).encode()
 | |
|             writer.write(msg)
 | |
|             writer.write(b'\n')
 | |
|             try:
 | |
|                 await asyncio.wait_for(writer.drain(), timeout=1.5)
 | |
|                 try:
 | |
|                     # Data will return a dict with up to four fields
 | |
|                     # 1. str: player name (always)
 | |
|                     # 2. int: script version (always)
 | |
|                     # 3. dict[str, byte]: value of location's memory byte
 | |
|                     # 4. bool: whether the game currently registers as complete
 | |
|                     data = await asyncio.wait_for(reader.readline(), timeout=10)
 | |
|                     data_decoded = json.loads(data.decode())
 | |
|                     reported_version = data_decoded.get("scriptVersion", 0)
 | |
|                     if reported_version >= script_version:
 | |
|                         if ctx.game is not None and "locations" in data_decoded:
 | |
|                             # Not just a keep alive ping, parse
 | |
|                             asyncio.create_task((parse_payload(data_decoded, ctx, False)))
 | |
|                         if not ctx.auth:
 | |
|                             ctx.auth_name = bytes(data_decoded["playerName"])
 | |
| 
 | |
|                             if ctx.awaiting_rom:
 | |
|                                 logger.info("Awaiting data from ROM...")
 | |
|                                 await ctx.server_auth(False)
 | |
|                     else:
 | |
|                         if not ctx.version_warning:
 | |
|                             logger.warning(f"Your Lua script is version {reported_version}, expected {script_version}."
 | |
|                                            "Please update to the latest version."
 | |
|                                            "Your connection to the Archipelago server will not be accepted.")
 | |
|                             ctx.version_warning = True
 | |
|                 except asyncio.TimeoutError:
 | |
|                     logger.debug("Read Timed Out, Reconnecting")
 | |
|                     error_status = CONNECTION_TIMING_OUT_STATUS
 | |
|                     writer.close()
 | |
|                     ctx.gba_streams = None
 | |
|                 except ConnectionResetError:
 | |
|                     logger.debug("Read failed due to Connection Lost, Reconnecting")
 | |
|                     error_status = CONNECTION_RESET_STATUS
 | |
|                     writer.close()
 | |
|                     ctx.gba_streams = None
 | |
|             except TimeoutError:
 | |
|                 logger.debug("Connection Timed Out, Reconnecting")
 | |
|                 error_status = CONNECTION_TIMING_OUT_STATUS
 | |
|                 writer.close()
 | |
|                 ctx.gba_streams = None
 | |
|             except ConnectionResetError:
 | |
|                 logger.debug("Connection Lost, Reconnecting")
 | |
|                 error_status = CONNECTION_RESET_STATUS
 | |
|                 writer.close()
 | |
|                 ctx.gba_streams = None
 | |
|             if ctx.gba_status == CONNECTION_TENTATIVE_STATUS:
 | |
|                 if not error_status:
 | |
|                     logger.info("Successfully Connected to GBA")
 | |
|                     ctx.gba_status = CONNECTION_CONNECTED_STATUS
 | |
|                 else:
 | |
|                     ctx.gba_status = f"Was tentatively connected but error occurred: {error_status}"
 | |
|             elif error_status:
 | |
|                 ctx.gba_status = error_status
 | |
|                 logger.info("Lost connection to GBA and attempting to reconnect. Use /gba for status updates")
 | |
|         else:
 | |
|             try:
 | |
|                 logger.debug("Attempting to connect to GBA")
 | |
|                 ctx.gba_streams = await asyncio.wait_for(asyncio.open_connection("localhost", 28922), timeout=10)
 | |
|                 ctx.gba_status = CONNECTION_TENTATIVE_STATUS
 | |
|             except TimeoutError:
 | |
|                 logger.debug("Connection Timed Out, Trying Again")
 | |
|                 ctx.gba_status = CONNECTION_TIMING_OUT_STATUS
 | |
|                 continue
 | |
|             except ConnectionRefusedError:
 | |
|                 logger.debug("Connection Refused, Trying Again")
 | |
|                 ctx.gba_status = CONNECTION_REFUSED_STATUS
 | |
|                 continue
 | |
| 
 | |
| 
 | |
| async def run_game(romfile):
 | |
|     options = Utils.get_options().get("mmbn3_options", None)
 | |
|     if options is None:
 | |
|         auto_start = True
 | |
|     else:
 | |
|         auto_start = options.get("rom_start", True)
 | |
|     if auto_start:
 | |
|         import webbrowser
 | |
|         webbrowser.open(romfile)
 | |
|     elif os.path.isfile(auto_start):
 | |
|         subprocess.Popen([auto_start, romfile],
 | |
|                          stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
 | |
| 
 | |
| 
 | |
| async def patch_and_run_game(apmmbn3_file):
 | |
|     base_name = os.path.splitext(apmmbn3_file)[0]
 | |
| 
 | |
|     with zipfile.ZipFile(apmmbn3_file, 'r') as patch_archive:
 | |
|         try:
 | |
|             with patch_archive.open("delta.bsdiff4", 'r') as stream:
 | |
|                 patch_data = stream.read()
 | |
|         except KeyError:
 | |
|             raise FileNotFoundError("Patch file missing from archive.")
 | |
|     rom_file = get_base_rom_path()
 | |
| 
 | |
|     with open(rom_file, 'rb') as rom:
 | |
|         rom_bytes = rom.read()
 | |
| 
 | |
|     patched_bytes = bsdiff4.patch(rom_bytes, patch_data)
 | |
|     patched_rom_file = base_name+".gba"
 | |
|     with open(patched_rom_file, 'wb') as patched_rom:
 | |
|         patched_rom.write(patched_bytes)
 | |
| 
 | |
|     asyncio.create_task(run_game(patched_rom_file))
 | |
| 
 | |
| 
 | |
| def confirm_checksum():
 | |
|     rom_file = get_base_rom_path()
 | |
|     if not os.path.exists(rom_file):
 | |
|         return False
 | |
| 
 | |
|     with open(rom_file, 'rb') as rom:
 | |
|         rom_bytes = rom.read()
 | |
| 
 | |
|     basemd5 = hashlib.md5()
 | |
|     basemd5.update(rom_bytes)
 | |
|     return CHECKSUM_BLUE == basemd5.hexdigest()
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     Utils.init_logging("MMBN3Client")
 | |
| 
 | |
|     async def main():
 | |
|         multiprocessing.freeze_support()
 | |
|         parser = get_base_parser()
 | |
|         parser.add_argument("patch_file", default="", type=str, nargs="?",
 | |
|                             help="Path to an APMMBN3 file")
 | |
|         args = parser.parse_args()
 | |
|         checksum_matches = confirm_checksum()
 | |
|         if checksum_matches:
 | |
|             if args.patch_file:
 | |
|                 asyncio.create_task(patch_and_run_game(args.patch_file))
 | |
| 
 | |
|         ctx = MMBN3Context(args.connect, args.password)
 | |
|         if not checksum_matches:
 | |
|             ctx.patching_error = True
 | |
|         ctx.server_task = asyncio.create_task(server_loop(ctx), name="Server Loop")
 | |
|         if gui_enabled:
 | |
|             ctx.run_gui()
 | |
|         ctx.run_cli()
 | |
| 
 | |
|         ctx.gba_sync_task = asyncio.create_task(gba_sync_task(ctx), name="GBA Sync")
 | |
|         await ctx.exit_event.wait()
 | |
|         ctx.server_address = None
 | |
|         await ctx.shutdown()
 | |
| 
 | |
|         if ctx.gba_sync_task:
 | |
|             await ctx.gba_sync_task
 | |
| 
 | |
|     import colorama
 | |
| 
 | |
|     colorama.init()
 | |
| 
 | |
|     asyncio.run(main())
 | |
|     colorama.deinit()
 |