251 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			251 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | import base64 | ||
|  | import logging | ||
|  | 
 | ||
|  | from NetUtils import ClientStatus | ||
|  | from worlds._bizhawk.client import BizHawkClient | ||
|  | from worlds._bizhawk import read, write, guarded_write | ||
|  | 
 | ||
|  | from .rom_addresses import rom_addresses | ||
|  | 
 | ||
|  | logger = logging.getLogger("Client") | ||
|  | 
 | ||
|  | BANK_EXCHANGE_RATE = 20000000000 | ||
|  | 
 | ||
|  | overworld_music = (0x05, 0x06, 0x0D, 0x0E, 0x10, 0x12, 0x1B, 0x1C, 0x1E) | ||
|  | 
 | ||
|  | class MarioLand2Client(BizHawkClient): | ||
|  |     system = ("GB", "SGB") | ||
|  |     patch_suffix = ".apsml2" | ||
|  |     game = "Super Mario Land 2" | ||
|  | 
 | ||
|  |     def __init__(self): | ||
|  |         super().__init__() | ||
|  |         self.locations_array = [] | ||
|  |         self.previous_level = None | ||
|  | 
 | ||
|  |     async def validate_rom(self, ctx): | ||
|  |         game_name = await read(ctx.bizhawk_ctx, [(0x134, 10, "ROM")]) | ||
|  |         game_name = game_name[0].decode("ascii") | ||
|  |         if game_name == "MARIOLAND2": | ||
|  |             ctx.game = self.game | ||
|  |             ctx.items_handling = 0b111 | ||
|  |             return True | ||
|  |         return False | ||
|  | 
 | ||
|  |     async def set_auth(self, ctx): | ||
|  |         auth_name = await read(ctx.bizhawk_ctx, [(0x77777, 21, "ROM")]) | ||
|  |         auth_name = base64.b64encode(auth_name[0]).decode() | ||
|  |         ctx.auth = auth_name | ||
|  | 
 | ||
|  |     async def game_watcher(self, ctx): | ||
|  |         from . import START_IDS | ||
|  |         from .items import items | ||
|  |         from .locations import locations, level_id_to_name, coins_coords, location_name_to_id | ||
|  | 
 | ||
|  |         (game_loaded_check, level_data, music, auto_scroll_levels, current_level, | ||
|  |          midway_point, bcd_lives, num_items_received, coins, options) = \ | ||
|  |             await read(ctx.bizhawk_ctx, [(0x0046, 10, "CartRAM"), (0x0848, 42, "CartRAM"), (0x0469, 1, "CartRAM"), | ||
|  |                                          (rom_addresses["Auto_Scroll_Levels_B"], 32, "ROM"), | ||
|  |                                          (0x0269, 1, "CartRAM"), (0x02A0, 1, "CartRAM"), (0x022C, 1, "CartRAM"), | ||
|  |                                          (0x00F0, 2, "CartRAM"), (0x0262, 2, "CartRAM"), | ||
|  |                                          (rom_addresses["Coins_Required"], 8, "ROM")]) | ||
|  | 
 | ||
|  |         coins_required = int.from_bytes(options[:2], "big") | ||
|  |         difficulty_mode = options[2] | ||
|  |         star_count = int.from_bytes(options[3:5], "big") | ||
|  |         midway_bells = options[5] | ||
|  |         energy_link = options[6] | ||
|  |         coin_mode = options[7] | ||
|  | 
 | ||
|  |         current_level = int.from_bytes(current_level, "big") | ||
|  |         auto_scroll_levels = list(auto_scroll_levels) | ||
|  |         midway_point = int.from_bytes(midway_point, "big") | ||
|  |         music = int.from_bytes(music, "big") | ||
|  |         level_data = list(level_data) | ||
|  |         lives = bcd_lives.hex() | ||
|  |         num_items_received = int.from_bytes(num_items_received, "big") | ||
|  |         if num_items_received == 0xFFFF: | ||
|  |             num_items_received = 0 | ||
|  | 
 | ||
|  |         items_received = [list(items.keys())[item.item - START_IDS] for item in ctx.items_received] | ||
|  |         write_num_items_received = len(items_received).to_bytes(2, "big") | ||
|  | 
 | ||
