mirror of
https://github.com/MarioSpore/Grinch-AP.git
synced 2025-10-21 12:11:33 -06:00

* Expand and validate the RAM cache * Part way through location improvement * Fixed location tracking * Preliminary entrance tracking support * Actually send entrance messages * Store found entrances on the server * Bit of cleanup * Added rupee count, items linked to checks * Send Magpie a handshAck * Got my own version wrong * Remove the Beta name * Only send slot_data if there's something in it * Ask the server for entrance updates * Small fix to stabilize Link's location when changing rooms * Oops, server storage is shared between worlds * Deal with null responses from the server * Added UNUSED_KEY item
267 lines
11 KiB
Python
267 lines
11 KiB
Python
import json
|
|
import typing
|
|
from websockets import WebSocketServerProtocol
|
|
|
|
from . import TrackerConsts as Consts
|
|
from .TrackerConsts import EntranceCoord
|
|
from .LADXR.entranceInfo import ENTRANCE_INFO
|
|
|
|
class Entrance:
|
|
outdoor_room: int
|
|
indoor_map: int
|
|
indoor_address: int
|
|
name: str
|
|
other_side_name: str = None
|
|
changed: bool = False
|
|
known_to_server: bool = False
|
|
|
|
def __init__(self, outdoor: int, indoor: int, name: str, indoor_address: int=None):
|
|
self.outdoor_room = outdoor
|
|
self.indoor_map = indoor
|
|
self.indoor_address = indoor_address
|
|
self.name = name
|
|
|
|
def map(self, other_side: str, known_to_server: bool = False):
|
|
if other_side != self.other_side_name:
|
|
self.changed = True
|
|
self.known_to_server = known_to_server
|
|
|
|
self.other_side_name = other_side
|
|
|
|
class GpsTracker:
|
|
room: int = None
|
|
last_room: int = None
|
|
last_different_room: int = None
|
|
room_same_for: int = 0
|
|
room_changed: bool = False
|
|
screen_x: int = 0
|
|
screen_y: int = 0
|
|
spawn_x: int = 0
|
|
spawn_y: int = 0
|
|
indoors: int = None
|
|
indoors_changed: bool = False
|
|
spawn_map: int = None
|
|
spawn_room: int = None
|
|
spawn_changed: bool = False
|
|
spawn_same_for: int = 0
|
|
entrance_mapping: typing.Dict[str, str] = None
|
|
entrances_by_name: typing.Dict[str, Entrance] = {}
|
|
needs_found_entrances: bool = False
|
|
needs_slot_data: bool = True
|
|
|
|
def __init__(self, gameboy) -> None:
|
|
self.gameboy = gameboy
|
|
|
|
self.gameboy.set_location_range(
|
|
Consts.link_motion_state,
|
|
Consts.transition_sequence - Consts.link_motion_state + 1,
|
|
[Consts.transition_state]
|
|
)
|
|
|
|
async def read_byte(self, b: int):
|
|
return (await self.gameboy.read_memory_cache([b]))[b]
|
|
|
|
def load_slot_data(self, slot_data: typing.Dict[str, typing.Any]):
|
|
if 'entrance_mapping' not in slot_data:
|
|
return
|
|
|
|
# We need to know how entrances were mapped at generation before we can autotrack them
|
|
self.entrance_mapping = {}
|
|
|
|
# Convert to upstream's newer format
|
|
for outside, inside in slot_data['entrance_mapping'].items():
|
|
new_inside = f"{inside}:inside"
|
|
self.entrance_mapping[outside] = new_inside
|
|
self.entrance_mapping[new_inside] = outside
|
|
|
|
self.entrances_by_name = {}
|
|
|
|
for name, info in ENTRANCE_INFO.items():
|
|
alternate_address = (
|
|
Consts.entrance_address_overrides[info.target]
|
|
if info.target in Consts.entrance_address_overrides
|
|
else None
|
|
)
|
|
|
|
entrance = Entrance(info.room, info.target, name, alternate_address)
|
|
self.entrances_by_name[name] = entrance
|
|
|
|
inside_entrance = Entrance(info.target, info.room, f"{name}:inside", alternate_address)
|
|
self.entrances_by_name[f"{name}:inside"] = inside_entrance
|
|
|
|
self.needs_slot_data = False
|
|
self.needs_found_entrances = True
|
|
|
|
async def read_location(self):
|
|
# We need to wait for screen transitions to finish
|
|
transition_state = await self.read_byte(Consts.transition_state)
|
|
transition_target_x = await self.read_byte(Consts.transition_target_x)
|
|
transition_target_y = await self.read_byte(Consts.transition_target_y)
|
|
transition_scroll_x = await self.read_byte(Consts.transition_scroll_x)
|
|
transition_scroll_y = await self.read_byte(Consts.transition_scroll_y)
|
|
transition_sequence = await self.read_byte(Consts.transition_sequence)
|
|
motion_state = await self.read_byte(Consts.link_motion_state)
|
|
if (transition_state != 0
|
|
or transition_target_x != transition_scroll_x
|
|
or transition_target_y != transition_scroll_y
|
|
or transition_sequence != 0x04):
|
|
return
|
|
|
|
indoors = await self.read_byte(Consts.indoor_flag)
|
|
|
|
if indoors != self.indoors and self.indoors != None:
|
|
self.indoors_changed = True
|
|
|
|
self.indoors = indoors
|
|
|
|
# We use the spawn point to know which entrance was most recently entered
|
|
spawn_map = await self.read_byte(Consts.spawn_map)
|
|
map_digit = Consts.map_map[spawn_map] << 8 if self.spawn_map else 0
|
|
spawn_room = await self.read_byte(Consts.spawn_room) + map_digit
|
|
spawn_x = await self.read_byte(Consts.spawn_x)
|
|
spawn_y = await self.read_byte(Consts.spawn_y)
|
|
|
|
# The spawn point needs to be settled before we can trust location data
|
|
if ((spawn_room != self.spawn_room and self.spawn_room != None)
|
|
or (spawn_map != self.spawn_map and self.spawn_map != None)
|
|
or (spawn_x != self.spawn_x and self.spawn_x != None)
|
|
or (spawn_y != self.spawn_y and self.spawn_y != None)):
|
|
self.spawn_changed = True
|
|
self.spawn_same_for = 0
|
|
else:
|
|
self.spawn_same_for += 1
|
|
|
|
self.spawn_map = spawn_map
|
|
self.spawn_room = spawn_room
|
|
self.spawn_x = spawn_x
|
|
self.spawn_y = spawn_y
|
|
|
|
# Spawn point is preferred, but doesn't work for the sidescroller entrances
|
|
# Those can be addressed by keeping track of which room we're in
|
|
# Also used to validate that we came from the right room for what the spawn point is mapped to
|
|
map_id = await self.read_byte(Consts.map_id)
|
|
if map_id not in Consts.map_map:
|
|
print(f'Unknown map ID {hex(map_id)}')
|
|
return
|
|
|
|
map_digit = Consts.map_map[map_id] << 8 if indoors else 0
|
|
self.last_room = self.room
|
|
self.room = await self.read_byte(Consts.room) + map_digit
|
|
|
|
# Again, the room needs to settle before we can trust location data
|
|
if self.last_room != self.room:
|
|
self.room_same_for = 0
|
|
self.room_changed = True
|
|
self.last_different_room = self.last_room
|
|
else:
|
|
self.room_same_for += 1
|
|
|
|
# Only update Link's location when he's not in the air to avoid weirdness
|
|
if motion_state in [0, 1]:
|
|
coords = await self.read_byte(Consts.screen_coord)
|
|
self.screen_x = coords & 0x0F
|
|
self.screen_y = (coords & 0xF0) >> 4
|
|
|
|
async def read_entrances(self):
|
|
if not self.last_different_room or not self.entrance_mapping:
|
|
return
|
|
|
|
if self.spawn_changed and self.spawn_same_for > 0 and self.room_same_for > 0:
|
|
# Use the spawn location, last room, and entrance mapping at generation to map the right entrance
|
|
# A bit overkill for simple ER, but necessary for upstream's advanced ER
|
|
spawn_coord = EntranceCoord(None, self.spawn_room, self.spawn_x, self.spawn_y)
|
|
if str(spawn_coord) in Consts.entrance_lookup:
|
|
valid_sources = {x.name for x in Consts.entrance_coords if x.room == self.last_different_room}
|
|
dest_entrance = Consts.entrance_lookup[str(spawn_coord)].name
|
|
source_entrance = [
|
|
x for x in self.entrance_mapping
|
|
if self.entrance_mapping[x] == dest_entrance and x in valid_sources
|
|
]
|
|
|
|
if source_entrance:
|
|
self.entrances_by_name[source_entrance[0]].map(dest_entrance)
|
|
|
|
self.spawn_changed = False
|
|
elif self.room_changed and self.room_same_for > 0:
|
|
# Check for the stupid sidescroller rooms that don't set your spawn point
|
|
if self.last_different_room in Consts.sidescroller_rooms:
|
|
source_entrance = Consts.sidescroller_rooms[self.last_different_room]
|
|
if source_entrance in self.entrance_mapping:
|
|
dest_entrance = self.entrance_mapping[source_entrance]
|
|
|
|
expected_room = self.entrances_by_name[dest_entrance].outdoor_room
|
|
if dest_entrance.endswith(":indoor"):
|
|
expected_room = self.entrances_by_name[dest_entrance].indoor_map
|
|
|
|
if expected_room == self.room:
|
|
self.entrances_by_name[source_entrance].map(dest_entrance)
|
|
|
|
if self.room in Consts.sidescroller_rooms:
|
|
valid_sources = {x.name for x in Consts.entrance_coords if x.room == self.last_different_room}
|
|
dest_entrance = Consts.sidescroller_rooms[self.room]
|
|
source_entrance = [
|
|
x for x in self.entrance_mapping
|
|
if self.entrance_mapping[x] == dest_entrance and x in valid_sources
|
|
]
|
|
|
|
if source_entrance:
|
|
self.entrances_by_name[source_entrance[0]].map(dest_entrance)
|
|
|
|
self.room_changed = False
|
|
|
|
last_location_message = {}
|
|
async def send_location(self, socket: WebSocketServerProtocol) -> None:
|
|
if self.room is None or self.room_same_for < 1:
|
|
return
|
|
|
|
message = {
|
|
"type":"location",
|
|
"refresh": True,
|
|
"room": f'0x{self.room:02X}',
|
|
"x": self.screen_x,
|
|
"y": self.screen_y,
|
|
"drawFine": True,
|
|
}
|
|
|
|
if message != self.last_location_message:
|
|
self.last_location_message = message
|
|
await socket.send(json.dumps(message))
|
|
|
|
async def send_entrances(self, socket: WebSocketServerProtocol, diff: bool=True) -> typing.Dict[str, str]:
|
|
if not self.entrance_mapping:
|
|
return
|
|
|
|
new_entrances = [x for x in self.entrances_by_name.values() if x.changed or (not diff and x.other_side_name)]
|
|
|
|
if not new_entrances:
|
|
return
|
|
|
|
message = {
|
|
"type":"entrance",
|
|
"refresh": True,
|
|
"diff": True,
|
|
"entranceMap": {},
|
|
}
|
|
|
|
for entrance in new_entrances:
|
|
message['entranceMap'][entrance.name] = entrance.other_side_name
|
|
entrance.changed = False
|
|
|
|
await socket.send(json.dumps(message))
|
|
|
|
new_to_server = {
|
|
entrance.name: entrance.other_side_name
|
|
for entrance in new_entrances
|
|
if not entrance.known_to_server
|
|
}
|
|
|
|
return new_to_server
|
|
|
|
def receive_found_entrances(self, found_entrances: typing.Dict[str, str]):
|
|
if not found_entrances:
|
|
return
|
|
|
|
for entrance, destination in found_entrances.items():
|
|
if entrance in self.entrances_by_name:
|
|
self.entrances_by_name[entrance].map(destination, known_to_server=True)
|