 955a86803f
			
		
	
	955a86803f
	
	
	
		
			
			Co-authored-by: Nicholas Saylor <79181893+nicholassaylor@users.noreply.github.com> Co-authored-by: Exempt-Medic <60412657+Exempt-Medic@users.noreply.github.com> Co-authored-by: alchav <alchav@jalchavware.com>
		
			
				
	
	
		
			251 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			251 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import base64
 | |
| import logging
 | |
| 
 | |
| from NetUtils import ClientStatus
 | |
| from worlds._bizhawk.client import BizHawkClient
 | |
| from worlds._bizhawk import read, write, guarded_write
 | |
| 
 | |
| from .rom_addresses import rom_addresses
 | |
| 
 | |
# Shared "Client" logger used for in-client informational messages.
logger = logging.getLogger("Client")

# Amount of EnergyLink added per banked life and deducted per withdrawn life.
BANK_EXCHANGE_RATE = 20_000_000_000

# Music track ids compared against the current track when deciding whether to
# clear the midway-point flag (presumably the overworld map themes - confirm).
overworld_music = (
    0x05, 0x06, 0x0D, 0x0E, 0x10,
    0x12, 0x1B, 0x1C, 0x1E,
)
 | |
| 
 | |
class MarioLand2Client(BizHawkClient):
    """BizHawk client handler for Super Mario Land 2.

    Each poll (`game_watcher`) reads game state from the emulator, derives the
    state implied by the items received so far, writes that state back to
    ROM/CartRAM, and reports checked locations and goal completion to the
    server.
    """

    system = ("GB", "SGB")
    patch_suffix = ".apsml2"
    game = "Super Mario Land 2"

    def __init__(self):
        super().__init__()
        # Last list of checked locations sent to the server; an identical list is not resent.
        self.locations_array = []
        # Level id observed on the previous poll; used to log the coin requirement only
        # on the transition into level 255.
        self.previous_level = None

    async def validate_rom(self, ctx):
        """Return True if the loaded ROM's title bytes identify this game.

        On a match, also sets ``ctx.game`` and ``ctx.items_handling``.
        """
        game_name = await read(ctx.bizhawk_ctx, [(0x134, 10, "ROM")])
        # Compare the raw bytes rather than decoding first: a different ROM whose
        # header contains non-ASCII bytes must yield False, not a UnicodeDecodeError.
        if game_name[0] != b"MARIOLAND2":
            return False
        ctx.game = self.game
        ctx.items_handling = 0b111
        return True

    async def set_auth(self, ctx):
        """Set ``ctx.auth`` to the base64 encoding of the 21 bytes stored at ROM 0x77777."""
        auth_name = await read(ctx.bizhawk_ctx, [(0x77777, 21, "ROM")])
        auth_name = base64.b64encode(auth_name[0]).decode()
        ctx.auth = auth_name

    async def game_watcher(self, ctx):
        """Run one poll: read game state, apply received items, and report progress.

        Returns early, before any write, while the game-loaded signature is absent
        or no music is playing. After the guarded write, returns early if there is
        no open server connection.
        """
        from . import START_IDS
        from .items import items
        from .locations import locations, level_id_to_name, coins_coords, location_name_to_id

        (game_loaded_check, level_data, music, auto_scroll_levels, current_level,
         midway_point, bcd_lives, num_items_received, coins, options) = \
            await read(ctx.bizhawk_ctx, [(0x0046, 10, "CartRAM"), (0x0848, 42, "CartRAM"), (0x0469, 1, "CartRAM"),
                                         (rom_addresses["Auto_Scroll_Levels_B"], 32, "ROM"),
                                         (0x0269, 1, "CartRAM"), (0x02A0, 1, "CartRAM"), (0x022C, 1, "CartRAM"),
                                         (0x00F0, 2, "CartRAM"), (0x0262, 2, "CartRAM"),
                                         (rom_addresses["Coins_Required"], 8, "ROM")])

        # The 8 option bytes stored in the ROM, unpacked by position.
        coins_required = int.from_bytes(options[:2], "big")
        difficulty_mode = options[2]
        star_count = int.from_bytes(options[3:5], "big")
        midway_bells = options[5]
        energy_link = options[6]
        coin_mode = options[7]

        current_level = int.from_bytes(current_level, "big")
        auto_scroll_levels = list(auto_scroll_levels)
        midway_point = int.from_bytes(midway_point, "big")
        music = int.from_bytes(music, "big")
        level_data = list(level_data)
        # The hex digits of the lives byte are its decimal digits (binary-coded decimal).
        lives = bcd_lives.hex()
        num_items_received = int.from_bytes(num_items_received, "big")
        # 0xFFFF in the received-items counter is treated as "nothing received yet".
        if num_items_received == 0xFFFF:
            num_items_received = 0

        # Build the index -> name table once instead of once per received item.
        item_names = list(items)
        items_received = [item_names[item.item - START_IDS] for item in ctx.items_received]
        # Computed before the synthetic entries below are appended, so the counter
        # written to RAM reflects only items actually received from the server.
        write_num_items_received = len(items_received).to_bytes(2, "big")

        level_progression = {
            "Space Zone Progression",
            "Tree Zone Progression",
            "Macro Zone Progression",
            "Pumpkin Zone Progression",
            "Mario Zone Progression",
            "Turtle Zone Progression",
        }
        # Each "<zone> x2" item counts as two of the plain progression item.
        for level_item in level_progression:
            for _ in range(items_received.count(level_item + " x2")):
                items_received += ([level_item] * 2)

        # The combined item grants all four directional variants.
        if "Pipe Traversal" in items_received:
            items_received += ["Pipe Traversal - Left", "Pipe Traversal - Right",
                               "Pipe Traversal - Up", "Pipe Traversal - Down"]

        if coin_mode == 2 and items_received.count("Mario Coin Fragment") >= coins_required:
            items_received.append("Mario Coin")

        # Log the requirement once, on the poll where the level id first becomes 255.
        if current_level == 255 and self.previous_level != 255:
            if coin_mode < 2:
                logger.info(f"Golden Coins required: {coins_required}")
            else:
                logger.info(f"Mario Coin Fragments required: {coins_required}. "
                            f"You have {items_received.count('Mario Coin Fragment')}")
        self.previous_level = current_level

        # There is no music in the title screen demos, this is how we guard against anything in the demos registering.
        # There is also no music at the door to Mario's Castle, which is why the above is before this check.
        if game_loaded_check != b'\x124Vx\xff\xff\xff\xff\xff\xff' or music == 0:
            return

        locations_checked = []
        if current_level in level_id_to_name:
            level_name = level_id_to_name[current_level]
            # Read one tile byte per known coin coordinate of the current level.
            coin_tile_data = await read(ctx.bizhawk_ctx, [(0xB000 + ((coords[1] * 256) + coords[0]), 1, "System Bus")
                                                          for coords in coins_coords[level_name]])
            # Tiles holding one of these values count towards the level's coin locations.
            num_coins = len([tile[0] for tile in coin_tile_data if tile[0] in (0x7f, 0x60, 0x07)])
            locations_checked = [location_name_to_id[f"{level_name} - {i} Coin{'s' if i > 1 else ''}"]
                                 for i in range(1, num_coins + 1)]

        new_lives = int(lives)
        energy_link_add = None
        if energy_link:
            if new_lives == 0:
                # Out of lives: withdraw one life's worth if the stored EnergyLink covers it.
                if (f"EnergyLink{ctx.team}" in ctx.stored_data
                        and ctx.stored_data[f"EnergyLink{ctx.team}"]
                        and ctx.stored_data[f"EnergyLink{ctx.team}"] >= BANK_EXCHANGE_RATE):
                    new_lives = 1
                    energy_link_add = -BANK_EXCHANGE_RATE
            elif new_lives > 1:
                # Every life beyond the first is deposited into EnergyLink.
                energy_link_add = BANK_EXCHANGE_RATE * (new_lives - 1)
                new_lives = 1
        # Convert back to binary-coded-decimal
        new_lives = int(str(new_lives), 16)

        # The coin counter is two BCD bytes, low byte first; swap the hex pairs
        # to obtain the decimal value.
        new_coins = coins.hex()
        new_coins = int(new_coins[2:] + new_coins[:2])
        # Only items past the count already recorded in RAM are applied.
        for item in items_received[num_items_received:]:
            if item.endswith("Coins") or item == "1 Coin":
                new_coins += int(item.split(" ")[0])
        # Limit to 999 and convert back to binary-coded-decimal
        new_coins = int(str(min(new_coins, 999)), 16).to_bytes(2, "little")

        modified_level_data = level_data.copy()
        for ID, (location, data) in enumerate(locations.items(), START_IDS):
            if "clear_condition" in data:
                # Enough copies of the required item: set the flag bit for this location type.
                if items_received.count(data["clear_condition"][0]) >= data["clear_condition"][1]:
                    modified_level_data[data["ram_index"]] |= (0x08 if data["type"] == "bell"
                                                               else 0x01 if data["type"] == "secret" else 0x80)

            # Checks are detected from the level data as read, not the modified copy.
            if data["type"] == "level" and level_data[data["ram_index"]] & 0x40:
                locations_checked.append(ID)
            if data["type"] == "secret" and level_data[data["ram_index"]] & 0x02:
                locations_checked.append(ID)
            elif data["type"] == "bell" and data["id"] == current_level and midway_point == 0xFF:
                locations_checked.append(ID)

        invincibility_length = int((832.0 / (star_count + 1))
                                   * (items_received.count("Super Star Duration Increase") + 1))

        # A received mode item overrides the ROM option; "Easy Mode" takes precedence.
        if "Easy Mode" in items_received:
            difficulty_mode = 1
        elif "Normal Mode" in items_received:
            difficulty_mode = 0

        data_writes = [
            (rom_addresses["Space_Physics"], [0x7e] if "Space Physics" in items_received else [0xaf], "ROM"),
            (rom_addresses["Get_Hurt_To_Big_Mario"], [1] if "Mushroom" in items_received else [0], "ROM"),
            (rom_addresses["Get_Mushroom_A"], [0xea, 0x16, 0xa2] if "Mushroom" in items_received else [0, 0, 0], "ROM"),
            (rom_addresses["Get_Mushroom_B"], [0xea, 0x16, 0xa2] if "Mushroom" in items_received else [0, 0, 0], "ROM"),
            (rom_addresses["Get_Mushroom_C"], [0x00] if "Mushroom" in items_received else [0xd8], "ROM"),
            (rom_addresses["Get_Carrot_A"], [0xea, 0x16, 0xa2] if "Carrot" in items_received else [0, 0, 0], "ROM"),
            (rom_addresses["Get_Carrot_B"], [0xea, 0x16, 0xa2] if "Carrot" in items_received else [0, 0, 0], "ROM"),
            (rom_addresses["Get_Carrot_C"], [0x00] if "Carrot" in items_received else [0xc8], "ROM"),
            (rom_addresses["Get_Fire_Flower_A"], [0xea, 0x16, 0xa2] if "Fire Flower" in items_received else [0, 0, 0], "ROM"),
            (rom_addresses["Get_Fire_Flower_B"], [0xea, 0x16, 0xa2] if "Fire Flower" in items_received else [0, 0, 0], "ROM"),
            (rom_addresses["Get_Fire_Flower_C"], [0x00] if "Fire Flower" in items_received else [0xc8], "ROM"),
            (rom_addresses["Invincibility_Star_A"], [(invincibility_length >> 8) + 1], "ROM"),
            (rom_addresses["Invincibility_Star_B"], [invincibility_length & 0xFF], "ROM"),
            (rom_addresses["Enable_Bubble"], [0xcb, 0xd7] if "Hippo Bubble" in items_received else [0, 0], "ROM"),
            (rom_addresses["Enable_Swim"], [0xcb, 0xcf] if "Water Physics" in items_received else [0, 0], "ROM"),
            (rom_addresses["Pipe_Traversal_A"], [16] if "Pipe Traversal - Down" in items_received else [0], "ROM"),
            (rom_addresses["Pipe_Traversal_B"], [32] if "Pipe Traversal - Up" in items_received else [10], "ROM"),
            (rom_addresses["Pipe_Traversal_C"], [48] if "Pipe Traversal - Right" in items_received else [0], "ROM"),
            (rom_addresses["Pipe_Traversal_D"], [64] if "Pipe Traversal - Left" in items_received else [0], "ROM"),
            (rom_addresses["Pipe_Traversal_SFX_A"], [5] if "Pipe Traversal - Down" in items_received else [0], "ROM"),
            (rom_addresses["Pipe_Traversal_SFX_B"], [5] if "Pipe Traversal - Up" in items_received else [0], "ROM"),
            (rom_addresses["Pipe_Traversal_SFX_C"], [5] if "Pipe Traversal - Right" in items_received else [0], "ROM"),
            (rom_addresses["Pipe_Traversal_SFX_D"], [5] if "Pipe Traversal - Left" in items_received else [0], "ROM"),
            (0x022c, [new_lives], "CartRAM"),
            (0x02E4, [difficulty_mode], "CartRAM"),
            (0x0848, modified_level_data, "CartRAM"),
            (0x0262, new_coins, "CartRAM"),
        ]

        if items_received:
            data_writes.append((0x00F0, write_num_items_received, "CartRAM"))

        if midway_point == 0xFF and (midway_bells or music in overworld_music):
            # after registering the check for the midway bell, clear the value just for safety.
            data_writes.append((0x02A0, [0], "CartRAM"))

        # Resolve the per-level auto-scroll table: 3 and 2 are placeholder values that
        # become 1 or 0 depending on whether the matching item has been received.
        for i in range(32):
            if auto_scroll_levels[i] == 3:
                if "Auto Scroll" in items_received or f"Auto Scroll - {level_id_to_name[i]}" in items_received:
                    auto_scroll_levels[i] = 1
                    if i == current_level:
                        data_writes.append((0x02C8, [0x01], "CartRAM"))
                else:
                    auto_scroll_levels[i] = 0
            elif auto_scroll_levels[i] == 2:
                if ("Cancel Auto Scroll" in items_received
                        or f"Cancel Auto Scroll - {level_id_to_name[i]}" in items_received):
                    auto_scroll_levels[i] = 0
                    if i == current_level:
                        data_writes.append((0x02C8, [0x00], "CartRAM"))
                else:
                    auto_scroll_levels[i] = 1
        data_writes.append((rom_addresses["Auto_Scroll_Levels"], auto_scroll_levels, "ROM"))

        # Guard the write on the level data, lives and coins still holding the values
        # read at the start of this poll.
        success = await guarded_write(ctx.bizhawk_ctx, data_writes, [(0x0848, level_data, "CartRAM"),
                                                                     (0x022C, [int.from_bytes(bcd_lives, "big")],
                                                                      "CartRAM"),
                                                                     (0x0262, coins, "CartRAM")])

        # Only adjust the shared EnergyLink value if the matching lives change was written.
        if success and energy_link_add is not None:
            await ctx.send_msgs([{
                "cmd": "Set", "key": f"EnergyLink{ctx.team}", "operations":
                    [{"operation": "add", "value": energy_link_add},
                     {"operation": "max", "value": 0}],
            }])

        if not ctx.server or not ctx.server.socket.open or ctx.server.socket.closed:
            return

        if locations_checked and locations_checked != self.locations_array:
            self.locations_array = locations_checked
            await ctx.send_msgs([{"cmd": "LocationChecks", "locations": locations_checked}])

        # Report the goal once; without the finished_game check the status update
        # would be resent on every poll while this music track is playing.
        if music == 0x18 and not ctx.finished_game:
            await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
            ctx.finished_game = True

    def on_package(self, ctx, cmd: str, args: dict):
        """Handle server packets that affect the EnergyLink display.

        On "Connected", subscribes to the team's EnergyLink key when the slot has
        energy link enabled. On "SetReply" and "Retrieved", refreshes the UI label
        with the stored value expressed in lives.
        """
        super().on_package(ctx, cmd, args)
        if cmd == 'Connected':
            if ctx.slot_data["energy_link"]:
                ctx.set_notify(f"EnergyLink{ctx.team}")
                if ctx.ui:
                    ctx.ui.enable_energy_link()
                    ctx.ui.energy_link_label.text = "Lives: Standby"
        elif cmd == "SetReply" and args["key"].startswith("EnergyLink"):
            if ctx.ui:
                ctx.ui.energy_link_label.text = f"Lives: {int(args['value'] / BANK_EXCHANGE_RATE)}"
        elif cmd == "Retrieved":
            if f"EnergyLink{ctx.team}" in args["keys"] and args['keys'][f'EnergyLink{ctx.team}'] and ctx.ui:
                ctx.ui.energy_link_label.text = f"Lives: {int(args['keys'][f'EnergyLink{ctx.team}'] / BANK_EXCHANGE_RATE)}"