|  |         level_progression = { | ||
|  |             "Space Zone Progression", | ||
|  |             "Tree Zone Progression", | ||
|  |             "Macro Zone Progression", | ||
|  |             "Pumpkin Zone Progression", | ||
|  |             "Mario Zone Progression", | ||
|  |             "Turtle Zone Progression", | ||
|  |         } | ||
|  |         for level_item in level_progression: | ||
|  |             for _ in range(items_received.count(level_item + " x2")): | ||
|  |                 items_received += ([level_item] * 2) | ||
|  | 
 | ||
|  |         if "Pipe Traversal" in items_received: | ||
|  |             items_received += ["Pipe Traversal - Left", "Pipe Traversal - Right", | ||
|  |                                "Pipe Traversal - Up", "Pipe Traversal - Down"] | ||
|  | 
 | ||
|  |         if coin_mode == 2 and items_received.count("Mario Coin Fragment") >= coins_required: | ||
|  |             items_received.append("Mario Coin") | ||
|  | 
 | ||
|  |         if current_level == 255 and self.previous_level != 255: | ||
|  |             if coin_mode < 2: | ||
|  |                 logger.info(f"Golden Coins required: {coins_required}") | ||
|  |             else: | ||
|  |                 logger.info(f"Mario Coin Fragments required: {coins_required}. " | ||
|  |                             f"You have {items_received.count('Mario Coin Fragment')}") | ||
|  |         self.previous_level = current_level | ||
|  | 
 | ||
|  |         # There is no music in the title screen demos, this is how we guard against anything in the demos registering. | ||
|  |         # There is also no music at the door to Mario's Castle, which is why the above is before this check. | ||
|  |         if game_loaded_check != b'\x124Vx\xff\xff\xff\xff\xff\xff' or music == 0: | ||
|  |             return | ||
|  | 
 | ||
|  |         locations_checked = [] | ||
|  |         if current_level in level_id_to_name: | ||
|  |             level_name = level_id_to_name[current_level] | ||
|  |             coin_tile_data = await read(ctx.bizhawk_ctx, [(0xB000 + ((coords[1] * 256) + coords[0]), 1, "System Bus") | ||
|  |                                                           for coords in coins_coords[level_name]]) | ||
|  |             num_coins = len([tile[0] for tile in coin_tile_data if tile[0] in (0x7f, 0x60, 0x07)]) | ||
|  |             locations_checked = [location_name_to_id[f"{level_name} - {i} Coin{'s' if i > 1 else ''}"] | ||
|  |                                  for i in range(1, num_coins + 1)] | ||
|  | 
 | ||
|  |         new_lives = int(lives) | ||
|  |         energy_link_add = None | ||
|  |         if energy_link: | ||
|  |             if new_lives == 0: | ||
|  |                 if (f"EnergyLink{ctx.team}" in ctx.stored_data | ||
|  |                         and ctx.stored_data[f"EnergyLink{ctx.team}"] | ||
|  |                         and ctx.stored_data[f"EnergyLink{ctx.team}"] >= BANK_EXCHANGE_RATE): | ||
|  |                     new_lives = 1 | ||
|  |                     energy_link_add = -BANK_EXCHANGE_RATE | ||
|  |             elif new_lives > 1: | ||
|  |                 energy_link_add = BANK_EXCHANGE_RATE * (new_lives - 1) | ||
|  |                 new_lives = 1 | ||
|  |         # Convert back to binary-coded-decimal | ||
|  |         new_lives = int(str(new_lives), 16) | ||
|  | 
 | ||
|  |         new_coins = coins.hex() | ||
|  |         new_coins = int(new_coins[2:] + new_coins[:2]) | ||
|  |         for item in items_received[num_items_received:]: | ||
|  |             if item.endswith("Coins") or item == "1 Coin": | ||
|  |                 new_coins += int(item.split(" ")[0]) | ||
|  |         # Limit to 999 and convert back to binary-coded-decimal | ||
|  |         new_coins = int(str(min(new_coins, 999)), 16).to_bytes(2, "little") | ||
|  | 
 | ||
|  |         modified_level_data = level_data.copy() | ||
|  |         for ID, (location, data) in enumerate(locations.items(), START_IDS): | ||
|  |             if "clear_condition" in data: | ||
|  |                 if items_received.count(data["clear_condition"][0]) >= data["clear_condition"][1]: | ||
|  |                     modified_level_data[data["ram_index"]] |= (0x08 if data["type"] == "bell" | ||
|  |                                                                else 0x01 if data["type"] == "secret" else 0x80) | ||
|  | 
 | ||
