| 
									
										
										
										
											2024-05-06 03:15:06 -04:00
										 |  |  | from typing import TYPE_CHECKING, Optional, Set, List, Dict | 
					
						
							|  |  |  | import struct | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from NetUtils import ClientStatus | 
					
						
							|  |  |  | from .Locations import roomCount, nonBlock, beanstones, roomException, shop, badge, pants, eReward | 
					
						
							|  |  |  | from .Items import items_by_id | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import asyncio | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import worlds._bizhawk as bizhawk | 
					
						
							|  |  |  | from worlds._bizhawk.client import BizHawkClient | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if TYPE_CHECKING: | 
					
						
							|  |  |  |     from worlds._bizhawk.context import BizHawkClientContext | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | ROOM_ARRAY_POINTER = 0x51FA00 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class MLSSClient(BizHawkClient): | 
					
						
							|  |  |  |     game = "Mario & Luigi Superstar Saga" | 
					
						
							|  |  |  |     system = "GBA" | 
					
						
							|  |  |  |     patch_suffix = ".apmlss" | 
					
						
							|  |  |  |     local_checked_locations: Set[int] | 
					
						
							|  |  |  |     goal_flag: int | 
					
						
							|  |  |  |     rom_slot_name: Optional[str] | 
					
						
							|  |  |  |     eUsed: List[int] | 
					
						
							|  |  |  |     room: int | 
					
						
							|  |  |  |     local_events: List[int] | 
					
						
							|  |  |  |     player_name: Optional[str] | 
					
						
							|  |  |  |     checked_flags: Dict[int, list] = {} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self) -> None: | 
					
						
							|  |  |  |         super().__init__() | 
					
						
							|  |  |  |         self.local_checked_locations = set() | 
					
						
							|  |  |  |         self.local_set_events = {} | 
					
						
							|  |  |  |         self.local_found_key_items = {} | 
					
						
							|  |  |  |         self.rom_slot_name = None | 
					
						
							|  |  |  |         self.seed_verify = False | 
					
						
							|  |  |  |         self.eUsed = [] | 
					
						
							|  |  |  |         self.room = 0 | 
					
						
							|  |  |  |         self.local_events = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     async def validate_rom(self, ctx: "BizHawkClientContext") -> bool: | 
					
						
							|  |  |  |         from CommonClient import logger | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             # Check ROM name/patch version | 
					
						
							|  |  |  |             rom_name_bytes = await bizhawk.read(ctx.bizhawk_ctx, [(0xA0, 14, "ROM")]) | 
					
						
							|  |  |  |             rom_name = bytes([byte for byte in rom_name_bytes[0] if byte != 0]).decode("UTF-8") | 
					
						
							| 
									
										
										
										
											2024-05-18 16:26:50 -04:00
										 |  |  |             if not rom_name.startswith("MARIO&LUIGIUA8"): | 
					
						
							| 
									
										
										
										
											2024-05-06 03:15:06 -04:00
										 |  |  |                 return False | 
					
						
							|  |  |  |         except UnicodeDecodeError: | 
					
						
							|  |  |  |             return False | 
					
						
							|  |  |  |         except bizhawk.RequestFailedError: | 
					
						
							|  |  |  |             return False  # Should verify on the next pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         ctx.game = self.game | 
					
						
							|  |  |  |         ctx.items_handling = 0b101 | 
					
						
							|  |  |  |         ctx.want_slot_data = True | 
					
						
							|  |  |  |         ctx.watcher_timeout = 0.125 | 
					
						
							|  |  |  |         self.rom_slot_name = rom_name | 
					
						
							|  |  |  |         self.seed_verify = False | 
					
						
							|  |  |  |         name_bytes = (await bizhawk.read(ctx.bizhawk_ctx, [(0xDF0000, 16, "ROM")]))[0] | 
					
						
							|  |  |  |         name = bytes([byte for byte in name_bytes if byte != 0]).decode("UTF-8") | 
					
						
							|  |  |  |         self.player_name = name | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for i in range(59): | 
					
						
							|  |  |  |             self.checked_flags[i] = [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     async def set_auth(self, ctx: "BizHawkClientContext") -> None: | 
					
						
							|  |  |  |         ctx.auth = self.player_name | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def on_package(self, ctx, cmd, args) -> None: | 
					
						
							|  |  |  |         if cmd == "RoomInfo": | 
					
						
							|  |  |  |             ctx.seed_name = args["seed_name"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     async def game_watcher(self, ctx: "BizHawkClientContext") -> None: | 
					
						
							|  |  |  |         from CommonClient import logger | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             if ctx.seed_name is None: | 
					
						
							|  |  |  |                 return | 
					
						
							|  |  |  |             if not self.seed_verify: | 
					
						
							|  |  |  |                 seed = await bizhawk.read(ctx.bizhawk_ctx, [(0xDF00A0, len(ctx.seed_name), "ROM")]) | 
					
						
							|  |  |  |                 seed = seed[0].decode("UTF-8") | 
					
						
							| 
									
										
										
										
											2024-09-18 13:33:02 -04:00
										 |  |  |                 if seed not in ctx.seed_name: | 
					
						
							| 
									
										
										
										
											2024-05-06 03:15:06 -04:00
										 |  |  |                     logger.info( | 
					
						
							|  |  |  |                         "ERROR: The ROM you loaded is for a different game of AP. " | 
					
						
							|  |  |  |                         "Please make sure the host has sent you the correct patch file," | 
					
						
							|  |  |  |                         "and that you have opened the correct ROM." | 
					
						
							|  |  |  |                     ) | 
					
						
							|  |  |  |                     raise bizhawk.ConnectorError("Loaded ROM is for Incorrect lobby.") | 
					
						
							|  |  |  |                 self.seed_verify = True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             read_state = await bizhawk.read( | 
					
						
							|  |  |  |                 ctx.bizhawk_ctx, | 
					
						
							|  |  |  |                 [ | 
					
						
							|  |  |  |                     (0x4564, 59, "EWRAM"), | 
					
						
							|  |  |  |                     (0x2330, 2, "IWRAM"), | 
					
						
							|  |  |  |                     (0x3FE0, 1, "IWRAM"), | 
					
						
							|  |  |  |                     (0x304A, 1, "EWRAM"), | 
					
						
							|  |  |  |                     (0x304B, 1, "EWRAM"), | 
					
						
							|  |  |  |                     (0x304C, 4, "EWRAM"), | 
					
						
							|  |  |  |                     (0x3060, 6, "EWRAM"), | 
					
						
							|  |  |  |                     (0x4808, 2, "EWRAM"), | 
					
						
							|  |  |  |                     (0x4407, 1, "EWRAM"), | 
					
						
							|  |  |  |                     (0x2339, 1, "IWRAM"), | 
					
						
							|  |  |  |                 ] | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  |             flags = read_state[0] | 
					
						
							|  |  |  |             current_room = int.from_bytes(read_state[1], "little") | 
					
						
							|  |  |  |             shop_init = read_state[2][0] | 
					
						
							|  |  |  |             shop_scroll = read_state[3][0] & 0x1F | 
					
						
							|  |  |  |             is_buy = read_state[4][0] != 0 | 
					
						
							|  |  |  |             shop_address = (struct.unpack("<I", read_state[5])[0]) & 0xFFFFFF | 
					
						
							|  |  |  |             logo = bytes([byte for byte in read_state[6] if byte < 0x70]).decode("UTF-8") | 
					
						
							|  |  |  |             received_index = (read_state[7][0] << 8) + read_state[7][1] | 
					
						
							|  |  |  |             cackletta = read_state[8][0] & 0x40 | 
					
						
							|  |  |  |             shopping = read_state[9][0] & 0xF | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if logo != "MLSSAP": | 
					
						
							|  |  |  |                 return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             locs_to_send = set() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Checking shop purchases | 
					
						
							|  |  |  |             if is_buy: | 
					
						
							|  |  |  |                 await bizhawk.write(ctx.bizhawk_ctx, [(0x304A, [0x0, 0x0], "EWRAM")]) | 
					
						
							|  |  |  |                 if shop_address != 0x3C0618 and shop_address != 0x3C0684: | 
					
						
							|  |  |  |                     location = shop[shop_address][shop_scroll] | 
					
						
							|  |  |  |                 else: | 
					
						
							|  |  |  |                     if shop_init & 0x1 != 0: | 
					
						
							|  |  |  |                         location = badge[shop_address][shop_scroll] | 
					
						
							|  |  |  |                     else: | 
					
						
							|  |  |  |                         location = pants[shop_address][shop_scroll] | 
					
						
							|  |  |  |                 if location in ctx.server_locations: | 
					
						
							|  |  |  |                     locs_to_send.add(location) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Loop for receiving items. Item is written as an ID into 0x3057. | 
					
						
							|  |  |  |             # ASM reads the ID in a loop and give the player the item before resetting the RAM address to 0x0. | 
					
						
							|  |  |  |             # If RAM address isn't 0x0 yet break out and try again later to give the rest of the items | 
					
						
							|  |  |  |             for i in range(len(ctx.items_received) - received_index): | 
					
						
							|  |  |  |                 item_data = items_by_id[ctx.items_received[received_index + i].item] | 
					
						
							| 
									
										
										
										
											2024-09-18 13:33:02 -04:00
										 |  |  |                 result = False | 
					
						
							|  |  |  |                 total = 0 | 
					
						
							|  |  |  |                 while not result: | 
					
						
							|  |  |  |                     await asyncio.sleep(0.05) | 
					
						
							|  |  |  |                     total += 0.05 | 
					
						
							|  |  |  |                     result = await bizhawk.guarded_write( | 
					
						
							|  |  |  |                         ctx.bizhawk_ctx, | 
					
						
							|  |  |  |                         [ | 
					
						
							|  |  |  |                             (0x3057, [id_to_RAM(item_data.itemID)], "EWRAM") | 
					
						
							|  |  |  |                         ], | 
					
						
							|  |  |  |                         [(0x3057, [0x0], "EWRAM")] | 
					
						
							|  |  |  |                     ) | 
					
						
							|  |  |  |                     if result: | 
					
						
							|  |  |  |                         total = 0 | 
					
						
							|  |  |  |                     if total >= 1: | 
					
						
							|  |  |  |                         break | 
					
						
							|  |  |  |                 if not result: | 
					
						
							| 
									
										
										
										
											2024-05-06 03:15:06 -04:00
										 |  |  |                     break | 
					
						
							|  |  |  |                 await bizhawk.write( | 
					
						
							|  |  |  |                     ctx.bizhawk_ctx, | 
					
						
							|  |  |  |                     [ | 
					
						
							|  |  |  |                         (0x4808, [(received_index + i + 1) // 0x100, (received_index + i + 1) % 0x100], "EWRAM"), | 
					
						
							| 
									
										
										
										
											2024-09-18 13:33:02 -04:00
										 |  |  |                     ] | 
					
						
							| 
									
										
										
										
											2024-05-06 03:15:06 -04:00
										 |  |  |                 ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Early return and location send if you are currently in a shop, | 
					
						
							|  |  |  |             # since other flags aren't going to change | 
					
						
							|  |  |  |             if shopping & 0x3 == 0x3: | 
					
						
							|  |  |  |                 if locs_to_send != self.local_checked_locations: | 
					
						
							|  |  |  |                     self.local_checked_locations = locs_to_send | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                     if locs_to_send is not None: | 
					
						
							|  |  |  |                         await ctx.send_msgs([{"cmd": "LocationChecks", "locations": list(locs_to_send)}]) | 
					
						
							|  |  |  |                 return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Checking flags that aren't digspots or blocks | 
					
						
							|  |  |  |             for item in nonBlock: | 
					
						
							|  |  |  |                 address, mask, location = item | 
					
						
							|  |  |  |                 if location in self.local_checked_locations: | 
					
						
							|  |  |  |                     continue | 
					
						
							|  |  |  |                 flag_bytes = await bizhawk.read(ctx.bizhawk_ctx, [(address, 1, "EWRAM"), (0x3060, 6, "EWRAM")]) | 
					
						
							|  |  |  |                 flag_byte = flag_bytes[0][0] | 
					
						
							|  |  |  |                 backup_logo = bytes([byte for byte in flag_bytes[1] if byte < 0x70]).decode("UTF-8") | 
					
						
							|  |  |  |                 if backup_logo != "MLSSAP": | 
					
						
							|  |  |  |                     return | 
					
						
							|  |  |  |                 if flag_byte & mask != 0: | 
					
						
							|  |  |  |                     if location >= 0xDA0000 and location not in self.local_events: | 
					
						
							|  |  |  |                         self.local_events += [location] | 
					
						
							|  |  |  |                         await ctx.send_msgs( | 
					
						
							|  |  |  |                             [ | 
					
						
							|  |  |  |                                 { | 
					
						
							|  |  |  |                                     "cmd": "Set", | 
					
						
							|  |  |  |                                     "key": f"mlss_flag_{ctx.team}_{ctx.slot}", | 
					
						
							|  |  |  |                                     "default": 0, | 
					
						
							|  |  |  |                                     "want_reply": False, | 
					
						
							|  |  |  |                                     "operations": [{"operation": "or", "value": 1 << (location - 0xDA0000)}], | 
					
						
							|  |  |  |                                 } | 
					
						
							|  |  |  |                             ] | 
					
						
							|  |  |  |                         ) | 
					
						
							|  |  |  |                         continue | 
					
						
							|  |  |  |                     if location in roomException: | 
					
						
							|  |  |  |                         if current_room not in roomException[location]: | 
					
						
							|  |  |  |                             exception = True | 
					
						
							|  |  |  |                         else: | 
					
						
							|  |  |  |                             exception = False | 
					
						
							|  |  |  |                     else: | 
					
						
							|  |  |  |                         exception = True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                     if location in eReward: | 
					
						
							|  |  |  |                         if location not in self.eUsed: | 
					
						
							|  |  |  |                             self.eUsed += [location] | 
					
						
							|  |  |  |                             location = eReward[len(self.eUsed) - 1] | 
					
						
							|  |  |  |                         else: | 
					
						
							|  |  |  |                             continue | 
					
						
							|  |  |  |                     if (location in ctx.server_locations) and exception: | 
					
						
							|  |  |  |                         locs_to_send.add(location) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Check for set location flags. | 
					
						
							|  |  |  |             for byte_i, byte in enumerate(bytearray(flags)): | 
					
						
							|  |  |  |                 for j in range(8): | 
					
						
							|  |  |  |                     if j in self.checked_flags[byte_i]: | 
					
						
							|  |  |  |                         continue | 
					
						
							|  |  |  |                     and_value = 1 << j | 
					
						
							|  |  |  |                     if byte & and_value != 0: | 
					
						
							|  |  |  |                         flag_id = byte_i * 8 + (j + 1) | 
					
						
							|  |  |  |                         room, item = find_key(roomCount, flag_id) | 
					
						
							|  |  |  |                         pointer_arr = await bizhawk.read( | 
					
						
							|  |  |  |                             ctx.bizhawk_ctx, [(ROOM_ARRAY_POINTER + ((room - 1) * 4), 4, "ROM")] | 
					
						
							|  |  |  |                         ) | 
					
						
							|  |  |  |                         pointer = struct.unpack("<I", pointer_arr[0])[0] | 
					
						
							|  |  |  |                         pointer = pointer & 0xFFFFFF | 
					
						
							|  |  |  |                         offset = await bizhawk.read(ctx.bizhawk_ctx, [(pointer, 1, "ROM")]) | 
					
						
							|  |  |  |                         offset = offset[0][0] | 
					
						
							|  |  |  |                         if offset != 0: | 
					
						
							|  |  |  |                             offset = 2 | 
					
						
							|  |  |  |                         pointer += (item * 8) + 1 + offset | 
					
						
							|  |  |  |                         for key, value in beanstones.items(): | 
					
						
							|  |  |  |                             if pointer == value: | 
					
						
							|  |  |  |                                 pointer = key | 
					
						
							|  |  |  |                                 break | 
					
						
							|  |  |  |                         if pointer in ctx.server_locations: | 
					
						
							|  |  |  |                             self.checked_flags[byte_i] += [j] | 
					
						
							|  |  |  |                             locs_to_send.add(pointer) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if not ctx.finished_game and cackletta != 0 and current_room == 0x1C7: | 
					
						
							|  |  |  |                 await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if self.room != current_room: | 
					
						
							|  |  |  |                 self.room = current_room | 
					
						
							|  |  |  |                 await ctx.send_msgs( | 
					
						
							|  |  |  |                     [ | 
					
						
							|  |  |  |                         { | 
					
						
							|  |  |  |                             "cmd": "Set", | 
					
						
							|  |  |  |                             "key": f"mlss_room_{ctx.team}_{ctx.slot}", | 
					
						
							|  |  |  |                             "default": 0, | 
					
						
							|  |  |  |                             "want_reply": False, | 
					
						
							|  |  |  |                             "operations": [{"operation": "replace", "value": current_room}], | 
					
						
							|  |  |  |                         } | 
					
						
							|  |  |  |                     ] | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             # Send locations if there are any to send. | 
					
						
							|  |  |  |             if locs_to_send != self.local_checked_locations: | 
					
						
							|  |  |  |                 self.local_checked_locations = locs_to_send | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 if locs_to_send is not None: | 
					
						
							|  |  |  |                     await ctx.send_msgs([{"cmd": "LocationChecks", "locations": list(locs_to_send)}]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         except bizhawk.RequestFailedError: | 
					
						
							|  |  |  |             # Exit handler and return to main loop to reconnect. | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  |         except bizhawk.ConnectorError: | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def find_key(dictionary, target): | 
					
						
							|  |  |  |     leftover = target | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for key, value in dictionary.items(): | 
					
						
							|  |  |  |         if leftover > value: | 
					
						
							|  |  |  |             leftover -= value | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             return key, leftover | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def id_to_RAM(id_: int): | 
					
						
							|  |  |  |     code = id_ | 
					
						
							|  |  |  |     if 0x1C <= code <= 0x1F: | 
					
						
							|  |  |  |         code += 0xE | 
					
						
							|  |  |  |     if 0x20 <= code <= 0x26: | 
					
						
							|  |  |  |         code -= 0x4 | 
					
						
							|  |  |  |     return code |