mirror of
https://github.com/MarioSpore/Grinch-AP.git
synced 2025-10-21 04:01:32 -06:00

Calling the dunder method has to: 1. Look up the dunder method for that object/class 2. Bind a new method instance to the object instance 3. Call the method with its arguments 4. Run the appropriate operation on the object Whereas running the appropriate operation on the object from the start skips straight to step 4. Region.Register.__getitem__ is called a lot without #4583. In that case, generation of 10 template Blasphemous yamls with `--skip_output --seed 1` and progression balancing disabled went from 19.0s to 18.8s (1.3% reduction in generation duration). From profiling with `timeit` ```py def __getitem__(self, index: int) -> Location: return self._list[index] ``` appears to be about twice as fast as the old code: ```py def __getitem__(self, index: int) -> Location: return self._list.__getitem__(index) ``` Besides this, there is not expected to be any noticeable difference in performance, and there is not expected to be any difference in semantics with these changes. Co-authored-by: NewSoupVi <57900059+NewSoupVi@users.noreply.github.com>
1146 lines
49 KiB
Python
1146 lines
49 KiB
Python
from __future__ import annotations
|
|
|
|
import collections
|
|
import copy
|
|
import logging
|
|
import asyncio
|
|
import urllib.parse
|
|
import sys
|
|
import typing
|
|
import time
|
|
import functools
|
|
import warnings
|
|
|
|
import ModuleUpdate
|
|
ModuleUpdate.update()
|
|
|
|
import websockets
|
|
|
|
import Utils
|
|
|
|
if __name__ == "__main__":
|
|
Utils.init_logging("TextClient", exception_logger="Client")
|
|
|
|
from MultiServer import CommandProcessor
|
|
from NetUtils import (Endpoint, decode, NetworkItem, encode, JSONtoTextParser, ClientStatus, Permission, NetworkSlot,
|
|
RawJSONtoTextParser, add_json_text, add_json_location, add_json_item, JSONTypes, HintStatus, SlotType)
|
|
from Utils import Version, stream_input, async_start
|
|
from worlds import network_data_package, AutoWorldRegister
|
|
import os
|
|
import ssl
|
|
|
|
if typing.TYPE_CHECKING:
|
|
import kvui
|
|
import argparse
|
|
|
|
logger = logging.getLogger("Client")
|
|
|
|
# without terminal, we have to use gui mode
|
|
gui_enabled = not sys.stdout or "--nogui" not in sys.argv
|
|
|
|
|
|
@Utils.cache_argsless
|
|
def get_ssl_context():
|
|
import certifi
|
|
return ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=certifi.where())
|
|
|
|
|
|
class ClientCommandProcessor(CommandProcessor):
|
|
"""
|
|
The Command Processor will parse every method of the class that starts with "_cmd_" as a command to be called
|
|
when parsing user input, i.e. _cmd_exit will be called when the user sends the command "/exit".
|
|
|
|
The decorator @mark_raw can be imported from MultiServer and tells the parser to only split on the first
|
|
space after the command i.e. "/exit one two three" will be passed in as method("one two three") with mark_raw
|
|
and method("one", "two", "three") without.
|
|
|
|
In addition all docstrings for command methods will be displayed to the user on launch and when using "/help"
|
|
"""
|
|
def __init__(self, ctx: CommonContext):
|
|
self.ctx = ctx
|
|
|
|
def output(self, text: str):
|
|
"""Helper function to abstract logging to the CommonClient UI"""
|
|
logger.info(text)
|
|
|
|
def _cmd_exit(self) -> bool:
|
|
"""Close connections and client"""
|
|
self.ctx.exit_event.set()
|
|
return True
|
|
|
|
def _cmd_connect(self, address: str = "") -> bool:
|
|
"""Connect to a MultiWorld Server"""
|
|
if address:
|
|
self.ctx.server_address = None
|
|
self.ctx.username = None
|
|
self.ctx.password = None
|
|
elif not self.ctx.server_address:
|
|
self.output("Please specify an address.")
|
|
return False
|
|
async_start(self.ctx.connect(address if address else None), name="connecting")
|
|
return True
|
|
|
|
def _cmd_disconnect(self) -> bool:
|
|
"""Disconnect from a MultiWorld Server"""
|
|
async_start(self.ctx.disconnect(), name="disconnecting")
|
|
return True
|
|
|
|
def _cmd_received(self) -> bool:
|
|
"""List all received items"""
|
|
item: NetworkItem
|
|
self.output(f'{len(self.ctx.items_received)} received items, sorted by time:')
|
|
for index, item in enumerate(self.ctx.items_received, 1):
|
|
parts = []
|
|
add_json_item(parts, item.item, self.ctx.slot, item.flags)
|
|
add_json_text(parts, " from ")
|
|
add_json_location(parts, item.location, item.player)
|
|
add_json_text(parts, " by ")
|
|
add_json_text(parts, item.player, type=JSONTypes.player_id)
|
|
self.ctx.on_print_json({"data": parts, "cmd": "PrintJSON"})
|
|
return True
|
|
|
|
def _cmd_missing(self, filter_text = "") -> bool:
|
|
"""List all missing location checks, from your local game state.
|
|
Can be given text, which will be used as filter."""
|
|
if not self.ctx.game:
|
|
self.output("No game set, cannot determine missing checks.")
|
|
return False
|
|
count = 0
|
|
checked_count = 0
|
|
for location, location_id in AutoWorldRegister.world_types[self.ctx.game].location_name_to_id.items():
|
|
if filter_text and filter_text not in location:
|
|
continue
|
|
if location_id < 0:
|
|
continue
|
|
if location_id not in self.ctx.locations_checked:
|
|
if location_id in self.ctx.missing_locations:
|
|
self.output('Missing: ' + location)
|
|
count += 1
|
|
elif location_id in self.ctx.checked_locations:
|
|
self.output('Checked: ' + location)
|
|
count += 1
|
|
checked_count += 1
|
|
|
|
if count:
|
|
self.output(
|
|
f"Found {count} missing location checks{f'. {checked_count} location checks previously visited.' if checked_count else ''}")
|
|
else:
|
|
self.output("No missing location checks found.")
|
|
return True
|
|
|
|
def _cmd_items(self):
|
|
"""List all item names for the currently running game."""
|
|
if not self.ctx.game:
|
|
self.output("No game set, cannot determine existing items.")
|
|
return False
|
|
self.output(f"Item Names for {self.ctx.game}")
|
|
for item_name in AutoWorldRegister.world_types[self.ctx.game].item_name_to_id:
|
|
self.output(item_name)
|
|
|
|
def _cmd_item_groups(self):
|
|
"""List all item group names for the currently running game."""
|
|
if not self.ctx.game:
|
|
self.output("No game set, cannot determine existing item groups.")
|
|
return False
|
|
self.output(f"Item Group Names for {self.ctx.game}")
|
|
for group_name in AutoWorldRegister.world_types[self.ctx.game].item_name_groups:
|
|
self.output(group_name)
|
|
|
|
def _cmd_locations(self):
|
|
"""List all location names for the currently running game."""
|
|
if not self.ctx.game:
|
|
self.output("No game set, cannot determine existing locations.")
|
|
return False
|
|
self.output(f"Location Names for {self.ctx.game}")
|
|
for location_name in AutoWorldRegister.world_types[self.ctx.game].location_name_to_id:
|
|
self.output(location_name)
|
|
|
|
def _cmd_location_groups(self):
|
|
"""List all location group names for the currently running game."""
|
|
if not self.ctx.game:
|
|
self.output("No game set, cannot determine existing location groups.")
|
|
return False
|
|
self.output(f"Location Group Names for {self.ctx.game}")
|
|
for group_name in AutoWorldRegister.world_types[self.ctx.game].location_name_groups:
|
|
self.output(group_name)
|
|
|
|
def _cmd_ready(self):
|
|
"""Send ready status to server."""
|
|
self.ctx.ready = not self.ctx.ready
|
|
if self.ctx.ready:
|
|
state = ClientStatus.CLIENT_READY
|
|
self.output("Readied up.")
|
|
else:
|
|
state = ClientStatus.CLIENT_CONNECTED
|
|
self.output("Unreadied.")
|
|
async_start(self.ctx.send_msgs([{"cmd": "StatusUpdate", "status": state}]), name="send StatusUpdate")
|
|
|
|
def default(self, raw: str):
|
|
"""The default message parser to be used when parsing any messages that do not match a command"""
|
|
raw = self.ctx.on_user_say(raw)
|
|
if raw:
|
|
async_start(self.ctx.send_msgs([{"cmd": "Say", "text": raw}]), name="send Say")
|
|
|
|
|
|
class CommonContext:
|
|
# The following attributes are used to Connect and should be adjusted as needed in subclasses
|
|
tags: typing.Set[str] = {"AP"}
|
|
game: typing.Optional[str] = None
|
|
items_handling: typing.Optional[int] = None
|
|
want_slot_data: bool = True # should slot_data be retrieved via Connect
|
|
|
|
class NameLookupDict:
|
|
"""A specialized dict, with helper methods, for id -> name item/location data package lookups by game."""
|
|
def __init__(self, ctx: CommonContext, lookup_type: typing.Literal["item", "location"]):
|
|
self.ctx: CommonContext = ctx
|
|
self.lookup_type: typing.Literal["item", "location"] = lookup_type
|
|
self._unknown_item: typing.Callable[[int], str] = lambda key: f"Unknown {lookup_type} (ID: {key})"
|
|
self._archipelago_lookup: typing.Dict[int, str] = {}
|
|
self._game_store: typing.Dict[str, typing.ChainMap[int, str]] = collections.defaultdict(
|
|
lambda: collections.ChainMap(self._archipelago_lookup, Utils.KeyedDefaultDict(self._unknown_item)))
|
|
|
|
# noinspection PyTypeChecker
|
|
def __getitem__(self, key: str) -> typing.Mapping[int, str]:
|
|
assert isinstance(key, str), f"ctx.{self.lookup_type}_names used with an id, use the lookup_in_ helpers instead"
|
|
return self._game_store[key]
|
|
|
|
def __len__(self) -> int:
|
|
return len(self._game_store)
|
|
|
|
def __iter__(self) -> typing.Iterator[str]:
|
|
return iter(self._game_store)
|
|
|
|
def __repr__(self) -> str:
|
|
return repr(self._game_store)
|
|
|
|
def lookup_in_game(self, code: int, game_name: typing.Optional[str] = None) -> str:
|
|
"""Returns the name for an item/location id in the context of a specific game or own game if `game` is
|
|
omitted.
|
|
"""
|
|
if game_name is None:
|
|
game_name = self.ctx.game
|
|
assert game_name is not None, f"Attempted to lookup {self.lookup_type} with no game name available."
|
|
|
|
return self._game_store[game_name][code]
|
|
|
|
def lookup_in_slot(self, code: int, slot: typing.Optional[int] = None) -> str:
|
|
"""Returns the name for an item/location id in the context of a specific slot or own slot if `slot` is
|
|
omitted.
|
|
|
|
Use of `lookup_in_slot` should not be used when not connected to a server. If looking in own game, set
|
|
`ctx.game` and use `lookup_in_game` method instead.
|
|
"""
|
|
if slot is None:
|
|
slot = self.ctx.slot
|
|
assert slot is not None, f"Attempted to lookup {self.lookup_type} with no slot info available."
|
|
|
|
return self.lookup_in_game(code, self.ctx.slot_info[slot].game)
|
|
|
|
def update_game(self, game: str, name_to_id_lookup_table: typing.Dict[str, int]) -> None:
|
|
"""Overrides existing lookup tables for a particular game."""
|
|
id_to_name_lookup_table = Utils.KeyedDefaultDict(self._unknown_item)
|
|
id_to_name_lookup_table.update({code: name for name, code in name_to_id_lookup_table.items()})
|
|
self._game_store[game] = collections.ChainMap(self._archipelago_lookup, id_to_name_lookup_table)
|
|
if game == "Archipelago":
|
|
# Keep track of the Archipelago data package separately so if it gets updated in a custom datapackage,
|
|
# it updates in all chain maps automatically.
|
|
self._archipelago_lookup.clear()
|
|
self._archipelago_lookup.update(id_to_name_lookup_table)
|
|
|
|
# defaults
|
|
starting_reconnect_delay: int = 5
|
|
current_reconnect_delay: int = starting_reconnect_delay
|
|
command_processor: typing.Type[CommandProcessor] = ClientCommandProcessor
|
|
ui: typing.Optional["kvui.GameManager"] = None
|
|
ui_task: typing.Optional["asyncio.Task[None]"] = None
|
|
input_task: typing.Optional["asyncio.Task[None]"] = None
|
|
keep_alive_task: typing.Optional["asyncio.Task[None]"] = None
|
|
server_task: typing.Optional["asyncio.Task[None]"] = None
|
|
autoreconnect_task: typing.Optional["asyncio.Task[None]"] = None
|
|
disconnected_intentionally: bool = False
|
|
server: typing.Optional[Endpoint] = None
|
|
server_version: Version = Version(0, 0, 0)
|
|
generator_version: Version = Version(0, 0, 0)
|
|
current_energy_link_value: typing.Optional[int] = None # to display in UI, gets set by server
|
|
max_size: int = 16*1024*1024 # 16 MB of max incoming packet size
|
|
|
|
last_death_link: float = time.time() # last send/received death link on AP layer
|
|
|
|
# remaining type info
|
|
slot_info: dict[int, NetworkSlot]
|
|
"""Slot Info from the server for the current connection"""
|
|
server_address: str | None
|
|
"""Autoconnect address provided by the ctx constructor"""
|
|
password: str | None
|
|
"""Password used for Connecting, expected by server_auth"""
|
|
hint_cost: int | None
|
|
"""Current Hint Cost per Hint from the server"""
|
|
hint_points: int | None
|
|
"""Current avaliable Hint Points from the server"""
|
|
player_names: dict[int, str]
|
|
"""Current lookup of slot number to player display name from server (includes aliases)"""
|
|
|
|
finished_game: bool
|
|
"""
|
|
Bool to signal that status should be updated to Goal after reconnecting
|
|
to be used to ensure that a StatusUpdate packet does not get lost when disconnected
|
|
"""
|
|
ready: bool
|
|
"""Bool to keep track of state for the /ready command"""
|
|
team: int | None
|
|
"""Team number of currently connected slot"""
|
|
slot: int | None
|
|
"""Slot number of currently connected slot"""
|
|
auth: str | None
|
|
"""Name used in Connect packet"""
|
|
seed_name: str | None
|
|
"""Seed name that will be validated on opening a socket if present"""
|
|
|
|
# locations
|
|
locations_checked: set[int]
|
|
"""
|
|
Local container of location ids checked to signal that LocationChecks should be resent after reconnecting
|
|
to be used to ensure that a LocationChecks packet does not get lost when disconnected
|
|
"""
|
|
locations_scouted: set[int]
|
|
"""
|
|
Local container of location ids scouted to signal that LocationScouts should be resent after reconnecting
|
|
to be used to ensure that a LocationScouts packet does not get lost when disconnected
|
|
"""
|
|
items_received: list[NetworkItem]
|
|
"""List of NetworkItems recieved from the server"""
|
|
missing_locations: set[int]
|
|
"""Container of Locations that are unchecked per server state"""
|
|
checked_locations: set[int]
|
|
"""Container of Locations that are checked per server state"""
|
|
server_locations: set[int]
|
|
"""Container of Locations that exist per server state; a combination between missing and checked locations"""
|
|
locations_info: dict[int, NetworkItem]
|
|
"""Dict of location id: NetworkItem info from LocationScouts request"""
|
|
|
|
# data storage
|
|
stored_data: dict[str, typing.Any]
|
|
"""
|
|
Data Storage values by key that were retrieved from the server
|
|
any keys subscribed to with SetNotify will be kept up to date
|
|
"""
|
|
stored_data_notification_keys: set[str]
|
|
"""Current container of watched Data Storage keys, managed by ctx.set_notify"""
|
|
|
|
# internals
|
|
_messagebox: typing.Optional["kvui.MessageBox"] = None
|
|
"""Current message box through kvui"""
|
|
_messagebox_connection_loss: typing.Optional["kvui.MessageBox"] = None
|
|
"""Message box reporting a loss of connection"""
|
|
|
|
def __init__(self, server_address: typing.Optional[str] = None, password: typing.Optional[str] = None) -> None:
|
|
# server state
|
|
self.server_address = server_address
|
|
self.username = None
|
|
self.password = password
|
|
self.hint_cost = None
|
|
self.slot_info = {}
|
|
self.permissions = {
|
|
"release": "disabled",
|
|
"collect": "disabled",
|
|
"remaining": "disabled",
|
|
}
|
|
|
|
# own state
|
|
self.finished_game = False
|
|
self.ready = False
|
|
self.team = None
|
|
self.slot = None
|
|
self.auth = None
|
|
self.seed_name = None
|
|
|
|
self.locations_checked = set() # local state
|
|
self.locations_scouted = set()
|
|
self.items_received = []
|
|
self.missing_locations = set() # server state
|
|
self.checked_locations = set() # server state
|
|
self.server_locations = set() # all locations the server knows of, missing_location | checked_locations
|
|
self.locations_info = {}
|
|
|
|
self.stored_data = {}
|
|
self.stored_data_notification_keys = set()
|
|
|
|
self.input_queue = asyncio.Queue()
|
|
self.input_requests = 0
|
|
|
|
# game state
|
|
self.player_names = {0: "Archipelago"}
|
|
self.exit_event = asyncio.Event()
|
|
self.watcher_event = asyncio.Event()
|
|
|
|
self.item_names = self.NameLookupDict(self, "item")
|
|
self.location_names = self.NameLookupDict(self, "location")
|
|
self.checksums = {}
|
|
|
|
self.jsontotextparser = JSONtoTextParser(self)
|
|
self.rawjsontotextparser = RawJSONtoTextParser(self)
|
|
self.update_data_package(network_data_package)
|
|
|
|
# execution
|
|
self.keep_alive_task = asyncio.create_task(keep_alive(self), name="Bouncy")
|
|
|
|
@property
|
|
def suggested_address(self) -> str:
|
|
if self.server_address:
|
|
return self.server_address
|
|
return Utils.persistent_load().get("client", {}).get("last_server_address", "")
|
|
|
|
@functools.cached_property
|
|
def raw_text_parser(self) -> RawJSONtoTextParser:
|
|
return RawJSONtoTextParser(self)
|
|
|
|
@property
|
|
def total_locations(self) -> typing.Optional[int]:
|
|
"""Will return None until connected."""
|
|
if self.checked_locations or self.missing_locations:
|
|
return len(self.checked_locations | self.missing_locations)
|
|
|
|
async def connection_closed(self):
|
|
if self.server and self.server.socket is not None:
|
|
await self.server.socket.close()
|
|
self.reset_server_state()
|
|
|
|
def reset_server_state(self):
|
|
self.auth = None
|
|
self.slot = None
|
|
self.team = None
|
|
self.items_received = []
|
|
self.locations_info = {}
|
|
self.server_version = Version(0, 0, 0)
|
|
self.generator_version = Version(0, 0, 0)
|
|
self.server = None
|
|
self.server_task = None
|
|
self.hint_cost = None
|
|
self.permissions = {
|
|
"release": "disabled",
|
|
"collect": "disabled",
|
|
"remaining": "disabled",
|
|
}
|
|
|
|
async def disconnect(self, allow_autoreconnect: bool = False):
|
|
if not allow_autoreconnect:
|
|
self.disconnected_intentionally = True
|
|
if self.cancel_autoreconnect():
|
|
logger.info("Cancelled auto-reconnect.")
|
|
if self.server and not self.server.socket.closed:
|
|
await self.server.socket.close()
|
|
if self.server_task is not None:
|
|
await self.server_task
|
|
if self.ui:
|
|
self.ui.update_hints()
|
|
|
|
async def send_msgs(self, msgs: typing.List[typing.Any]) -> None:
|
|
""" `msgs` JSON serializable """
|
|
if not self.server or not self.server.socket.open or self.server.socket.closed:
|
|
return
|
|
await self.server.socket.send(encode(msgs))
|
|
|
|
def consume_players_package(self, package: typing.List[tuple]):
|
|
self.player_names = {slot: name for team, slot, name, orig_name in package if self.team == team}
|
|
self.player_names[0] = "Archipelago"
|
|
|
|
def event_invalid_slot(self):
|
|
raise Exception('Invalid Slot; please verify that you have connected to the correct world.')
|
|
|
|
def event_invalid_game(self):
|
|
raise Exception('Invalid Game; please verify that you connected with the right game to the correct world.')
|
|
|
|
async def server_auth(self, password_requested: bool = False):
|
|
if password_requested and not self.password:
|
|
logger.info('Enter the password required to join this game:')
|
|
self.password = await self.console_input()
|
|
return self.password
|
|
|
|
async def get_username(self):
|
|
if not self.auth:
|
|
self.auth = self.username
|
|
if not self.auth:
|
|
logger.info('Enter slot name:')
|
|
self.auth = await self.console_input()
|
|
|
|
async def send_connect(self, **kwargs: typing.Any) -> None:
|
|
"""
|
|
Send a `Connect` packet to log in to the server,
|
|
additional keyword args can override any value in the connection packet
|
|
"""
|
|
payload = {
|
|
'cmd': 'Connect',
|
|
'password': self.password, 'name': self.auth, 'version': Utils.version_tuple,
|
|
'tags': self.tags, 'items_handling': self.items_handling,
|
|
'uuid': Utils.get_unique_identifier(), 'game': self.game, "slot_data": self.want_slot_data,
|
|
}
|
|
if kwargs:
|
|
payload.update(kwargs)
|
|
await self.send_msgs([payload])
|
|
await self.send_msgs([{"cmd": "Get", "keys": ["_read_race_mode"]}])
|
|
|
|
async def check_locations(self, locations: typing.Collection[int]) -> set[int]:
|
|
"""Send new location checks to the server. Returns the set of actually new locations that were sent."""
|
|
locations = set(locations) & self.missing_locations
|
|
if locations:
|
|
await self.send_msgs([{"cmd": 'LocationChecks', "locations": tuple(locations)}])
|
|
return locations
|
|
|
|
async def console_input(self) -> str:
|
|
if self.ui:
|
|
self.ui.focus_textinput()
|
|
self.input_requests += 1
|
|
return await self.input_queue.get()
|
|
|
|
async def connect(self, address: typing.Optional[str] = None) -> None:
|
|
""" disconnect any previous connection, and open new connection to the server """
|
|
await self.disconnect()
|
|
self.server_task = asyncio.create_task(server_loop(self, address), name="server loop")
|
|
|
|
def cancel_autoreconnect(self) -> bool:
|
|
if self.autoreconnect_task:
|
|
self.autoreconnect_task.cancel()
|
|
self.autoreconnect_task = None
|
|
return True
|
|
return False
|
|
|
|
def slot_concerns_self(self, slot) -> bool:
|
|
"""Helper function to abstract player groups, should be used instead of checking slot == self.slot directly."""
|
|
if slot == self.slot:
|
|
return True
|
|
if slot in self.slot_info:
|
|
return self.slot in self.slot_info[slot].group_members
|
|
return False
|
|
|
|
def is_echoed_chat(self, print_json_packet: dict) -> bool:
|
|
"""Helper function for filtering out messages sent by self."""
|
|
return print_json_packet.get("type", "") == "Chat" \
|
|
and print_json_packet.get("team", None) == self.team \
|
|
and print_json_packet.get("slot", None) == self.slot
|
|
|
|
def is_uninteresting_item_send(self, print_json_packet: dict) -> bool:
|
|
"""Helper function for filtering out ItemSend prints that do not concern the local player."""
|
|
return print_json_packet.get("type", "") == "ItemSend" \
|
|
and not self.slot_concerns_self(print_json_packet["receiving"]) \
|
|
and not self.slot_concerns_self(print_json_packet["item"].player)
|
|
|
|
def on_print(self, args: dict):
|
|
logger.info(args["text"])
|
|
|
|
def on_print_json(self, args: dict):
|
|
if self.ui:
|
|
# send copy to UI
|
|
self.ui.print_json(copy.deepcopy(args["data"]))
|
|
|
|
logging.getLogger("FileLog").info(self.rawjsontotextparser(copy.deepcopy(args["data"])),
|
|
extra={"NoStream": True})
|
|
logging.getLogger("StreamLog").info(self.jsontotextparser(copy.deepcopy(args["data"])),
|
|
extra={"NoFile": True})
|
|
|
|
def on_package(self, cmd: str, args: dict):
|
|
"""For custom package handling in subclasses."""
|
|
pass
|
|
|
|
def on_user_say(self, text: str) -> typing.Optional[str]:
|
|
"""Gets called before sending a Say to the server from the user.
|
|
Returned text is sent, or sending is aborted if None is returned."""
|
|
return text
|
|
|
|
def on_ui_command(self, text: str) -> None:
|
|
"""Gets called by kivy when the user executes a command starting with `/` or `!`.
|
|
The command processor is still called; this is just intended for command echoing."""
|
|
self.ui.print_json([{"text": text, "type": "color", "color": "orange"}])
|
|
|
|
def update_permissions(self, permissions: typing.Dict[str, int]):
|
|
"""Internal method to parse and save server permissions from RoomInfo"""
|
|
for permission_name, permission_flag in permissions.items():
|
|
try:
|
|
flag = Permission(permission_flag)
|
|
logger.info(f"{permission_name.capitalize()} permission: {flag.name}")
|
|
self.permissions[permission_name] = flag.name
|
|
except Exception as e: # safeguard against permissions that may be implemented in the future
|
|
logger.exception(e)
|
|
|
|
async def shutdown(self):
|
|
self.server_address = ""
|
|
self.username = None
|
|
self.password = None
|
|
self.cancel_autoreconnect()
|
|
if self.server and not self.server.socket.closed:
|
|
await self.server.socket.close()
|
|
if self.server_task:
|
|
await self.server_task
|
|
|
|
while self.input_requests > 0:
|
|
self.input_queue.put_nowait(None)
|
|
self.input_requests -= 1
|
|
self.keep_alive_task.cancel()
|
|
if self.ui_task:
|
|
await self.ui_task
|
|
if self.input_task:
|
|
self.input_task.cancel()
|
|
|
|
# Hints
|
|
def update_hint(self, location: int, finding_player: int, status: typing.Optional[HintStatus]) -> None:
|
|
msg = {"cmd": "UpdateHint", "location": location, "player": finding_player}
|
|
if status is not None:
|
|
msg["status"] = status
|
|
async_start(self.send_msgs([msg]), name="update_hint")
|
|
|
|
# DataPackage
|
|
async def prepare_data_package(self, relevant_games: typing.Set[str],
|
|
remote_data_package_checksums: typing.Dict[str, str]):
|
|
"""Validate that all data is present for the current multiworld.
|
|
Download, assimilate and cache missing data from the server."""
|
|
# by documentation any game can use Archipelago locations/items -> always relevant
|
|
relevant_games.add("Archipelago")
|
|
|
|
needed_updates: typing.Set[str] = set()
|
|
for game in relevant_games:
|
|
if game not in remote_data_package_checksums:
|
|
continue
|
|
|
|
remote_checksum: typing.Optional[str] = remote_data_package_checksums.get(game)
|
|
|
|
if not remote_checksum: # custom data package and no checksum for this game
|
|
needed_updates.add(game)
|
|
continue
|
|
|
|
cached_checksum: typing.Optional[str] = self.checksums.get(game)
|
|
# no action required if cached version is new enough
|
|
if remote_checksum != cached_checksum:
|
|
local_checksum: typing.Optional[str] = network_data_package["games"].get(game, {}).get("checksum")
|
|
if remote_checksum == local_checksum:
|
|
self.update_game(network_data_package["games"][game], game)
|
|
else:
|
|
cached_game = Utils.load_data_package_for_checksum(game, remote_checksum)
|
|
cache_checksum: typing.Optional[str] = cached_game.get("checksum")
|
|
# download remote version if cache is not new enough
|
|
if remote_checksum != cache_checksum:
|
|
needed_updates.add(game)
|
|
else:
|
|
self.update_game(cached_game, game)
|
|
if needed_updates:
|
|
await self.send_msgs([{"cmd": "GetDataPackage", "games": [game_name]} for game_name in needed_updates])
|
|
|
|
def update_game(self, game_package: dict, game: str):
|
|
self.item_names.update_game(game, game_package["item_name_to_id"])
|
|
self.location_names.update_game(game, game_package["location_name_to_id"])
|
|
self.checksums[game] = game_package.get("checksum")
|
|
|
|
def update_data_package(self, data_package: dict):
|
|
for game, game_data in data_package["games"].items():
|
|
self.update_game(game_data, game)
|
|
|
|
def consume_network_data_package(self, data_package: dict):
|
|
self.update_data_package(data_package)
|
|
logger.info(f"Got new ID/Name DataPackage for {', '.join(data_package['games'])}")
|
|
for game, game_data in data_package["games"].items():
|
|
Utils.store_data_package_for_checksum(game, game_data)
|
|
|
|
# data storage
|
|
|
|
def set_notify(self, *keys: str) -> None:
|
|
"""Subscribe to be notified of changes to selected data storage keys.
|
|
|
|
The values can be accessed via the "stored_data" attribute of this context, which is a dictionary mapping the
|
|
names of the data storage keys to the latest values received from the server.
|
|
"""
|
|
if new_keys := (set(keys) - self.stored_data_notification_keys):
|
|
self.stored_data_notification_keys.update(new_keys)
|
|
async_start(self.send_msgs([{"cmd": "Get",
|
|
"keys": list(new_keys)},
|
|
{"cmd": "SetNotify",
|
|
"keys": list(new_keys)}]))
|
|
|
|
# DeathLink hooks
|
|
|
|
def on_deathlink(self, data: typing.Dict[str, typing.Any]) -> None:
|
|
"""Gets dispatched when a new DeathLink is triggered by another linked player."""
|
|
self.last_death_link = max(data["time"], self.last_death_link)
|
|
text = data.get("cause", "")
|
|
if text:
|
|
logger.info(f"DeathLink: {text}")
|
|
else:
|
|
logger.info(f"DeathLink: Received from {data['source']}")
|
|
|
|
async def send_death(self, death_text: str = ""):
|
|
"""Helper function to send a deathlink using death_text as the unique death cause string."""
|
|
if self.server and self.server.socket:
|
|
logger.info("DeathLink: Sending death to your friends...")
|
|
self.last_death_link = time.time()
|
|
await self.send_msgs([{
|
|
"cmd": "Bounce", "tags": ["DeathLink"],
|
|
"data": {
|
|
"time": self.last_death_link,
|
|
"source": self.player_names[self.slot],
|
|
"cause": death_text
|
|
}
|
|
}])
|
|
|
|
async def update_death_link(self, death_link: bool):
|
|
"""Helper function to set Death Link connection tag on/off and update the connection if already connected."""
|
|
old_tags = self.tags.copy()
|
|
if death_link:
|
|
self.tags.add("DeathLink")
|
|
else:
|
|
self.tags -= {"DeathLink"}
|
|
if old_tags != self.tags and self.server and not self.server.socket.closed:
|
|
await self.send_msgs([{"cmd": "ConnectUpdate", "tags": self.tags}])
|
|
|
|
def gui_error(self, title: str, text: typing.Union[Exception, str]) -> typing.Optional["kvui.MessageBox"]:
|
|
"""Displays an error messagebox in the loaded Kivy UI. Override if using a different UI framework"""
|
|
if not self.ui:
|
|
return None
|
|
title = title or "Error"
|
|
from kvui import MessageBox
|
|
if self._messagebox:
|
|
self._messagebox.dismiss()
|
|
# make "Multiple exceptions" look nice
|
|
text = str(text).replace('[Errno', '\n[Errno').strip()
|
|
# split long messages into title and text
|
|
parts = title.split('. ', 1)
|
|
if len(parts) == 1:
|
|
parts = title.split(', ', 1)
|
|
if len(parts) > 1:
|
|
text = parts[1] + '\n\n' + text
|
|
title = parts[0]
|
|
# display error
|
|
self._messagebox = MessageBox(title, text, error=True)
|
|
self._messagebox.open()
|
|
return self._messagebox
|
|
|
|
def handle_connection_loss(self, msg: str) -> None:
|
|
"""Helper for logging and displaying a loss of connection. Must be called from an except block."""
|
|
exc_info = sys.exc_info()
|
|
logger.exception(msg, exc_info=exc_info, extra={'compact_gui': True})
|
|
self._messagebox_connection_loss = self.gui_error(msg, exc_info[1])
|
|
|
|
def make_gui(self) -> "type[kvui.GameManager]":
|
|
"""
|
|
To return the Kivy `App` class needed for `run_gui` so it can be overridden before being built
|
|
|
|
Common changes are changing `base_title` to update the window title of the client and
|
|
updating `logging_pairs` to automatically make new tabs that can be filled with their respective logger.
|
|
|
|
ex. `logging_pairs.append(("Foo", "Bar"))`
|
|
will add a "Bar" tab which follows the logger returned from `logging.getLogger("Foo")`
|
|
"""
|
|
from kvui import GameManager
|
|
|
|
class TextManager(GameManager):
|
|
base_title = "Archipelago Text Client"
|
|
|
|
return TextManager
|
|
|
|
def run_gui(self):
|
|
"""Import kivy UI system from make_gui() and start running it as self.ui_task."""
|
|
ui_class = self.make_gui()
|
|
self.ui = ui_class(self)
|
|
self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI")
|
|
|
|
def run_cli(self):
|
|
if sys.stdin:
|
|
if sys.stdin.fileno() != 0:
|
|
from multiprocessing import parent_process
|
|
if parent_process():
|
|
return # ignore MultiProcessing pipe
|
|
|
|
# steam overlay breaks when starting console_loop
|
|
if 'gameoverlayrenderer' in os.environ.get('LD_PRELOAD', ''):
|
|
logger.info("Skipping terminal input, due to conflicting Steam Overlay detected. Please use GUI only.")
|
|
else:
|
|
self.input_task = asyncio.create_task(console_loop(self), name="Input")
|
|
|
|
|
|
async def keep_alive(ctx: CommonContext, seconds_between_checks=100):
|
|
"""some ISPs/network configurations drop TCP connections if no payload is sent (ignore TCP-keep-alive)
|
|
so we send a payload to prevent drop and if we were dropped anyway this will cause an auto-reconnect."""
|
|
seconds_elapsed = 0
|
|
while not ctx.exit_event.is_set():
|
|
await asyncio.sleep(1) # short sleep to not block program shutdown
|
|
if ctx.server and ctx.slot:
|
|
seconds_elapsed += 1
|
|
if seconds_elapsed > seconds_between_checks:
|
|
await ctx.send_msgs([{"cmd": "Bounce", "slots": [ctx.slot]}])
|
|
seconds_elapsed = 0
|
|
|
|
|
|
async def server_loop(ctx: CommonContext, address: typing.Optional[str] = None) -> None:
|
|
if ctx.server and ctx.server.socket:
|
|
logger.error('Already connected')
|
|
return
|
|
|
|
if address is None: # set through CLI or APBP
|
|
address = ctx.server_address
|
|
|
|
# Wait for the user to provide a multiworld server address
|
|
if not address:
|
|
logger.info('Please connect to an Archipelago server.')
|
|
return
|
|
|
|
ctx.cancel_autoreconnect()
|
|
if ctx._messagebox_connection_loss:
|
|
ctx._messagebox_connection_loss.dismiss()
|
|
ctx._messagebox_connection_loss = None
|
|
|
|
address = f"ws://{address}" if "://" not in address \
|
|
else address.replace("archipelago://", "ws://")
|
|
|
|
server_url = urllib.parse.urlparse(address)
|
|
if server_url.username:
|
|
ctx.username = server_url.username
|
|
if server_url.password:
|
|
ctx.password = server_url.password
|
|
|
|
def reconnect_hint() -> str:
|
|
return ", type /connect to reconnect" if ctx.server_address else ""
|
|
|
|
logger.info(f'Connecting to Archipelago server at {address}')
|
|
try:
|
|
port = server_url.port or 38281 # raises ValueError if invalid
|
|
socket = await websockets.connect(address, port=port, ping_timeout=None, ping_interval=None,
|
|
ssl=get_ssl_context() if address.startswith("wss://") else None,
|
|
max_size=ctx.max_size)
|
|
if ctx.ui is not None:
|
|
ctx.ui.update_address_bar(server_url.netloc)
|
|
ctx.server = Endpoint(socket)
|
|
logger.info('Connected')
|
|
ctx.server_address = address
|
|
ctx.current_reconnect_delay = ctx.starting_reconnect_delay
|
|
ctx.disconnected_intentionally = False
|
|
async for data in ctx.server.socket:
|
|
for msg in decode(data):
|
|
await process_server_cmd(ctx, msg)
|
|
logger.warning(f"Disconnected from multiworld server{reconnect_hint()}")
|
|
except websockets.InvalidMessage:
|
|
# probably encrypted
|
|
if address.startswith("ws://"):
|
|
# try wss
|
|
await server_loop(ctx, "ws" + address[1:])
|
|
else:
|
|
ctx.handle_connection_loss(f"Lost connection to the multiworld server due to InvalidMessage"
|
|
f"{reconnect_hint()}")
|
|
except ConnectionRefusedError:
|
|
ctx.handle_connection_loss("Connection refused by the server. "
|
|
"May not be running Archipelago on that address or port.")
|
|
except websockets.InvalidURI:
|
|
ctx.handle_connection_loss("Failed to connect to the multiworld server (invalid URI)")
|
|
except OSError:
|
|
ctx.handle_connection_loss("Failed to connect to the multiworld server")
|
|
except Exception:
|
|
ctx.handle_connection_loss(f"Lost connection to the multiworld server{reconnect_hint()}")
|
|
finally:
|
|
await ctx.connection_closed()
|
|
if ctx.server_address and ctx.username and not ctx.disconnected_intentionally:
|
|
logger.info(f"... automatically reconnecting in {ctx.current_reconnect_delay} seconds")
|
|
assert ctx.autoreconnect_task is None
|
|
ctx.autoreconnect_task = asyncio.create_task(server_autoreconnect(ctx), name="server auto reconnect")
|
|
ctx.current_reconnect_delay *= 2
|
|
|
|
|
|
async def server_autoreconnect(ctx: CommonContext):
|
|
await asyncio.sleep(ctx.current_reconnect_delay)
|
|
if ctx.server_address and ctx.server_task is None:
|
|
ctx.server_task = asyncio.create_task(server_loop(ctx), name="server loop")
|
|
|
|
|
|
async def process_server_cmd(ctx: CommonContext, args: dict):
|
|
try:
|
|
cmd = args["cmd"]
|
|
except:
|
|
logger.exception(f"Could not get command from {args}")
|
|
raise
|
|
if cmd == 'RoomInfo':
|
|
if ctx.seed_name and ctx.seed_name != args["seed_name"]:
|
|
msg = "The server is running a different multiworld than your client is. (invalid seed_name)"
|
|
logger.info(msg, extra={'compact_gui': True})
|
|
ctx.gui_error('Error', msg)
|
|
else:
|
|
logger.info('--------------------------------')
|
|
logger.info('Room Information:')
|
|
logger.info('--------------------------------')
|
|
version = args["version"]
|
|
ctx.server_version = Version(*version)
|
|
|
|
if "generator_version" in args:
|
|
ctx.generator_version = Version(*args["generator_version"])
|
|
logger.info(f'Server protocol version: {ctx.server_version.as_simple_string()}, '
|
|
f'generator version: {ctx.generator_version.as_simple_string()}, '
|
|
f'tags: {", ".join(args["tags"])}')
|
|
else:
|
|
logger.info(f'Server protocol version: {ctx.server_version.as_simple_string()}, '
|
|
f'tags: {", ".join(args["tags"])}')
|
|
if args['password']:
|
|
logger.info('Password required')
|
|
ctx.update_permissions(args.get("permissions", {}))
|
|
logger.info(
|
|
f"A !hint costs {args['hint_cost']}% of your total location count as points"
|
|
f" and you get {args['location_check_points']}"
|
|
f" for each location checked. Use !hint for more information.")
|
|
ctx.hint_cost = int(args['hint_cost'])
|
|
ctx.check_points = int(args['location_check_points'])
|
|
|
|
if "players" in args: # TODO remove when servers sending this are outdated
|
|
players = args.get("players", [])
|
|
if len(players) < 1:
|
|
logger.info('No player connected')
|
|
else:
|
|
players.sort()
|
|
current_team = -1
|
|
logger.info('Connected Players:')
|
|
for network_player in players:
|
|
if network_player.team != current_team:
|
|
logger.info(f' Team #{network_player.team + 1}')
|
|
current_team = network_player.team
|
|
logger.info(' %s (Player %d)' % (network_player.alias, network_player.slot))
|
|
|
|
# update data package
|
|
data_package_checksums = args.get("datapackage_checksums", {})
|
|
await ctx.prepare_data_package(set(args["games"]), data_package_checksums)
|
|
|
|
await ctx.server_auth(args['password'])
|
|
|
|
elif cmd == 'DataPackage':
|
|
ctx.consume_network_data_package(args['data'])
|
|
|
|
elif cmd == 'ConnectionRefused':
|
|
errors = args["errors"]
|
|
if 'InvalidSlot' in errors:
|
|
ctx.disconnected_intentionally = True
|
|
ctx.event_invalid_slot()
|
|
elif 'InvalidGame' in errors:
|
|
ctx.disconnected_intentionally = True
|
|
ctx.event_invalid_game()
|
|
elif 'IncompatibleVersion' in errors:
|
|
ctx.disconnected_intentionally = True
|
|
raise Exception('Server reported your client version as incompatible. '
|
|
'This probably means you have to update.')
|
|
elif 'InvalidItemsHandling' in errors:
|
|
raise Exception('The item handling flags requested by the client are not supported')
|
|
# last to check, recoverable problem
|
|
elif 'InvalidPassword' in errors:
|
|
logger.error('Invalid password')
|
|
ctx.password = None
|
|
await ctx.server_auth(True)
|
|
elif errors:
|
|
raise Exception("Unknown connection errors: " + str(errors))
|
|
else:
|
|
raise Exception('Connection refused by the multiworld host, no reason provided')
|
|
|
|
elif cmd == 'Connected':
|
|
ctx.username = ctx.auth
|
|
ctx.team = args["team"]
|
|
ctx.slot = args["slot"]
|
|
# int keys get lost in JSON transfer
|
|
ctx.slot_info = {0: NetworkSlot("Archipelago", "Archipelago", SlotType.player)}
|
|
ctx.slot_info.update({int(pid): data for pid, data in args["slot_info"].items()})
|
|
ctx.hint_points = args.get("hint_points", 0)
|
|
ctx.consume_players_package(args["players"])
|
|
ctx.stored_data_notification_keys.add(f"_read_hints_{ctx.team}_{ctx.slot}")
|
|
msgs = []
|
|
if ctx.locations_checked:
|
|
msgs.append({"cmd": "LocationChecks",
|
|
"locations": list(ctx.locations_checked)})
|
|
if ctx.locations_scouted:
|
|
msgs.append({"cmd": "LocationScouts",
|
|
"locations": list(ctx.locations_scouted)})
|
|
if ctx.stored_data_notification_keys:
|
|
msgs.append({"cmd": "Get",
|
|
"keys": list(ctx.stored_data_notification_keys)})
|
|
msgs.append({"cmd": "SetNotify",
|
|
"keys": list(ctx.stored_data_notification_keys)})
|
|
if msgs:
|
|
await ctx.send_msgs(msgs)
|
|
if ctx.finished_game:
|
|
await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
|
|
|
|
# Get the server side view of missing as of time of connecting.
|
|
# This list is used to only send to the server what is reported as ACTUALLY Missing.
|
|
# This also serves to allow an easy visual of what locations were already checked previously
|
|
# when /missing is used for the client side view of what is missing.
|
|
ctx.missing_locations = set(args["missing_locations"])
|
|
ctx.checked_locations = set(args["checked_locations"])
|
|
ctx.server_locations = ctx.missing_locations | ctx. checked_locations
|
|
|
|
server_url = urllib.parse.urlparse(ctx.server_address)
|
|
Utils.persistent_store("client", "last_server_address", server_url.netloc)
|
|
|
|
elif cmd == 'ReceivedItems':
|
|
start_index = args["index"]
|
|
|
|
if start_index == 0:
|
|
ctx.items_received = []
|
|
elif start_index != len(ctx.items_received):
|
|
sync_msg = [{'cmd': 'Sync'}]
|
|
if ctx.locations_checked:
|
|
sync_msg.append({"cmd": "LocationChecks",
|
|
"locations": list(ctx.locations_checked)})
|
|
await ctx.send_msgs(sync_msg)
|
|
if start_index == len(ctx.items_received):
|
|
for item in args['items']:
|
|
ctx.items_received.append(NetworkItem(*item))
|
|
ctx.watcher_event.set()
|
|
|
|
elif cmd == 'LocationInfo':
|
|
for item in [NetworkItem(*item) for item in args['locations']]:
|
|
ctx.locations_info[item.location] = item
|
|
ctx.watcher_event.set()
|
|
|
|
elif cmd == "RoomUpdate":
|
|
if "players" in args:
|
|
ctx.consume_players_package(args["players"])
|
|
if "hint_points" in args:
|
|
ctx.hint_points = args['hint_points']
|
|
if "checked_locations" in args:
|
|
checked = set(args["checked_locations"])
|
|
ctx.checked_locations |= checked
|
|
ctx.missing_locations -= checked
|
|
if "permissions" in args:
|
|
ctx.update_permissions(args["permissions"])
|
|
|
|
elif cmd == 'Print':
|
|
ctx.on_print(args)
|
|
|
|
elif cmd == 'PrintJSON':
|
|
ctx.on_print_json(args)
|
|
|
|
elif cmd == 'InvalidPacket':
|
|
logger.warning(f"Invalid Packet of {args['type']}: {args['text']}")
|
|
|
|
elif cmd == "Bounced":
|
|
tags = args.get("tags", [])
|
|
# we can skip checking "DeathLink" in ctx.tags, as otherwise we wouldn't have been send this
|
|
if "DeathLink" in tags and ctx.last_death_link != args["data"]["time"]:
|
|
ctx.on_deathlink(args["data"])
|
|
|
|
elif cmd == "Retrieved":
|
|
ctx.stored_data.update(args["keys"])
|
|
if ctx.ui and f"_read_hints_{ctx.team}_{ctx.slot}" in args["keys"]:
|
|
ctx.ui.update_hints()
|
|
|
|
elif cmd == "SetReply":
|
|
ctx.stored_data[args["key"]] = args["value"]
|
|
if ctx.ui and f"_read_hints_{ctx.team}_{ctx.slot}" == args["key"]:
|
|
ctx.ui.update_hints()
|
|
elif args["key"].startswith("EnergyLink"):
|
|
ctx.current_energy_link_value = args["value"]
|
|
if ctx.ui:
|
|
ctx.ui.set_new_energy_link_value()
|
|
else:
|
|
logger.debug(f"unknown command {cmd}")
|
|
|
|
ctx.on_package(cmd, args)
|
|
|
|
|
|
async def console_loop(ctx: CommonContext):
|
|
commandprocessor = ctx.command_processor(ctx)
|
|
queue = asyncio.Queue()
|
|
stream_input(sys.stdin, queue)
|
|
while not ctx.exit_event.is_set():
|
|
try:
|
|
input_text = await queue.get()
|
|
queue.task_done()
|
|
|
|
if ctx.input_requests > 0:
|
|
ctx.input_requests -= 1
|
|
ctx.input_queue.put_nowait(input_text)
|
|
continue
|
|
|
|
if input_text:
|
|
commandprocessor(input_text)
|
|
except Exception as e:
|
|
logger.exception(e)
|
|
|
|
|
|
def get_base_parser(description: typing.Optional[str] = None):
|
|
"""Base argument parser to be reused for components subclassing off of CommonClient"""
|
|
import argparse
|
|
parser = argparse.ArgumentParser(description=description)
|
|
parser.add_argument('--connect', default=None, help='Address of the multiworld host.')
|
|
parser.add_argument('--password', default=None, help='Password of the multiworld host.')
|
|
if sys.stdout: # If terminal output exists, offer gui-less mode
|
|
parser.add_argument('--nogui', default=False, action='store_true', help="Turns off Client GUI.")
|
|
return parser
|
|
|
|
|
|
def handle_url_arg(args: "argparse.Namespace",
|
|
parser: "typing.Optional[argparse.ArgumentParser]" = None) -> "argparse.Namespace":
|
|
"""
|
|
Parse the url arg "archipelago://name:pass@host:port" from launcher into correct launch args for CommonClient
|
|
If alternate data is required the urlparse response is saved back to args.url if valid
|
|
"""
|
|
if not args.url:
|
|
return args
|
|
|
|
url = urllib.parse.urlparse(args.url)
|
|
if url.scheme != "archipelago":
|
|
if not parser:
|
|
parser = get_base_parser()
|
|
parser.error(f"bad url, found {args.url}, expected url in form of archipelago://archipelago.gg:38281")
|
|
return args
|
|
|
|
args.url = url
|
|
args.connect = url.netloc
|
|
if url.username:
|
|
args.name = urllib.parse.unquote(url.username)
|
|
if url.password:
|
|
args.password = urllib.parse.unquote(url.password)
|
|
|
|
return args
|
|
|
|
|
|
def run_as_textclient(*args):
|
|
class TextContext(CommonContext):
|
|
# Text Mode to use !hint and such with games that have no text entry
|
|
tags = CommonContext.tags | {"TextOnly"}
|
|
game = "" # empty matches any game since 0.3.2
|
|
items_handling = 0b111 # receive all items for /received
|
|
want_slot_data = False # Can't use game specific slot_data
|
|
|
|
async def server_auth(self, password_requested: bool = False):
|
|
if password_requested and not self.password:
|
|
await super(TextContext, self).server_auth(password_requested)
|
|
await self.get_username()
|
|
await self.send_connect(game="")
|
|
|
|
def on_package(self, cmd: str, args: dict):
|
|
if cmd == "Connected":
|
|
self.game = self.slot_info[self.slot].game
|
|
|
|
async def disconnect(self, allow_autoreconnect: bool = False):
|
|
self.game = ""
|
|
await super().disconnect(allow_autoreconnect)
|
|
|
|
async def main(args):
|
|
ctx = TextContext(args.connect, args.password)
|
|
ctx.auth = args.name
|
|
ctx.server_task = asyncio.create_task(server_loop(ctx), name="server loop")
|
|
|
|
if gui_enabled:
|
|
ctx.run_gui()
|
|
ctx.run_cli()
|
|
|
|
await ctx.exit_event.wait()
|
|
await ctx.shutdown()
|
|
|
|
import colorama
|
|
|
|
parser = get_base_parser(description="Gameless Archipelago Client, for text interfacing.")
|
|
parser.add_argument('--name', default=None, help="Slot Name to connect as.")
|
|
parser.add_argument("url", nargs="?", help="Archipelago connection url")
|
|
args = parser.parse_args(args)
|
|
|
|
args = handle_url_arg(args, parser=parser)
|
|
|
|
# use colorama to display colored text highlighting on windows
|
|
colorama.just_fix_windows_console()
|
|
|
|
asyncio.run(main(args))
|
|
colorama.deinit()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
logging.getLogger().setLevel(logging.INFO) # force log-level to work around log level resetting to WARNING
|
|
run_as_textclient(*sys.argv[1:]) # default value for parse_args
|