* Expand and validate the RAM cache * Part way through location improvement * Fixed location tracking * Preliminary entrance tracking support * Actually send entrance messages * Store found entrances on the server * Bit of cleanup * Added rupee count, items linked to checks * Send Magpie a handshAck * Got my own version wrong * Remove the Beta name * Only send slot_data if there's something in it * Ask the server for entrance updates * Small fix to stabilize Link's location when changing rooms * Oops, server storage is shared between worlds * Deal with null responses from the server * Added UNUSED_KEY item
		
			
				
	
	
		
			267 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			267 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import json
 | 
						|
import typing
 | 
						|
from websockets import WebSocketServerProtocol
 | 
						|
 | 
						|
from . import TrackerConsts as Consts
 | 
						|
from .TrackerConsts import EntranceCoord
 | 
						|
from .LADXR.entranceInfo import ENTRANCE_INFO
 | 
						|
 | 
						|
class Entrance:
 | 
						|
    outdoor_room: int
 | 
						|
    indoor_map: int
 | 
						|
    indoor_address: int
 | 
						|
    name: str
 | 
						|
    other_side_name: str = None
 | 
						|
    changed: bool = False
 | 
						|
    known_to_server: bool = False
 | 
						|
 | 
						|
    def __init__(self, outdoor: int, indoor: int, name: str, indoor_address: int=None):
 | 
						|
        self.outdoor_room = outdoor
 | 
						|
        self.indoor_map = indoor
 | 
						|
        self.indoor_address = indoor_address
 | 
						|
        self.name = name
 | 
						|
 | 
						|
    def map(self, other_side: str, known_to_server: bool = False):
 | 
						|
        if other_side != self.other_side_name:
 | 
						|
            self.changed = True
 | 
						|
            self.known_to_server = known_to_server
 | 
						|
        
 | 
						|
        self.other_side_name = other_side
 | 
						|
 | 
						|