|  |             if data["type"] == "level" and level_data[data["ram_index"]] & 0x40: | ||
|  |                 locations_checked.append(ID) | ||
|  |             if data["type"] == "secret" and level_data[data["ram_index"]] & 0x02: | ||
|  |                 locations_checked.append(ID) | ||
|  |             elif data["type"] == "bell" and data["id"] == current_level and midway_point == 0xFF: | ||
|  |                 locations_checked.append(ID) | ||
|  | 
 | ||
|  |         invincibility_length = int((832.0 / (star_count + 1)) | ||
|  |                                    * (items_received.count("Super Star Duration Increase") + 1)) | ||
|  | 
 | ||
|  |         if "Easy Mode" in items_received: | ||
|  |             difficulty_mode = 1 | ||
|  |         elif "Normal Mode" in items_received: | ||
|  |             difficulty_mode = 0 | ||
|  | 
 | ||
|  |         data_writes = [ | ||
|  |             (rom_addresses["Space_Physics"], [0x7e] if "Space Physics" in items_received else [0xaf], "ROM"), | ||
|  |             (rom_addresses["Get_Hurt_To_Big_Mario"], [1] if "Mushroom" in items_received else [0], "ROM"), | ||
|  |             (rom_addresses["Get_Mushroom_A"], [0xea, 0x16, 0xa2] if "Mushroom" in items_received else [0, 0, 0], "ROM"), | ||
|  |             (rom_addresses["Get_Mushroom_B"], [0xea, 0x16, 0xa2] if "Mushroom" in items_received else [0, 0, 0], "ROM"), | ||
|  |             (rom_addresses["Get_Mushroom_C"], [00] if "Mushroom" in items_received else [0xd8], "ROM"), | ||
|  |             (rom_addresses["Get_Carrot_A"], [0xea, 0x16, 0xa2] if "Carrot" in items_received else [0, 0, 0], "ROM"), | ||
|  |             (rom_addresses["Get_Carrot_B"], [0xea, 0x16, 0xa2] if "Carrot" in items_received else [0, 0, 0], "ROM"), | ||
|  |             (rom_addresses["Get_Carrot_C"], [00] if "Carrot" in items_received else [0xc8], "ROM"), | ||
|  |             (rom_addresses["Get_Fire_Flower_A"], [0xea, 0x16, 0xa2] if "Fire Flower" in items_received else [0, 0, 0], "ROM"), | ||
|  |             (rom_addresses["Get_Fire_Flower_B"], [0xea, 0x16, 0xa2] if "Fire Flower" in items_received else [0, 0, 0], "ROM"), | ||
|  |             (rom_addresses["Get_Fire_Flower_C"], [00] if "Fire Flower" in items_received else [0xc8], "ROM"), | ||
|  |             (rom_addresses["Invincibility_Star_A"], [(invincibility_length >> 8) + 1], "ROM"), | ||
|  |             (rom_addresses["Invincibility_Star_B"], [invincibility_length & 0xFF], "ROM"), | ||
|  |             (rom_addresses["Enable_Bubble"], [0xcb, 0xd7] if "Hippo Bubble" in items_received else [0, 0], "ROM"), | ||
|  |             (rom_addresses["Enable_Swim"], [0xcb, 0xcf] if "Water Physics" in items_received else [0, 0], "ROM"), | ||
|  |             (rom_addresses["Pipe_Traversal_A"], [16] if "Pipe Traversal - Down" in items_received else [0], "ROM"), | ||
|  |             (rom_addresses["Pipe_Traversal_B"], [32] if "Pipe Traversal - Up" in items_received else [10], "ROM"), | ||
|  |             (rom_addresses["Pipe_Traversal_C"], [48] if "Pipe Traversal - Right" in items_received else [0], "ROM"), | ||
|  |             (rom_addresses["Pipe_Traversal_D"], [64] if "Pipe Traversal - Left" in items_received else [0], "ROM"), | ||
|  |             (rom_addresses["Pipe_Traversal_SFX_A"], [5] if "Pipe Traversal - Down" in items_received else [0], "ROM"), | ||
|  |             (rom_addresses["Pipe_Traversal_SFX_B"], [5] if "Pipe Traversal - Up" in items_received else [0], "ROM"), | ||
|  |             (rom_addresses["Pipe_Traversal_SFX_C"], [5] if "Pipe Traversal - Right" in items_received else [0], "ROM"), | ||
|  |             (rom_addresses["Pipe_Traversal_SFX_D"], [5] if "Pipe Traversal - Left" in items_received else [0], "ROM"), | ||
|  |             (0x022c, [new_lives], "CartRAM"), | ||
|  |             (0x02E4, [difficulty_mode], "CartRAM"), | ||
|  |             (0x0848, modified_level_data, "CartRAM"), | ||
|  |             (0x0262, new_coins, "CartRAM"), | ||
|  |         ] | ||
|  | 
 | ||
