* Test: add hosting simulation test * WebHost: add weak typing to get_app() * MultiServer: add typing to auto_saver_thread * MultiServer: don't cancel task, properly end it * customserver: stop auto-save thread from saving after shutdown and make sure it stops, another potential memory leak * MultiServer, customserver: make datapackage small again * customserver: collect/finish room tasks Hopefully fixes the memory leak we are seeing * CI: test hosting * Test: hosting: verify autohoster saves on Ctrl+C * customserver: save when stopping via Ctrl+C
		
			
				
	
	
		
			111 lines
		
	
	
		
			3.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			111 lines
		
	
	
		
			3.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import json
 | 
						|
import sys
 | 
						|
from typing import Any, Collection, Dict, Iterable, Optional
 | 
						|
from websockets import ConnectionClosed
 | 
						|
from websockets.sync.client import connect, ClientConnection
 | 
						|
from threading import Thread
 | 
						|
 | 
						|
 | 
						|
__all__ = [
 | 
						|
    "Client"
 | 
						|
]
 | 
						|
 | 
						|
 | 
						|
class Client:
 | 
						|
    """Incomplete, minimalistic sync test client for AP network protocol"""
 | 
						|
 | 
						|
    recv_timeout = 1.0
 | 
						|
 | 
						|
    host: str
 | 
						|
    game: str
 | 
						|
    slot: str
 | 
						|
    password: Optional[str]
 | 
						|
 | 
						|
    _ws: Optional[ClientConnection]
 | 
						|
 | 
						|
    games: Iterable[str]
 | 
						|
    data_package_checksums: Dict[str, Any]
 | 
						|
    games_packages: Dict[str, Any]
 | 
						|
    missing_locations: Collection[int]
 | 
						|
    checked_locations: Collection[int]
 | 
						|
 | 
						|
    def __init__(self, host: str, game: str, slot: str, password: Optional[str] = None) -> None:
 | 
						|
        self.host = host
 | 
						|
        self.game = game
 | 
						|
        self.slot = slot
 | 
						|
        self.password = password
 | 
						|
        self._ws = None
 | 
						|
        self.games = []
 | 
						|
        self.data_package_checksums = {}
 | 
						|
        self.games_packages = {}
 | 
						|
        self.missing_locations = []
 | 
						|
        self.checked_locations = []
 | 
						|
 | 
						|
    def __enter__(self) -> "Client":
 | 
						|
        try:
 | 
						|
            self.connect()
 | 
						|
        except BaseException:
 | 
						|
            self.__exit__(*sys.exc_info())
 | 
						|
            raise
 | 
						|
        return self
 | 
						|
 | 
						|
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore
 | 
						|
        self.close()
 | 
						|
 | 
						|
    def _poll(self) -> None:
 | 
						|
        assert self._ws
 | 
						|
        try:
 | 
						|
            while True:
 | 
						|
                self._ws.recv()
 | 
						|
        except (TimeoutError, ConnectionClosed, KeyboardInterrupt, SystemExit):
 | 
						|
            pass
 | 
						|
 | 
						|
    def connect(self) -> None:
 | 
						|
        self._ws = connect(f"ws://{self.host}")
 | 
						|
        room_info = json.loads(self._ws.recv(self.recv_timeout))[0]
 | 
						|
        self.games = sorted(room_info["games"])
 | 
						|
        self.data_package_checksums = room_info["datapackage_checksums"]
 | 
						|
        self._ws.send(json.dumps([{
 | 
						|
            "cmd": "GetDataPackage",
 | 
						|
            "games": list(self.games),
 | 
						|
        }]))
 | 
						|
        data_package_msg = json.loads(self._ws.recv(self.recv_timeout))[0]
 | 
						|
        self.games_packages = data_package_msg["data"]["games"]
 | 
						|
        self._ws.send(json.dumps([{
 | 
						|
            "cmd": "Connect",
 | 
						|
            "game": self.game,
 | 
						|
            "name": self.slot,
 | 
						|
            "password": self.password,
 | 
						|
            "uuid": "",
 | 
						|
            "version": {
 | 
						|
                "class": "Version",
 | 
						|
                "major": 0,
 | 
						|
                "minor": 4,
 | 
						|
                "build": 6,
 | 
						|
            },
 | 
						|
            "items_handling": 0,
 | 
						|
            "tags": [],
 | 
						|
            "slot_data": False,
 | 
						|
        }]))
 | 
						|
        connect_result_msg = json.loads(self._ws.recv(self.recv_timeout))[0]
 | 
						|
        if connect_result_msg["cmd"] != "Connected":
 | 
						|
            raise ConnectionError(", ".join(connect_result_msg.get("errors", [connect_result_msg["cmd"]])))
 | 
						|
        self.missing_locations = connect_result_msg["missing_locations"]
 | 
						|
        self.checked_locations = connect_result_msg["checked_locations"]
 | 
						|
 | 
						|
    def close(self) -> None:
 | 
						|
        if self._ws:
 | 
						|
            Thread(target=self._poll).start()
 | 
						|
            self._ws.close()
 | 
						|
 | 
						|
    def collect(self, locations: Iterable[int]) -> None:
 | 
						|
        if not self._ws:
 | 
						|
            raise ValueError("Not connected")
 | 
						|
        self._ws.send(json.dumps([{
 | 
						|
            "cmd": "LocationChecks",
 | 
						|
            "locations": locations,
 | 
						|
        }]))
 | 
						|
 | 
						|
    def collect_any(self) -> None:
 | 
						|
        self.collect([next(iter(self.missing_locations))])
 |