LttP: Implement DeathLink

This commit is contained in:
Fabian Dill
2021-11-01 19:37:47 +01:00
parent 8ff01ca979
commit 0e0cc0ad16
7 changed files with 99 additions and 50 deletions

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
import argparse
import atexit
exit_func = atexit.register(input, "Press enter to close.")
import threading
import time
@@ -12,13 +14,10 @@ import logging
import asyncio
from json import loads, dumps
from Utils import get_item_name_from_id
import ModuleUpdate
ModuleUpdate.update()
from Utils import get_item_name_from_id
import colorama
from NetUtils import *
@@ -35,7 +34,10 @@ snes_logger = logging.getLogger("SNES")
from MultiServer import mark_raw
class LttPCommandProcessor(ClientCommandProcessor):
ctx: Context
def _cmd_slow_mode(self, toggle: str = ""):
"""Toggle slow mode, which limits how fast you send / receive items."""
if toggle:
@@ -47,17 +49,18 @@ class LttPCommandProcessor(ClientCommandProcessor):
@mark_raw
def _cmd_snes(self, snes_options: str = "") -> bool:
"""Connect to a snes. Optionally include network address of a snes to connect to, otherwise show available devices; and a SNES device number if more than one SNES is detected"""
"""Connect to a snes. Optionally include network address of a snes to connect to,
otherwise show available devices; and a SNES device number if more than one SNES is detected"""
snes_address = self.ctx.snes_address
snes_device_number = -1
options = snes_options.split()
num_options = len(options)
if num_options > 0:
snes_address = options[0]
if num_options > 1:
try:
snes_device_number = int(options[1])
@@ -94,6 +97,7 @@ class Context(CommonContext):
self.snes_request_lock = asyncio.Lock()
self.snes_write_buffer = []
self.snes_connector_lock = threading.Lock()
self.death_state = False # for death link flop behaviour
self.awaiting_rom = False
self.rom = None
@@ -109,7 +113,7 @@ class Context(CommonContext):
raise Exception('Invalid ROM detected, '
'please verify that you have loaded the correct rom and reconnect your snes (/snes)')
async def server_auth(self, password_requested):
async def server_auth(self, password_requested: bool = False):
if password_requested and not self.password:
await super(Context, self).server_auth(password_requested)
if self.rom is None:
@@ -121,10 +125,17 @@ class Context(CommonContext):
self.auth = self.rom
auth = base64.b64encode(self.rom).decode()
await self.send_msgs([{"cmd": 'Connect',
'password': self.password, 'name': auth, 'version': Utils.version_tuple,
'tags': self.tags,
'uuid': Utils.get_unique_identifier(), 'game': "A Link to the Past"
}])
'password': self.password, 'name': auth, 'version': Utils.version_tuple,
'tags': self.tags,
'uuid': Utils.get_unique_identifier(), 'game': "A Link to the Past"
}])
def on_deathlink(self, data: dict):
snes_buffered_write(self, WRAM_START+0xF36D, bytes([0]))
snes_buffered_write(self, WRAM_START+0x0373, bytes([8]))
asyncio.create_task(snes_flush_writes(self))
self.death_state = True
snes_logger.info(f"Received DeathLink from {data['source']}")
def color_item(item_id: int, green: bool = False) -> str:
@@ -147,6 +158,7 @@ ROMNAME_SIZE = 0x15
INGAME_MODES = {0x07, 0x09, 0x0b}
ENDGAME_MODES = {0x19, 0x1a}
DEATH_MODES = {0x12}
SAVEDATA_START = WRAM_START + 0xF000
SAVEDATA_SIZE = 0x500
@@ -162,6 +174,8 @@ SCOUTREPLY_ITEM_ADDR = SAVEDATA_START + 0x4D9 # 1 byte
SCOUTREPLY_PLAYER_ADDR = SAVEDATA_START + 0x4DA # 1 byte
SHOP_ADDR = SAVEDATA_START + 0x302 # 2 bytes
DEATH_LINK_ACTIVE_ADDR = ROM_START + 0x18008D # 1 byte
location_shop_ids = set([info[0] for name, info in Shops.shop_table.items()])
location_table_uw = {"Blind's Hideout - Top": (0x11d, 0x10),
@@ -385,7 +399,7 @@ location_table_uw = {"Blind's Hideout - Top": (0x11d, 0x10),
'Ganons Tower - Pre-Moldorm Chest': (0x3d, 0x40),
'Ganons Tower - Validation Chest': (0x4d, 0x10)}
location_table_uw_id = {Regions.lookup_name_to_id[name] : data for name, data in location_table_uw.items()}
location_table_uw_id = {Regions.lookup_name_to_id[name]: data for name, data in location_table_uw.items()}
location_table_npc = {'Mushroom': 0x1000,
'King Zora': 0x2,
@@ -401,7 +415,7 @@ location_table_npc = {'Mushroom': 0x1000,
'Stumpy': 0x8,
'Bombos Tablet': 0x200}
location_table_npc_id = {Regions.lookup_name_to_id[name] : data for name, data in location_table_npc.items()}
location_table_npc_id = {Regions.lookup_name_to_id[name]: data for name, data in location_table_npc.items()}
location_table_ow = {'Flute Spot': 0x2a,
'Sunken Treasure': 0x3b,
@@ -416,14 +430,15 @@ location_table_ow = {'Flute Spot': 0x2a,
'Bumper Cave Ledge': 0x4a,
'Floating Island': 0x5}
location_table_ow_id = {Regions.lookup_name_to_id[name] : data for name, data in location_table_ow.items()}
location_table_ow_id = {Regions.lookup_name_to_id[name]: data for name, data in location_table_ow.items()}
location_table_misc = {'Bottle Merchant': (0x3c9, 0x2),
'Purple Chest': (0x3c9, 0x10),
"Link's Uncle": (0x3c6, 0x1),
'Hobo': (0x3c9, 0x1)}
location_table_misc_id = {Regions.lookup_name_to_id[name] : data for name, data in location_table_misc.items()}
location_table_misc_id = {Regions.lookup_name_to_id[name]: data for name, data in location_table_misc.items()}
class SNESState(enum.IntEnum):
SNES_DISCONNECTED = 0
@@ -446,10 +461,11 @@ def launch_sni(ctx: Context):
if os.path.isfile(sni_path):
snes_logger.info(f"Attempting to start {sni_path}")
import subprocess
if Utils.is_frozen(): # if it spawns a visible console, may as well populate it
if Utils.is_frozen(): # if it spawns a visible console, may as well populate it
subprocess.Popen(sni_path, cwd=os.path.dirname(sni_path))
else:
subprocess.Popen(sni_path, cwd=os.path.dirname(sni_path), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
subprocess.Popen(sni_path, cwd=os.path.dirname(sni_path), stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
else:
snes_logger.info(
f"Attempt to start SNI was aborted as path {sni_path} was not found, "
@@ -500,12 +516,11 @@ async def get_snes_devices(ctx: Context):
reply = loads(await socket.recv())
devices = reply['Results'] if 'Results' in reply and len(reply['Results']) > 0 else None
await socket.close()
return devices
async def snes_connect(ctx: Context, address, deviceIndex = -1):
async def snes_connect(ctx: Context, address, deviceIndex=-1):
global SNES_RECONNECT_DELAY
if ctx.snes_socket is not None and ctx.snes_state == SNESState.SNES_CONNECTED:
if ctx.rom:
@@ -534,7 +549,8 @@ async def snes_connect(ctx: Context, address, deviceIndex = -1):
device = devices[ctx.snes_attached_device[0]]
elif numDevices > 1:
if deviceIndex == -1:
snes_logger.info("Found " + str(numDevices) + " SNES devices; connect to one with /snes <address> <device number>:")
snes_logger.info(
"Found " + str(numDevices) + " SNES devices; connect to one with /snes <address> <device number>:")
for idx, availableDevice in enumerate(devices):
snes_logger.info(str(idx + 1) + ": " + availableDevice)
@@ -544,7 +560,7 @@ async def snes_connect(ctx: Context, address, deviceIndex = -1):
else:
device = devices[deviceIndex - 1]
if device is None:
await snes_disconnect(ctx)
return
@@ -676,6 +692,7 @@ async def snes_write(ctx: Context, write_list):
for address, data in write_list:
PutAddress_Request['Operands'] = [hex(address)[2:], hex(len(data))[2:]]
if ctx.snes_socket is not None:
snes_logger.info((PutAddress_Request, data))
await ctx.snes_socket.send(dumps(PutAddress_Request))
await ctx.snes_socket.send(data)
else:
@@ -712,7 +729,8 @@ async def track_locations(ctx: Context, roomid, roomdata):
new_locations.append(location_id)
ctx.locations_checked.add(location_id)
location = ctx.location_name_getter(location_id)
snes_logger.info(f'New Check: {location} ({len(ctx.locations_checked)}/{len(ctx.missing_locations) + len(ctx.checked_locations)})')
snes_logger.info(
f'New Check: {location} ({len(ctx.locations_checked)}/{len(ctx.missing_locations) + len(ctx.checked_locations)})')
try:
if roomid in location_shop_ids:
@@ -782,7 +800,6 @@ async def track_locations(ctx: Context, roomid, roomdata):
if misc_data[offset - 0x3c6] & mask != 0 and location_id not in ctx.locations_checked:
new_check(location_id)
if new_locations:
await ctx.send_msgs([{"cmd": 'LocationChecks', "locations": new_locations}])
@@ -804,6 +821,17 @@ async def game_watcher(ctx: Context):
continue
ctx.rom = rom
death_link = await snes_read(ctx, DEATH_LINK_ACTIVE_ADDR, 1)
if death_link:
death_link = bool(death_link[0])
old_tags = ctx.tags.copy()
if death_link:
ctx.tags.add("DeathLink")
else:
ctx.tags -= {"DeathLink"}
if old_tags != ctx.tags and ctx.server and not ctx.server.socket.closed:
snes_logger.info("Forcing reconnect to set DeathLink state.")
await ctx.disconnect() # set correct tags
if not ctx.prev_rom or ctx.prev_rom != ctx.rom:
ctx.locations_checked = set()
ctx.locations_scouted = set()
@@ -817,6 +845,14 @@ async def game_watcher(ctx: Context):
await ctx.disconnect()
gamemode = await snes_read(ctx, WRAM_START + 0x10, 1)
if "DeathLink" in ctx.tags and gamemode and ctx.last_death_link + 1 < time.time():
snes_logger.info((ctx.last_death_link + 1 < time.time(), ctx.last_death_link, time.time()))
if gamemode[0] in DEATH_MODES:
if not ctx.death_state: # new death
await ctx.send_death()
ctx.death_state = True
else:
ctx.death_state = False # reset death state, so next death can trigger
gameend = await snes_read(ctx, SAVEDATA_START + 0x443, 1)
game_timer = await snes_read(ctx, SAVEDATA_START + 0x42E, 4)
if gamemode is None or gameend is None or game_timer is None or \
@@ -891,6 +927,7 @@ async def run_game(romfile):
subprocess.Popen([auto_start, romfile],
stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
async def main():
multiprocessing.freeze_support()
parser = argparse.ArgumentParser()