class GpsTracker:
 | 
						|
    room: int = None
 | 
						|
    last_room: int = None
 | 
						|
    last_different_room: int = None
 | 
						|
    room_same_for: int = 0
 | 
						|
    room_changed: bool = False
 | 
						|
    screen_x: int = 0
 | 
						|
    screen_y: int = 0
 | 
						|
    spawn_x: int = 0
 | 
						|
    spawn_y: int = 0
 | 
						|
    indoors: int = None
 | 
						|
    indoors_changed: bool = False
 | 
						|
    spawn_map: int = None
 | 
						|
    spawn_room: int = None
 | 
						|
    spawn_changed: bool = False
 | 
						|
    spawn_same_for: int = 0
 | 
						|
    entrance_mapping: typing.Dict[str, str] = None
 | 
						|
    entrances_by_name: typing.Dict[str, Entrance] = {}
 | 
						|
    needs_found_entrances: bool = False
 | 
						|
    needs_slot_data: bool = True
 | 
						|
 | 
						|
    def __init__(self, gameboy) -> None:
 | 
						|
        self.gameboy = gameboy
 | 
						|
 | 
						|
        self.gameboy.set_location_range(
 | 
						|
            Consts.link_motion_state,
 | 
						|
            Consts.transition_sequence - Consts.link_motion_state + 1,
 | 
						|
            [Consts.transition_state]
 | 
						|
        )
 | 
						|
 | 
						|
    async def read_byte(self, b: int):
 | 
						|
        return (await self.gameboy.read_memory_cache([b]))[b]
 | 
						|
 | 
						|
    def load_slot_data(self, slot_data: typing.Dict[str, typing.Any]):
 | 
						|
        if 'entrance_mapping' not in slot_data:
 | 
						|
            return
 | 
						|
 | 
						|
        # We need to know how entrances were mapped at generation before we can autotrack them
 | 
						|
        self.entrance_mapping = {}
 | 
						|
 | 
						|
        # Convert to upstream's newer format
 | 
						|
        for outside, inside in slot_data['entrance_mapping'].items():
 | 
						|
            new_inside = f"{inside}:inside"
 | 
						|
            self.entrance_mapping[outside] = new_inside
 | 
						|
            self.entrance_mapping[new_inside] = outside
 | 
						|
 | 
						|
        self.entrances_by_name = {} 
 | 
						|
 | 
						|
        for name, info in ENTRANCE_INFO.items():
 | 
						|
            alternate_address = (
 | 
						|
                Consts.entrance_address_overrides[info.target]
 | 
						|
                if info.target in Consts.entrance_address_overrides
 | 
						|
                else None
 | 
						|
            )
 | 
						|
 | 
						|
            entrance = Entrance(info.room, info.target, name, alternate_address)
 | 
						|
            self.entrances_by_name[name] = entrance
 | 
						|
 | 
						|
            inside_entrance = Entrance(info.target, info.room, f"{name}:inside", alternate_address)
 | 
						|
            self.entrances_by_name[f"{name}:inside"] = inside_entrance
 | 
						|
        
 | 
						|
        self.needs_slot_data = False
 | 
						|
        self.needs_found_entrances = True
 | 
						|
 | 
						|
    async def read_location(self):
 | 
						|
        # We need to wait for screen transitions to finish
 | 
						|
        transition_state = await self.read_byte(Consts.transition_state)
 | 
						|
        transition_target_x = await self.read_byte(Consts.transition_target_x)
 | 
						|
        transition_target_y = await self.read_byte(Consts.transition_target_y)
 | 
						|
        transition_scroll_x = await self.read_byte(Consts.transition_scroll_x)
 | 
						|
        transition_scroll_y = await self.read_byte(Consts.transition_scroll_y)
 | 
						|
        transition_sequence = await self.read_byte(Consts.transition_sequence)
 | 
						|
        motion_state = await self.read_byte(Consts.link_motion_state)
 | 
						|
        if (transition_state != 0
 | 
						|
            or transition_target_x != transition_scroll_x
 | 
						|
            or transition_target_y != transition_scroll_y
 | 
						|
            or transition_sequence != 0x04):
 | 
						|
            return
 | 
						|
 | 
						|
        indoors = await self.read_byte(Consts.indoor_flag)
 | 
						|
 | 
						|
        if indoors != self.indoors and self.indoors != None:
 | 
						|
            self.indoors_changed = True
 | 
						|
 | 
						|
        self.indoors = indoors
 | 
						|
 | 
						|
        # We use the spawn point to know which entrance was most recently entered
 | 
						|
        spawn_map = await self.read_byte(Consts.spawn_map)
 | 
						|
        map_digit = Consts.map_map[spawn_map] << 8 if self.spawn_map else 0
 | 
						|
        spawn_room = await self.read_byte(Consts.spawn_room) + map_digit
 | 
						|
        spawn_x = await self.read_byte(Consts.spawn_x)
 | 
						|
        spawn_y = await self.read_byte(Consts.spawn_y)
 | 
						|
 | 
						|
        # The spawn point needs to be settled before we can trust location data
 | 
						|
        if ((spawn_room != self.spawn_room and self.spawn_room != None)
 | 
						|
            or (spawn_map != self.spawn_map and self.spawn_map != None)
 | 
						|
            or (spawn_x != self.spawn_x and self.spawn_x != None)
 | 
						|
            or (spawn_y != self.spawn_y and self.spawn_y != None)):
 | 
						|
            self.spawn_changed = True
 | 
						|
            self.spawn_same_for = 0
 | 
						|
        else:
 | 
						|
            self.spawn_same_for += 1
 | 
						|
 | 
						|
        self.spawn_map = spawn_map
 | 
						|
        self.spawn_room = spawn_room
 | 
						|
        self.spawn_x = spawn_x
 | 
						|
        self.spawn_y = spawn_y
 | 
						|
 | 
						|
        # Spawn point is preferred, but doesn't work for the sidescroller entrances
 | 
						|
        # Those can be addressed by keeping track of which room we're in
 | 
						|
        # Also used to validate that we came from the right room for what the spawn point is mapped to
 | 
						|
        map_id = await self.read_byte(Consts.map_id)
 | 
						|
        if map_id not in Consts.map_map:
 | 
						|
            print(f'Unknown map ID {hex(map_id)}')
 | 
						|
            return
 | 
						|
 | 
						|
        map_digit = Consts.map_map[map_id] << 8 if indoors else 0
 | 
						|
        self.last_room = self.room
 | 
						|
        self.room = await self.read_byte(Consts.room) + map_digit
 | 
						|
 | 
						|
        # Again, the room needs to settle before we can trust location data
 | 
						|
        if self.last_room != self.room:
 | 
						|
            self.room_same_for = 0
 | 
						|
            self.room_changed = True
 | 
						|
            self.last_different_room = self.last_room
 | 
						|
        else:
 | 
						|
            self.room_same_for += 1
 | 
						|
 | 
						|
        # Only update Link's location when he's not in the air to avoid weirdness
 | 
						|
        if motion_state in [0, 1]:
 | 
						|
            coords = await self.read_byte(Consts.screen_coord)
 | 
						|
            self.screen_x = coords & 0x0F
 | 
						|
            self.screen_y = (coords & 0xF0) >> 4
 | 
						|
 | 
						|
    async def read_entrances(self):
 | 
						|
        if not self.last_different_room or not self.entrance_mapping:
 | 
						|
            return
 | 
						|
 | 
						|
        if self.spawn_changed and self.spawn_same_for > 0 and self.room_same_for > 0:
 | 
						|
            # Use the spawn location, last room, and entrance mapping at generation to map the right entrance
 | 
						|
            # A bit overkill for simple ER, but necessary for upstream's advanced ER
 | 
						|
            spawn_coord = EntranceCoord(None, self.spawn_room, self.spawn_x, self.spawn_y)
 | 
						|
            if str(spawn_coord) in Consts.entrance_lookup:
 | 
						|
                valid_sources = {x.name for x in Consts.entrance_coords if x.room == self.last_different_room}
 | 
						|
                dest_entrance = Consts.entrance_lookup[str(spawn_coord)].name
 | 
						|
                source_entrance = [
 | 
						|
                    x for x in self.entrance_mapping
 | 
						|
                    if self.entrance_mapping[x] == dest_entrance and x in valid_sources
 | 
						|
                ]
 | 
						|
 | 
						|
                if source_entrance:
 | 
						|
                    self.entrances_by_name[source_entrance[0]].map(dest_entrance)
 | 
						|
 | 
						|
            self.spawn_changed = False
 | 
						|
        elif self.room_changed and self.room_same_for > 0:
 | 
						|
            # Check for the stupid sidescroller rooms that don't set your spawn point
 | 
						|
            if self.last_different_room in Consts.sidescroller_rooms:
 | 
						|
                source_entrance = Consts.sidescroller_rooms[self.last_different_room]
 | 
						|
                if source_entrance in self.entrance_mapping:
 | 
						|
                    dest_entrance = self.entrance_mapping[source_entrance]
 | 
						|
 | 
						|
                    expected_room = self.entrances_by_name[dest_entrance].outdoor_room
 | 
						|
                    if dest_entrance.endswith(":indoor"):
 | 
						|
                        expected_room = self.entrances_by_name[dest_entrance].indoor_map
 | 
						|
 | 
						|
                    if expected_room == self.room:
 | 
						|
                        self.entrances_by_name[source_entrance].map(dest_entrance)
 | 
						|
 | 
						|
            if self.room in Consts.sidescroller_rooms:
 | 
						|
                valid_sources = {x.name for x in Consts.entrance_coords if x.room == self.last_different_room}
 | 
						|
                dest_entrance = Consts.sidescroller_rooms[self.room]
 | 
						|
                source_entrance = [
 | 
						|
                    x for x in self.entrance_mapping
 | 
						|
                    if self.entrance_mapping[x] == dest_entrance and x in valid_sources
 | 
						|
                ]
 | 
						|
 | 
						|
                if source_entrance:
 | 
						|
                    self.entrances_by_name[source_entrance[0]].map(dest_entrance)
 | 
						|
 | 
						|
            self.room_changed = False
 | 
						|
 | 
						|
    last_location_message = {}
 | 
						|
    async def send_location(self, socket: WebSocketServerProtocol) -> None:
 | 
						|
        if self.room is None or self.room_same_for < 1: 
 | 
						|
            return
 | 
						|
 | 
						|
        message = {
 | 
						|
            "type":"location",
 | 
						|
            "refresh": True,
 | 
						|
            "room": f'0x{self.room:02X}',
 | 
						|
            "x": self.screen_x,
 | 
						|
            "y": self.screen_y,
 | 
						|
            "drawFine": True,
 | 
						|
        }
 | 
						|
 | 
						|
        if message != self.last_location_message:
 | 
						|
            self.last_location_message = message
 | 
						|
            await socket.send(json.dumps(message))
 | 
						|
 | 
						|
    async def send_entrances(self, socket: WebSocketServerProtocol, diff: bool=True) -> typing.Dict[str, str]:
 | 
						|
        if not self.entrance_mapping:
 | 
						|
            return
 | 
						|
 | 
						|
        new_entrances = [x for x in self.entrances_by_name.values() if x.changed or (not diff and x.other_side_name)]
 | 
						|
 | 
						|
        if not new_entrances:
 | 
						|
            return
 | 
						|
 | 
						|
        message = {
 | 
						|
            "type":"entrance",
 | 
						|
            "refresh": True,
 | 
						|
            "diff": True,
 | 
						|
            "entranceMap": {},
 | 
						|
        }
 | 
						|
 | 
						|
        for entrance in new_entrances:
 | 
						|
            message['entranceMap'][entrance.name] = entrance.other_side_name
 | 
						|
            entrance.changed = False
 | 
						|
 | 
						|
        await socket.send(json.dumps(message))
 | 
						|
 | 
						|
        new_to_server = { 
 | 
						|
            entrance.name: entrance.other_side_name 
 | 
						|
            for entrance in new_entrances 
 | 
						|
            if not entrance.known_to_server 
 | 
						|
        } 
 | 
						|
 
 | 
						|
        return new_to_server
 | 
						|
 | 
						|
    def receive_found_entrances(self, found_entrances: typing.Dict[str, str]):
 | 
						|
        if not found_entrances:
 | 
						|
            return
 | 
						|
 | 
						|
        for entrance, destination in found_entrances.items():
 | 
						|
            if entrance in self.entrances_by_name:
 | 
						|
                self.entrances_by_name[entrance].map(destination, known_to_server=True)
 |