|  |         if items_received: | ||
|  |             data_writes.append((0x00F0, write_num_items_received, "CartRAM")) | ||
|  | 
 | ||
|  |         if midway_point == 0xFF and (midway_bells or music in overworld_music): | ||
|  |             # after registering the check for the midway bell, clear the value just for safety. | ||
|  |             data_writes.append((0x02A0, [0], "CartRAM")) | ||
|  | 
 | ||
|  |         for i in range(32): | ||
|  |             if auto_scroll_levels[i] == 3: | ||
|  |                 if "Auto Scroll" in items_received or f"Auto Scroll - {level_id_to_name[i]}" in items_received: | ||
|  |                     auto_scroll_levels[i] = 1 | ||
|  |                     if i == current_level: | ||
|  |                         data_writes.append((0x02C8, [0x01], "CartRAM")) | ||
|  |                 else: | ||
|  |                     auto_scroll_levels[i] = 0 | ||
|  |             elif auto_scroll_levels[i] == 2: | ||
|  |                 if ("Cancel Auto Scroll" in items_received | ||
|  |                         or f"Cancel Auto Scroll - {level_id_to_name[i]}" in items_received): | ||
|  |                     auto_scroll_levels[i] = 0 | ||
|  |                     if i == current_level: | ||
|  |                         data_writes.append((0x02C8, [0x00], "CartRAM")) | ||
|  |                 else: | ||
|  |                     auto_scroll_levels[i] = 1 | ||
|  |         data_writes.append((rom_addresses["Auto_Scroll_Levels"], auto_scroll_levels, "ROM")) | ||
|  | 
 | ||
|  |         success = await guarded_write(ctx.bizhawk_ctx, data_writes, [(0x0848, level_data, "CartRAM"), | ||
|  |                                                                      (0x022C, [int.from_bytes(bcd_lives, "big")], | ||
|  |                                                                       "CartRAM"), | ||
|  |                                                                      [0x0262, coins, "CartRAM"]]) | ||
|  | 
 | ||
|  |         if success and energy_link_add is not None: | ||
|  |             await ctx.send_msgs([{ | ||
|  |                 "cmd": "Set", "key": f"EnergyLink{ctx.team}", "operations": | ||
|  |                     [{"operation": "add", "value": energy_link_add}, | ||
|  |                      {"operation": "max", "value": 0}], | ||
|  |             }]) | ||
|  | 
 | ||
|  |         if not ctx.server or not ctx.server.socket.open or ctx.server.socket.closed: | ||
|  |             return | ||
|  | 
 | ||
|  |         if locations_checked and locations_checked != self.locations_array: | ||
|  |             self.locations_array = locations_checked | ||
|  |             await ctx.send_msgs([{"cmd": "LocationChecks", "locations": locations_checked}]) | ||
|  | 
 | ||
|  |         if music == 0x18: | ||
|  |             await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}]) | ||
|  |             ctx.finished_game = True | ||
|  | 
 | ||
|  |     def on_package(self, ctx, cmd: str, args: dict): | ||
|  |         super().on_package(ctx, cmd, args) | ||
|  |         if cmd == 'Connected': | ||
|  |             if ctx.slot_data["energy_link"]: | ||
|  |                 ctx.set_notify(f"EnergyLink{ctx.team}") | ||
|  |                 if ctx.ui: | ||
|  |                     ctx.ui.enable_energy_link() | ||
|  |                     ctx.ui.energy_link_label.text = "Lives: Standby" | ||
|  |         elif cmd == "SetReply" and args["key"].startswith("EnergyLink"): | ||
|  |             if ctx.ui: | ||
|  |                 ctx.ui.energy_link_label.text = f"Lives: {int(args['value'] / BANK_EXCHANGE_RATE)}" | ||
|  |         elif cmd == "Retrieved": | ||
|  |             if f"EnergyLink{ctx.team}" in args["keys"] and args['keys'][f'EnergyLink{ctx.team}'] and ctx.ui: | ||
|  |                 ctx.ui.energy_link_label.text = f"Lives: {int(args['keys'][f'EnergyLink{ctx.team}'] / BANK_EXCHANGE_RATE)}" |