From 8b8e2790158c91d46f840511ed0ba52832afa781 Mon Sep 17 00:00:00 2001 From: Fabian Dill Date: Mon, 13 Apr 2020 11:26:50 +0200 Subject: [PATCH] server command processor some commands were renamed at this opportunity --- MultiServer.py | 214 ++++++++++++++++++++++++++++++------------------- 1 file changed, 133 insertions(+), 81 deletions(-) diff --git a/MultiServer.py b/MultiServer.py index 79778564..1fdd90ec 100644 --- a/MultiServer.py +++ b/MultiServer.py @@ -6,6 +6,7 @@ import logging import zlib import collections import typing +import inspect import ModuleUpdate @@ -66,6 +67,7 @@ class Context: self.hints_used = collections.defaultdict(int) self.hints_sent = collections.defaultdict(set) self.item_cheat = item_cheat + self.running = True def get_save(self) -> dict: return { @@ -124,7 +126,7 @@ def notify_client(client: Client, text: str): asyncio.create_task(send_msgs(client.socket, [['Print', text]])) -# separated out, due to compatibilty between client's +# separated out, due to compatibilty between clients def notify_hints(ctx: Context, team: int, hints: typing.List[Utils.Hint]): cmd = [["Hint", hints]] texts = [['Print', format_hint(ctx, team, hint)] for hint in hints] @@ -499,94 +501,145 @@ async def process_client_cmd(ctx: Context, client: Client, cmd, args): notify_client(client, response) -def set_password(ctx : Context, password): +def set_password(ctx: Context, password): ctx.password = password logging.warning('Password set to ' + password if password else 'Password disabled') +class CommandProcessor(): + commands: typing.Dict[str, typing.Callable] + + def __init__(self): + self.commands = {name[5:].lower(): method for name, method in inspect.getmembers(self) if + name.startswith("_cmd_")} + + def output(self, text: str): + print(text) + + def __call__(self, raw: str): + if not raw: + return + command = raw.split() + basecommand = command[0] + if basecommand[0] == "/": + method = self.commands.get(basecommand[1:].lower(), None) + if not method: + self._error_unknown_command(basecommand[1:]) + else: + method(*command[1:]) + else: + self.default(raw) + + def get_help_text(self) -> str: + s = "" + for command, method in self.commands.items(): + spec = inspect.signature(method).parameters + s += f"/{command} {' '.join(spec)}\n {method.__doc__}\n" + return s + + def _cmd_help(self): + """Returns the help listing""" + self.output(self.get_help_text()) + + def default(self, raw: str): + self.output("Echo: " + raw) + + def _error_unknown_command(self, raw: str): + self.output(f"Could not find command {raw}. Known commands: {', '.join(self.commands)}") + + +class ServerCommandProcessor(CommandProcessor): + ctx: Context + + def __init__(self, ctx: Context): + self.ctx = ctx + super(ServerCommandProcessor, self).__init__() + + def default(self, raw: str): + notify_all(self.ctx, '[Server]: ' + raw) + + def _cmd_kick(self, player_name: str): + """Kick specified player from the server""" + for client in self.ctx.clients: + if client.auth and client.name.lower() == player_name.lower() and client.socket and not client.socket.closed: + asyncio.create_task(client.socket.close()) + self.output(f"Kicked {client.name}") + break + else: + self.output("Could not find player to kick") + + def _cmd_players(self): + """Get information about connected players""" + self.output(get_connected_players_string(self.ctx)) + + def _cmd_exit(self): + """Shutdown the server""" + asyncio.create_task(self.ctx.server.ws_server._close()) + self.ctx.running = False + + def _cmd_password(self, new_password: str = ""): + """Set the server password. Leave the password text empty to remove the password""" + set_password(self.ctx, new_password if new_password else None) + + def _cmd_forfeit(self, player_name: str): + """Send out the remaining items from a player's game to their intended recipients""" + seeked_player = player_name.lower() + for (team, slot), name in self.ctx.player_names.items(): + if name.lower() == seeked_player: + forfeit_player(self.ctx, team, slot) + break + else: + self.output("Could not find player to forfeit") + + def _cmd_send(self, player_name: str, *item_name: str): + """Sends an item to the specified player""" + seeked_player, usable, response = get_intended_text(player_name, self.ctx.player_names.values()) + if usable: + item = " ".join(item_name) + item, usable, response = get_intended_text(item, Items.item_table.keys()) + if usable: + for client in self.ctx.clients: + if client.name == seeked_player: + new_item = ReceivedItem(Items.item_table[item][3], -1, client.slot) + get_received_items(self.ctx, client.team, client.slot).append(new_item) + notify_all(self.ctx, 'Cheat console: sending "' + item + '" to ' + client.name) + send_new_items(self.ctx) + return + else: + self.output(response) + else: + self.output(response) + + def _cmd_hint(self, player_name: str, *item_or_location: str): + """Send out a hint for a player's item or location to their team""" + seeked_player, usable, response = get_intended_text(player_name, self.ctx.player_names.values()) + if usable: + for (team, slot), name in self.ctx.player_names.items(): + if name == seeked_player: + item = " ".join(item_or_location) + item, usable, response = get_intended_text(item) + if usable: + if item in Items.item_table: # item name + hints = collect_hints(self.ctx, team, slot, item) + notify_hints(self.ctx, team, hints) + else: # location name + hints = collect_hints_location(self.ctx, team, slot, item) + notify_hints(self.ctx, team, hints) + else: + self.output(response) + return + else: + self.output(response) + + async def console(ctx: Context): session = prompt_toolkit.PromptSession() - running = True - while running: + cmd_processor = ServerCommandProcessor(ctx) + while ctx.running: with patch_stdout(): input_text = await session.prompt_async() try: - - command = input_text.split() - if not command: - continue - - if command[0] == '/exit': - await ctx.server.ws_server._close() - running = False - - if command[0] == '/players': - logging.info(get_connected_players_string(ctx)) - if command[0] == '/password': - set_password(ctx, command[1] if len(command) > 1 else None) - if command[0] == '/kick' and len(command) > 1: - team = int(command[2]) - 1 if len(command) > 2 and command[2].isdigit() else None - for client in ctx.clients: - if client.auth and client.name.lower() == command[1].lower() and (team is None or team == client.team): - if client.socket and not client.socket.closed: - await client.socket.close() - - if command[0] == '/forfeitslot' and len(command) > 1 and command[1].isdigit(): - if len(command) > 2 and command[2].isdigit(): - team = int(command[1]) - 1 - slot = int(command[2]) - else: - team = 0 - slot = int(command[1]) - forfeit_player(ctx, team, slot) - if command[0] == '/forfeitplayer' and len(command) > 1: - seeked_player = command[1].lower() - for (team, slot), name in ctx.player_names.items(): - if name.lower() == seeked_player: - forfeit_player(ctx, team, slot) - if command[0] == '/senditem': - if len(command) <= 2: - logging.info("Use /senditem {Playername} {itemname}\nFor example /senditem Berserker Lamp") - else: - seeked_player, usable, response = get_intended_text(command[1], ctx.player_names.values()) - if usable: - item = " ".join(command[2:]) - item, usable, response = get_intended_text(item, Items.item_table.keys()) - if usable: - for client in ctx.clients: - if client.name == seeked_player: - new_item = ReceivedItem(Items.item_table[item][3], -1, client.slot) - get_received_items(ctx, client.team, client.slot).append(new_item) - notify_all(ctx, 'Cheat console: sending "' + item + '" to ' + client.name) - send_new_items(ctx) - else: - logging.warning(response) - else: - logging.warning(response) - if command[0] == '/hint': - if len(command) <= 2: - logging.info("Use /hint {Playername} {itemname/locationname}\nFor example /hint Berserker Lamp") - else: - seeked_player, usable, response = get_intended_text(command[1], ctx.player_names.values()) - if usable: - for (team, slot), name in ctx.player_names.items(): - if name == seeked_player: - item = " ".join(command[2:]) - item, usable, response = get_intended_text(item) - if usable: - if item in Items.item_table: #item name - hints = collect_hints(ctx, team, slot, item) - notify_hints(ctx, team, hints) - else: #location name - hints = collect_hints_location(ctx, team, slot, item) - notify_hints(ctx, team, hints) - else: - logging.warning(response) - else: - logging.warning(response) - - if command[0][0] != '/': - notify_all(ctx, '[Server]: ' + input_text) + cmd_processor(input_text) except: import traceback traceback.print_exc() @@ -702,4 +755,3 @@ async def main(args: argparse.Namespace): if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main(parse_args())) - loop.close()