# Mirror of https://github.com/MarioSpore/Grinch-AP.git
# Synced 2025-10-21 12:11:33 -06:00
#
# Co-authored-by: Nicholas Saylor <79181893+nicholassaylor@users.noreply.github.com>
# Co-authored-by: Exempt-Medic <60412657+Exempt-Medic@users.noreply.github.com>
# Co-authored-by: alchav <alchav@jalchavware.com>
#
# 251 lines, 13 KiB, Python
import base64
import logging

from NetUtils import ClientStatus
from worlds._bizhawk.client import BizHawkClient
from worlds._bizhawk import read, write, guarded_write

from .rom_addresses import rom_addresses
|
|
|
|
# Module logger; messages are emitted under the "Client" logger name.
logger = logging.getLogger("Client")

# Amount of EnergyLink energy that corresponds to a single life when lives are
# deposited into, or withdrawn from, the shared EnergyLink pool.
BANK_EXCHANGE_RATE = 20_000_000_000

# Music IDs treated as overworld tracks (per the name). The game watcher checks
# the current music against this tuple when deciding whether the midway-point
# flag may be cleared.
overworld_music = (0x05, 0x06, 0x0D, 0x0E, 0x10, 0x12, 0x1B, 0x1C, 0x1E)
|
|
|
|
class MarioLand2Client(BizHawkClient):
    """BizHawk client handler for Super Mario Land 2.

    On every watcher pass the handler reads game state from the emulator,
    translates the items received from the server into RAM/ROM writes, reports
    newly checked locations, and signals goal completion. When the EnergyLink
    option is enabled it also exchanges lives with the team's EnergyLink pool at
    ``BANK_EXCHANGE_RATE`` energy per life.
    """

    system = ("GB", "SGB")
    patch_suffix = ".apsml2"
    game = "Super Mario Land 2"

    def __init__(self):
        super().__init__()
        # Last list of checked location IDs that was sent to the server; used to
        # avoid re-sending an unchanged list on every pass.
        self.locations_array = []
        # Level ID observed on the previous watcher pass (None before the first).
        self.previous_level = None

    @staticmethod
    def _to_bcd(value):
        """Return the non-negative int *value* re-encoded as binary-coded decimal.

        The decimal digits are reinterpreted as hexadecimal digits, e.g.
        ``15 -> 0x15``.
        """
        return int(str(value), 16)

    async def validate_rom(self, ctx) -> bool:
        """Return True (and configure *ctx*) when the loaded ROM is this game.

        Reads 10 bytes at ROM offset 0x134 and compares them with
        ``b"MARIOLAND2"``. On a match ``ctx.game`` and ``ctx.items_handling``
        are set.
        """
        header = await read(ctx.bizhawk_ctx, [(0x134, 10, "ROM")])
        # Compare the raw bytes instead of decoding them: decoding as ASCII
        # raises UnicodeDecodeError for an unrelated ROM whose header contains
        # bytes >= 0x80, whereas such a ROM should simply be rejected.
        if bytes(header[0]) != b"MARIOLAND2":
            return False
        ctx.game = self.game
        ctx.items_handling = 0b111
        return True

    async def set_auth(self, ctx) -> None:
        """Set ``ctx.auth`` to the base64 text of the 21 bytes at ROM offset 0x77777."""
        auth_bytes = await read(ctx.bizhawk_ctx, [(0x77777, 21, "ROM")])
        ctx.auth = base64.b64encode(auth_bytes[0]).decode()

    async def game_watcher(self, ctx) -> None:
        """Run one synchronisation pass between the emulator and the server.

        Steps, in order:

        1. Read game state and the option bytes stored in the ROM.
        2. Build the list of received item names (expanding bundled items).
        3. Return early if the game-loaded signature is missing or no music plays.
        4. Work out which locations are checked.
        5. Compute the new lives, coins, level data and ROM patches and apply
           them with a guarded write.
        6. Send EnergyLink changes, location checks and the goal status.
        """
        from . import START_IDS
        from .items import items
        from .locations import locations, level_id_to_name, coins_coords, location_name_to_id

        (game_loaded_check, level_data, music, auto_scroll_levels, current_level,
         midway_point, bcd_lives, num_items_received, coins, options) = await read(ctx.bizhawk_ctx, [
            (0x0046, 10, "CartRAM"),
            (0x0848, 42, "CartRAM"),
            (0x0469, 1, "CartRAM"),
            (rom_addresses["Auto_Scroll_Levels_B"], 32, "ROM"),
            (0x0269, 1, "CartRAM"),
            (0x02A0, 1, "CartRAM"),
            (0x022C, 1, "CartRAM"),
            (0x00F0, 2, "CartRAM"),
            (0x0262, 2, "CartRAM"),
            (rom_addresses["Coins_Required"], 8, "ROM"),
        ])

        # Option bytes read from the ROM at "Coins_Required".
        coins_required = int.from_bytes(options[:2], "big")
        difficulty_mode = options[2]
        star_count = int.from_bytes(options[3:5], "big")
        midway_bells = options[5]
        energy_link = options[6]
        coin_mode = options[7]

        current_level = int.from_bytes(current_level, "big")
        auto_scroll_levels = list(auto_scroll_levels)
        midway_point = int.from_bytes(midway_point, "big")
        music = int.from_bytes(music, "big")
        level_data = list(level_data)
        # The lives byte is binary-coded decimal, so its hex text is the decimal value.
        lives = bcd_lives.hex()
        num_items_received = int.from_bytes(num_items_received, "big")
        # 0xFFFF is treated as "nothing received yet".
        if num_items_received == 0xFFFF:
            num_items_received = 0

        # Build the name table once instead of once per received item.
        item_names = list(items)
        items_received = [item_names[item.item - START_IDS] for item in ctx.items_received]
        # Counter value is taken before the synthetic entries below are appended.
        write_num_items_received = len(items_received).to_bytes(2, "big")

        # Every "<zone> Progression x2" item also counts as two single progression items.
        level_progression = {
            "Space Zone Progression",
            "Tree Zone Progression",
            "Macro Zone Progression",
            "Pumpkin Zone Progression",
            "Mario Zone Progression",
            "Turtle Zone Progression",
        }
        for level_item in level_progression:
            for _ in range(items_received.count(level_item + " x2")):
                items_received += [level_item] * 2

        # The combined pipe item grants all four directional pipe items.
        if "Pipe Traversal" in items_received:
            items_received += ["Pipe Traversal - Left", "Pipe Traversal - Right",
                               "Pipe Traversal - Up", "Pipe Traversal - Down"]

        if coin_mode == 2 and items_received.count("Mario Coin Fragment") >= coins_required:
            items_received.append("Mario Coin")

        # Log the requirement once each time the level ID changes to 255.
        if current_level == 255 and self.previous_level != 255:
            if coin_mode < 2:
                logger.info(f"Golden Coins required: {coins_required}")
            else:
                logger.info(f"Mario Coin Fragments required: {coins_required}. "
                            f"You have {items_received.count('Mario Coin Fragment')}")
        self.previous_level = current_level

        # There is no music in the title screen demos, this is how we guard against anything in the demos registering.
        # There is also no music at the door to Mario's Castle, which is why the above is before this check.
        if game_loaded_check != b'\x124Vx\xff\xff\xff\xff\xff\xff' or music == 0:
            return

        # Membership lookups below are frequent; the list is no longer modified past this point.
        owned = frozenset(items_received)

        locations_checked = []
        if current_level in level_id_to_name:
            level_name = level_id_to_name[current_level]
            coin_tile_data = await read(ctx.bizhawk_ctx, [
                (0xB000 + ((coords[1] * 256) + coords[0]), 1, "System Bus")
                for coords in coins_coords[level_name]
            ])
            # A coin position counts as collected when its tile is one of these values.
            num_coins = sum(1 for tile in coin_tile_data if tile[0] in (0x7f, 0x60, 0x07))
            locations_checked = [location_name_to_id[f"{level_name} - {i} Coin{'s' if i > 1 else ''}"]
                                 for i in range(1, num_coins + 1)]

        energy_link_key = f"EnergyLink{ctx.team}"
        new_lives = int(lives)
        energy_link_add = None
        if energy_link:
            if new_lives == 0:
                # Out of lives: withdraw one life's worth of energy if the pool can afford it.
                banked = ctx.stored_data.get(energy_link_key)
                if banked and banked >= BANK_EXCHANGE_RATE:
                    new_lives = 1
                    energy_link_add = -BANK_EXCHANGE_RATE
            elif new_lives > 1:
                # Deposit every life beyond the first into the pool.
                energy_link_add = BANK_EXCHANGE_RATE * (new_lives - 1)
                new_lives = 1
        # Convert back to binary-coded-decimal
        new_lives = self._to_bcd(new_lives)

        # The coin counter is two BCD bytes, low byte first; swap them to read the decimal value.
        coins_hex = coins.hex()
        new_coins = int(coins_hex[2:] + coins_hex[:2])
        # Only items past the in-game received counter are new and still need to be granted.
        for item in items_received[num_items_received:]:
            if item.endswith("Coins") or item == "1 Coin":
                new_coins += int(item.split(" ")[0])
        # Limit to 999 and convert back to binary-coded-decimal
        new_coins = self._to_bcd(min(new_coins, 999)).to_bytes(2, "little")

        modified_level_data = level_data.copy()
        for location_id, data in enumerate(locations.values(), START_IDS):
            location_type = data["type"]
            if "clear_condition" in data:
                if items_received.count(data["clear_condition"][0]) >= data["clear_condition"][1]:
                    # Set the flag bit matching the location type once enough of the item is held.
                    modified_level_data[data["ram_index"]] |= (0x08 if location_type == "bell"
                                                               else 0x01 if location_type == "secret" else 0x80)

            if location_type == "level" and level_data[data["ram_index"]] & 0x40:
                locations_checked.append(location_id)
            if location_type == "secret" and level_data[data["ram_index"]] & 0x02:
                locations_checked.append(location_id)
            elif location_type == "bell" and data["id"] == current_level and midway_point == 0xFF:
                locations_checked.append(location_id)

        invincibility_length = int((832.0 / (star_count + 1))
                                   * (items_received.count("Super Star Duration Increase") + 1))

        # A received mode item overrides the difficulty stored in the ROM options.
        if "Easy Mode" in owned:
            difficulty_mode = 1
        elif "Normal Mode" in owned:
            difficulty_mode = 0

        has_mushroom = "Mushroom" in owned
        has_carrot = "Carrot" in owned
        has_fire_flower = "Fire Flower" in owned
        pipe_down = "Pipe Traversal - Down" in owned
        pipe_up = "Pipe Traversal - Up" in owned
        pipe_right = "Pipe Traversal - Right" in owned
        pipe_left = "Pipe Traversal - Left" in owned

        data_writes = [
            (rom_addresses["Space_Physics"], [0x7e] if "Space Physics" in owned else [0xaf], "ROM"),
            (rom_addresses["Get_Hurt_To_Big_Mario"], [1] if has_mushroom else [0], "ROM"),
            (rom_addresses["Get_Mushroom_A"], [0xea, 0x16, 0xa2] if has_mushroom else [0, 0, 0], "ROM"),
            (rom_addresses["Get_Mushroom_B"], [0xea, 0x16, 0xa2] if has_mushroom else [0, 0, 0], "ROM"),
            (rom_addresses["Get_Mushroom_C"], [00] if has_mushroom else [0xd8], "ROM"),
            (rom_addresses["Get_Carrot_A"], [0xea, 0x16, 0xa2] if has_carrot else [0, 0, 0], "ROM"),
            (rom_addresses["Get_Carrot_B"], [0xea, 0x16, 0xa2] if has_carrot else [0, 0, 0], "ROM"),
            (rom_addresses["Get_Carrot_C"], [00] if has_carrot else [0xc8], "ROM"),
            (rom_addresses["Get_Fire_Flower_A"], [0xea, 0x16, 0xa2] if has_fire_flower else [0, 0, 0], "ROM"),
            (rom_addresses["Get_Fire_Flower_B"], [0xea, 0x16, 0xa2] if has_fire_flower else [0, 0, 0], "ROM"),
            (rom_addresses["Get_Fire_Flower_C"], [00] if has_fire_flower else [0xc8], "ROM"),
            (rom_addresses["Invincibility_Star_A"], [(invincibility_length >> 8) + 1], "ROM"),
            (rom_addresses["Invincibility_Star_B"], [invincibility_length & 0xFF], "ROM"),
            (rom_addresses["Enable_Bubble"], [0xcb, 0xd7] if "Hippo Bubble" in owned else [0, 0], "ROM"),
            (rom_addresses["Enable_Swim"], [0xcb, 0xcf] if "Water Physics" in owned else [0, 0], "ROM"),
            (rom_addresses["Pipe_Traversal_A"], [16] if pipe_down else [0], "ROM"),
            # NOTE(review): the fallback here is 10, unlike 0 for A/C/D; kept exactly as in the
            # original - confirm this is intentional.
            (rom_addresses["Pipe_Traversal_B"], [32] if pipe_up else [10], "ROM"),
            (rom_addresses["Pipe_Traversal_C"], [48] if pipe_right else [0], "ROM"),
            (rom_addresses["Pipe_Traversal_D"], [64] if pipe_left else [0], "ROM"),
            (rom_addresses["Pipe_Traversal_SFX_A"], [5] if pipe_down else [0], "ROM"),
            (rom_addresses["Pipe_Traversal_SFX_B"], [5] if pipe_up else [0], "ROM"),
            (rom_addresses["Pipe_Traversal_SFX_C"], [5] if pipe_right else [0], "ROM"),
            (rom_addresses["Pipe_Traversal_SFX_D"], [5] if pipe_left else [0], "ROM"),
            (0x022c, [new_lives], "CartRAM"),
            (0x02E4, [difficulty_mode], "CartRAM"),
            (0x0848, modified_level_data, "CartRAM"),
            (0x0262, new_coins, "CartRAM"),
        ]

        if items_received:
            data_writes.append((0x00F0, write_num_items_received, "CartRAM"))

        if midway_point == 0xFF and (midway_bells or music in overworld_music):
            # after registering the check for the midway bell, clear the value just for safety.
            data_writes.append((0x02A0, [0], "CartRAM"))

        # Resolve item-dependent auto-scroll entries: 3 becomes 1 only with the matching item
        # (otherwise 0), and 2 becomes 0 only with the matching cancel item (otherwise 1).
        # When the entry for the level being played changes because of an item, the byte at
        # CartRAM 0x02C8 is written as well.
        for i in range(32):
            if auto_scroll_levels[i] == 3:
                if "Auto Scroll" in owned or f"Auto Scroll - {level_id_to_name[i]}" in owned:
                    auto_scroll_levels[i] = 1
                    if i == current_level:
                        data_writes.append((0x02C8, [0x01], "CartRAM"))
                else:
                    auto_scroll_levels[i] = 0
            elif auto_scroll_levels[i] == 2:
                if ("Cancel Auto Scroll" in owned
                        or f"Cancel Auto Scroll - {level_id_to_name[i]}" in owned):
                    auto_scroll_levels[i] = 0
                    if i == current_level:
                        data_writes.append((0x02C8, [0x00], "CartRAM"))
                else:
                    auto_scroll_levels[i] = 1
        data_writes.append((rom_addresses["Auto_Scroll_Levels"], auto_scroll_levels, "ROM"))

        # Only apply the writes if level data, lives and coins still hold the values read at
        # the start of this pass, so changes the game made in the meantime are not overwritten.
        success = await guarded_write(ctx.bizhawk_ctx, data_writes, [
            (0x0848, level_data, "CartRAM"),
            (0x022C, [int.from_bytes(bcd_lives, "big")], "CartRAM"),
            (0x0262, coins, "CartRAM"),
        ])

        # Adjust the EnergyLink pool only when the lives change actually reached the game.
        if success and energy_link_add is not None:
            await ctx.send_msgs([{
                "cmd": "Set", "key": energy_link_key, "operations":
                    [{"operation": "add", "value": energy_link_add},
                     {"operation": "max", "value": 0}],
            }])

        if not ctx.server or not ctx.server.socket.open or ctx.server.socket.closed:
            return

        if locations_checked and locations_checked != self.locations_array:
            self.locations_array = locations_checked
            await ctx.send_msgs([{"cmd": "LocationChecks", "locations": locations_checked}])

        # Music 0x18 marks the goal. Report it once instead of on every pass while it plays.
        if music == 0x18 and not ctx.finished_game:
            await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
            ctx.finished_game = True

    def on_package(self, ctx, cmd: str, args: dict) -> None:
        """Handle server packets that affect the EnergyLink display.

        * ``Connected``: when the slot has EnergyLink enabled, subscribe to the
          team's EnergyLink key and switch the UI label on.
        * ``SetReply`` / ``Retrieved``: show the pool value converted to whole
          lives in the UI label.
        """
        super().on_package(ctx, cmd, args)
        energy_link_key = f"EnergyLink{ctx.team}"
        if cmd == 'Connected':
            if ctx.slot_data["energy_link"]:
                ctx.set_notify(energy_link_key)
                if ctx.ui:
                    ctx.ui.enable_energy_link()
                    ctx.ui.energy_link_label.text = "Lives: Standby"
        elif cmd == "SetReply" and args["key"].startswith("EnergyLink"):
            if ctx.ui:
                ctx.ui.energy_link_label.text = f"Lives: {int(args['value'] / BANK_EXCHANGE_RATE)}"
        elif cmd == "Retrieved":
            # A missing or empty/zero pool value leaves the label untouched.
            banked = args["keys"].get(energy_link_key)
            if banked and ctx.ui:
                ctx.ui.energy_link_label.text = f"Lives: {int(banked / BANK_EXCHANGE_RATE)}"