mirror of
https://github.com/MarioSpore/Grinch-AP.git
synced 2025-10-21 20:21:32 -06:00
106 lines
3.4 KiB
Python
106 lines
3.4 KiB
Python
![]() |
import asyncio
|
||
|
from logging import Logger
|
||
|
import socket
|
||
|
from typing import Any
|
||
|
|
||
|
ADDRESS = "127.0.0.1"
|
||
|
PORT = 4318
|
||
|
|
||
|
CLIENT_PREFIX = "APSTART:"
|
||
|
CLIENT_POSTFIX = ":APEND"
|
||
|
|
||
|
|
||
|
def decode_mixed_string(data: bytes) -> str:
    """Render raw bytes as text, masking everything that is not printable ASCII.

    Byte values 32 through 126 are converted to their ASCII character; every
    other byte value is replaced by a literal "?".
    """

    def _to_char(byte: int) -> str:
        # 32 (space) .. 126 (~) is the printable ASCII range.
        if 32 <= byte <= 126:
            return chr(byte)
        return "?"

    return "".join(map(_to_char, data))
|
||
|
|
||
|
|
||
|
class TunerException(Exception):
    """Base class for the errors raised while talking to the tuner socket."""
|
||
|
|
||
|
|
||
|
class TunerTimeoutException(TunerException):
    """Signals that a tuner command timed out."""
|
||
|
|
||
|
|
||
|
class TunerErrorException(TunerException):
    """Signals an error reply from the tuner or a failure while sending a command."""
|
||
|
|
||
|
|
||
|
class TunerConnectionException(TunerException):
    """Signals that the connection to the tuner socket was refused."""
|
||
|
|
||
|
|
||
|
class TunerClient:
    """Interfaces with Civilization via the tuner socket.

    Every command opens a fresh TCP connection to ``ADDRESS:PORT``, sends one
    framed ``CMD:0:`` message, reads a single reply and closes the socket.
    """

    logger: Logger

    def __init__(self, logger: Logger):
        self.logger = logger

    def __parse_response(self, response: str) -> str:
        """Parses the response from the tuner socket.

        Returns the text found between ``CLIENT_PREFIX`` and the following
        ``CLIENT_POSTFIX``. When the prefix is absent, a reply containing
        ``"ERR:"`` raises ``TunerErrorException`` (with the "?" placeholders
        removed); any other reply yields an empty string.
        """
        segments = response.split(CLIENT_PREFIX)
        if len(segments) > 1:
            # Keep only what sits between the first prefix and the next postfix.
            return segments[1].split(CLIENT_POSTFIX)[0]
        if "ERR:" in response:
            raise TunerErrorException(response.replace("?", ""))
        return ""

    @staticmethod
    def _build_packet(command_string: str) -> bytes:
        """Frame a command as header + ``CMD:0:<command>\\x00``."""
        payload = b"CMD:0:" + command_string.encode("utf-8") + b"\x00"

        # game expects this to be added before any command that is sent, indicates payload size
        # The header used to be a single length byte followed by
        # b"\x00\x00\x00\x03\x00\x00\x00", which raised OverflowError for
        # payloads longer than 255 bytes. Writing the length as four
        # little-endian bytes yields the exact same header for short payloads.
        # NOTE(review): presumably the game reads a 32-bit little-endian
        # length followed by a 32-bit tag of 3 -- confirm against the game.
        length_field = len(payload).to_bytes(4, byteorder="little")
        header = length_field + b"\x03\x00\x00\x00"
        return header + payload

    async def send_game_command(self, command_string: str, size: int = 64):
        """Small helper that prefixes a command with GameCore.Game."""
        return await self.send_command("GameCore.Game." + command_string, size)

    async def send_command(self, command_string: str, size: int = 64) -> str:
        """Send a raw command and return the parsed reply.

        ``size`` is accepted for backward compatibility but is not used; the
        reply is read with ``async_recv``'s default buffer size.

        Raises:
            TunerTimeoutException: a timeout occurred during the exchange.
            TunerConnectionException: the connection was refused.
            TunerErrorException: any other failure, including an ``ERR:`` reply.
        """
        # Build the packet before creating the socket so that an encoding
        # problem cannot leave an open socket behind.
        data = self._build_packet(command_string)

        loop = asyncio.get_running_loop()
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setblocking(False)
            await loop.sock_connect(sock, (ADDRESS, PORT))
            await loop.sock_sendall(sock, data)

            # Add a delay before receiving data
            await asyncio.sleep(.02)

            received_data = await self.async_recv(sock)
            response = decode_mixed_string(received_data)
            return self.__parse_response(response)

        except (socket.timeout, asyncio.TimeoutError) as e:
            # asyncio.wait_for raises asyncio.TimeoutError, which is a class
            # distinct from socket.timeout before Python 3.11.
            self.logger.debug("Timeout occurred while receiving data")
            raise TunerTimeoutException() from e
        except Exception as e:
            self.logger.debug(f"Error occurred while receiving data: {str(e)}")
            # check if No connection could be made is present in the error message
            connection_errors = [
                "The remote computer refused the network connection",
            ]
            # The exception type is checked as well so that detection does not
            # depend on the wording of the operating system's error message.
            refused = isinstance(e, ConnectionRefusedError) or any(
                error in str(e) for error in connection_errors
            )
            if refused:
                raise TunerConnectionException(e) from e
            raise TunerErrorException(e) from e
        finally:
            sock.close()

    async def async_recv(self, sock: Any, timeout: float = 2.0, size: int = 4096) -> bytes:
        """Read at most ``size`` bytes from ``sock``, waiting up to ``timeout`` seconds.

        Raises ``asyncio.TimeoutError`` when nothing arrives in time.
        """
        loop = asyncio.get_running_loop()
        return await asyncio.wait_for(loop.sock_recv(sock, size), timeout)
|