mirror of
				https://github.com/MarioSpore/Grinch-AP.git
				synced 2025-10-21 20:21:32 -06:00 
			
		
		
		
	
		
			
				
	
	
		
			391 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			391 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from __future__ import annotations
 | |
| 
 | |
| import typing
 | |
| 
 | |
| 
 | |
| def tuplize_version(version: str) -> typing.Tuple[int, ...]:
 | |
|     return Version(*(int(piece, 10) for piece in version.split(".")))
 | |
| 
 | |
| 
 | |
| class Version(typing.NamedTuple):
 | |
|     major: int
 | |
|     minor: int
 | |
|     build: int
 | |
| 
 | |
| __version__ = "0.0.1"
 | |
| _version_tuple = tuplize_version(__version__)
 | |
| 
 | |
| import builtins
 | |
| import os
 | |
| import subprocess
 | |
| import sys
 | |
| import pickle
 | |
| import functools
 | |
| import io
 | |
| 
 | |
| from yaml import load, dump, safe_load
 | |
| 
 | |
| try:
 | |
|     from yaml import CLoader as Loader
 | |
| except ImportError:
 | |
|     from yaml import Loader
 | |
| 
 | |
| 
 | |
| def int16_as_bytes(value):
 | |
|     value = value & 0xFFFF
 | |
|     return [value & 0xFF, (value >> 8) & 0xFF]
 | |
| 
 | |
| 
 | |
| def int32_as_bytes(value):
 | |
|     value = value & 0xFFFFFFFF
 | |
|     return [value & 0xFF, (value >> 8) & 0xFF, (value >> 16) & 0xFF, (value >> 24) & 0xFF]
 | |
| 
 | |
| 
 | |
| def pc_to_snes(value):
 | |
|     return ((value << 1) & 0x7F0000) | (value & 0x7FFF) | 0x8000
 | |
| 
 | |
| 
 | |
| def snes_to_pc(value):
 | |
|     return ((value & 0x7F0000) >> 1) | (value & 0x7FFF)
 | |
| 
 | |
| 
 | |
| def parse_player_names(names, players, teams):
 | |
|     names = tuple(n for n in (n.strip() for n in names.split(",")) if n)
 | |
|     if len(names) != len(set(names)):
 | |
|         raise ValueError("Duplicate Player names is not supported.")
 | |
|     ret = []
 | |
|     while names or len(ret) < teams:
 | |
|         team = [n[:16] for n in names[:players]]
 | |
|         # 16 bytes in rom per player, which will map to more in unicode, but those characters later get filtered
 | |
|         while len(team) != players:
 | |
|             team.append(f"Player{len(team) + 1}")
 | |
|         ret.append(team)
 | |
| 
 | |
|         names = names[players:]
 | |
|     return ret
 | |
| 
 | |
| 
 | |
| def is_bundled() -> bool:
 | |
|     return getattr(sys, 'frozen', False)
 | |
| 
 | |
| 
 | |
| def local_path(*path):
 | |
|     if local_path.cached_path:
 | |
|         return os.path.join(local_path.cached_path, *path)
 | |
| 
 | |
|     elif is_bundled():
 | |
|         if hasattr(sys, "_MEIPASS"):
 | |
|             # we are running in a PyInstaller bundle
 | |
|             local_path.cached_path = sys._MEIPASS  # pylint: disable=protected-access,no-member
 | |
|         else:
 | |
|             # cx_Freeze
 | |
|             local_path.cached_path = os.path.dirname(os.path.abspath(sys.argv[0]))
 | |
|     else:
 | |
|         # we are running in a normal Python environment
 | |
|         import __main__
 | |
|         local_path.cached_path = os.path.dirname(os.path.abspath(__main__.__file__))
 | |
| 
 | |
|     return os.path.join(local_path.cached_path, *path)
 | |
| 
 | |
| 
 | |
| local_path.cached_path = None
 | |
| 
 | |
| 
 | |
| def output_path(*path):
 | |
|     if output_path.cached_path:
 | |
|         return os.path.join(output_path.cached_path, *path)
 | |
|     output_path.cached_path = local_path(get_options()["general_options"]["output_path"])
 | |
|     path = os.path.join(output_path.cached_path, *path)
 | |
|     os.makedirs(os.path.dirname(path), exist_ok=True)
 | |
|     return path
 | |
| 
 | |
| 
 | |
| output_path.cached_path = None
 | |
| 
 | |
| 
 | |
| def open_file(filename):
 | |
|     if sys.platform == 'win32':
 | |
|         os.startfile(filename)
 | |
|     else:
 | |
|         open_command = 'open' if sys.platform == 'darwin' else 'xdg-open'
 | |
|         subprocess.call([open_command, filename])
 | |
| 
 | |
| 
 | |
| def close_console():
 | |
|     if sys.platform == 'win32':
 | |
|         # windows
 | |
|         import ctypes.wintypes
 | |
|         try:
 | |
|             ctypes.windll.kernel32.FreeConsole()
 | |
|         except Exception:
 | |
|             pass
 | |
| 
 | |
| 
 | |
| parse_yaml = safe_load
 | |
| unsafe_parse_yaml = functools.partial(load, Loader=Loader)
 | |
| 
 | |
| 
 | |
| class Hint(typing.NamedTuple):
 | |
|     receiving_player: int
 | |
|     finding_player: int
 | |
|     location: int
 | |
|     item: int
 | |
|     found: bool
 | |
|     entrance: str = ""
 | |
| 
 | |
|     def re_check(self, ctx, team) -> Hint:
 | |
|         if self.found:
 | |
|             return self
 | |
|         found = self.location in ctx.location_checks[team, self.finding_player]
 | |
|         if found:
 | |
|             return Hint(self.receiving_player, self.finding_player, self.location, self.item, found, self.entrance)
 | |
|         return self
 | |
| 
 | |
|     def as_legacy(self) -> tuple:
 | |
|         return self.receiving_player, self.finding_player, self.location, self.item, self.found
 | |
| 
 | |
|     def __hash__(self):
 | |
|         return hash((self.receiving_player, self.finding_player, self.location, self.item, self.entrance))
 | |
| 
 | |
| 
 | |
| def get_public_ipv4() -> str:
 | |
|     import socket
 | |
|     import urllib.request
 | |
|     import logging
 | |
|     ip = socket.gethostbyname(socket.gethostname())
 | |
|     try:
 | |
|         ip = urllib.request.urlopen('https://checkip.amazonaws.com/').read().decode('utf8').strip()
 | |
|     except Exception as e:
 | |
|         try:
 | |
|             ip = urllib.request.urlopen('https://v4.ident.me').read().decode('utf8').strip()
 | |
|         except:
 | |
|             logging.exception(e)
 | |
|             pass  # we could be offline, in a local game, so no point in erroring out
 | |
|     return ip
 | |
| 
 | |
| 
 | |
| def get_public_ipv6() -> str:
 | |
|     import socket
 | |
|     import urllib.request
 | |
|     import logging
 | |
|     ip = socket.gethostbyname(socket.gethostname())
 | |
|     try:
 | |
|         ip = urllib.request.urlopen('https://v6.ident.me').read().decode('utf8').strip()
 | |
|     except Exception as e:
 | |
|         logging.exception(e)
 | |
|         pass  # we could be offline, in a local game, or ipv6 may not be available
 | |
|     return ip
 | |
| 
 | |
| 
 | |
| def get_default_options() -> dict:
 | |
|     if not hasattr(get_default_options, "options"):
 | |
|         # Refer to host.yaml for comments as to what all these options mean.
 | |
|         options = {
 | |
|             "general_options": {
 | |
|                 "rom_file": "Zelda no Densetsu - Kamigami no Triforce (Japan).sfc",
 | |
|                 "qusb2snes": "QUsb2Snes\\QUsb2Snes.exe",
 | |
|                 "rom_start": True,
 | |
|                 "output_path": "output",
 | |
|             },
 | |
|             "server_options": {
 | |
|                 "host": None,
 | |
|                 "port": 38281,
 | |
|                 "password": None,
 | |
|                 "multidata": None,
 | |
|                 "savefile": None,
 | |
|                 "disable_save": False,
 | |
|                 "loglevel": "info",
 | |
|                 "server_password": None,
 | |
|                 "disable_item_cheat": False,
 | |
|                 "location_check_points": 1,
 | |
|                 "hint_cost": 1000,
 | |
|                 "forfeit_mode": "goal",
 | |
|                 "remaining_mode": "goal",
 | |
|                 "auto_shutdown": 0,
 | |
|                 "compatibility": 2,
 | |
|             },
 | |
|             "multi_mystery_options": {
 | |
|                 "teams": 1,
 | |
|                 "enemizer_path": "EnemizerCLI/EnemizerCLI.Core.exe",
 | |
|                 "player_files_path": "Players",
 | |
|                 "players": 0,
 | |
|                 "weights_file_path": "weights.yaml",
 | |
|                 "meta_file_path": "meta.yaml",
 | |
|                 "player_name": "",
 | |
|                 "create_spoiler": 1,
 | |
|                 "zip_roms": 0,
 | |
|                 "zip_diffs": 2,
 | |
|                 "zip_spoiler": 0,
 | |
|                 "zip_multidata": 1,
 | |
|                 "zip_format": 1,
 | |
|                 "race": 0,
 | |
|                 "cpu_threads": 0,
 | |
|                 "max_attempts": 0,
 | |
|                 "take_first_working": False,
 | |
|                 "keep_all_seeds": False,
 | |
|                 "log_output_path": "Output Logs",
 | |
|                 "log_level": None,
 | |
|                 "plando_options": "bosses",
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         get_default_options.options = options
 | |
|     return get_default_options.options
 | |
| 
 | |
| 
 | |
| blacklisted_options = {"multi_mystery_options.cpu_threads",
 | |
|                        "multi_mystery_options.max_attempts",
 | |
|                        "multi_mystery_options.take_first_working",
 | |
|                        "multi_mystery_options.keep_all_seeds",
 | |
|                        "multi_mystery_options.log_output_path",
 | |
|                        "multi_mystery_options.log_level"}
 | |
| 
 | |
| 
 | |
| def update_options(src: dict, dest: dict, filename: str, keys: list) -> dict:
 | |
|     import logging
 | |
|     for key, value in src.items():
 | |
|         new_keys = keys.copy()
 | |
|         new_keys.append(key)
 | |
|         option_name = '.'.join(new_keys)
 | |
|         if key not in dest:
 | |
|             dest[key] = value
 | |
|             if filename.endswith("options.yaml") and option_name not in blacklisted_options:
 | |
|                 logging.info(f"Warning: {filename} is missing {option_name}")
 | |
|         elif isinstance(value, dict):
 | |
|             if not isinstance(dest.get(key, None), dict):
 | |
|                 if filename.endswith("options.yaml") and option_name not in blacklisted_options:
 | |
|                     logging.info(f"Warning: {filename} has {option_name}, but it is not a dictionary. overwriting.")
 | |
|                 dest[key] = value
 | |
|             else:
 | |
|                 dest[key] = update_options(value, dest[key], filename, new_keys)
 | |
|     return dest
 | |
| 
 | |
| 
 | |
| def get_options() -> dict:
 | |
|     if not hasattr(get_options, "options"):
 | |
|         locations = ("options.yaml", "host.yaml",
 | |
|                      local_path("options.yaml"), local_path("host.yaml"))
 | |
| 
 | |
|         for location in locations:
 | |
|             if os.path.exists(location):
 | |
|                 with open(location) as f:
 | |
|                     options = parse_yaml(f.read())
 | |
| 
 | |
|                 get_options.options = update_options(get_default_options(), options, location, list())
 | |
|                 break
 | |
|         else:
 | |
|             raise FileNotFoundError(f"Could not find {locations[1]} to load options.")
 | |
|     return get_options.options
 | |
| 
 | |
| 
 | |
| def get_item_name_from_id(code):
 | |
|     from worlds import lookup_any_item_id_to_name
 | |
|     return lookup_any_item_id_to_name.get(code, f'Unknown item (ID:{code})')
 | |
| 
 | |
| 
 | |
| def get_location_name_from_address(address):
 | |
|     from worlds import lookup_any_location_id_to_name
 | |
|     return lookup_any_location_id_to_name.get(address, f'Unknown location (ID:{address})')
 | |
| 
 | |
| 
 | |
| def persistent_store(category, key, value):
 | |
|     path = local_path("_persistent_storage.yaml")
 | |
|     storage: dict = persistent_load()
 | |
|     category = storage.setdefault(category, {})
 | |
|     category[key] = value
 | |
|     with open(path, "wt") as f:
 | |
|         f.write(dump(storage))
 | |
| 
 | |
| 
 | |
| def persistent_load() -> typing.Dict[dict]:
 | |
|     storage = getattr(persistent_load, "storage", None)
 | |
|     if storage:
 | |
|         return storage
 | |
|     path = local_path("_persistent_storage.yaml")
 | |
|     storage: dict = {}
 | |
|     if os.path.exists(path):
 | |
|         try:
 | |
|             with open(path, "r") as f:
 | |
|                 storage = unsafe_parse_yaml(f.read())
 | |
|         except Exception as e:
 | |
|             import logging
 | |
|             logging.debug(f"Could not read store: {e}")
 | |
|     if storage is None:
 | |
|         storage = {}
 | |
|     persistent_load.storage = storage
 | |
|     return storage
 | |
| 
 | |
| 
 | |
| def get_adjuster_settings(romfile: str) -> typing.Tuple[str, bool]:
 | |
|     if hasattr(get_adjuster_settings, "adjuster_settings"):
 | |
|         adjuster_settings = getattr(get_adjuster_settings, "adjuster_settings")
 | |
|     else:
 | |
|         adjuster_settings = persistent_load().get("adjuster", {}).get("last_settings_3", {})
 | |
| 
 | |
|     if adjuster_settings:
 | |
|         import pprint
 | |
|         import Patch
 | |
|         adjuster_settings.rom = romfile
 | |
|         adjuster_settings.baserom = Patch.get_base_rom_path()
 | |
|         whitelist = {"disablemusic", "fastmenu", "heartbeep", "heartcolor", "ow_palettes", "quickswap",
 | |
|                      "uw_palettes", "sprite"}
 | |
|         printed_options = {name: value for name, value in vars(adjuster_settings).items() if name in whitelist}
 | |
| 
 | |
|         if hasattr(get_adjuster_settings, "adjust_wanted"):
 | |
|             adjust_wanted = getattr(get_adjuster_settings, "adjust_wanted")
 | |
|         elif persistent_load().get("adjuster", {}).get("never_adjust", False):  # never adjust, per user request
 | |
|             return romfile, False
 | |
|         else:
 | |
|             adjust_wanted = input(f"Last used adjuster settings were found. Would you like to apply these? \n"
 | |
|                                   f"{pprint.pformat(printed_options)}\n"
 | |
|                                   f"Enter yes, no or never: ")
 | |
|         if adjust_wanted and adjust_wanted.startswith("y"):
 | |
|             adjusted = True
 | |
|             import Adjuster
 | |
|             _, romfile = Adjuster.adjust(adjuster_settings)
 | |
|         elif adjust_wanted and "never" in adjust_wanted:
 | |
|             persistent_store("adjuster", "never_adjust", True)
 | |
|             return romfile, False
 | |
|         else:
 | |
|             adjusted = False
 | |
|             import logging
 | |
|             if not hasattr(get_adjuster_settings, "adjust_wanted"):
 | |
|                 logging.info(f"Skipping post-patch adjustment")
 | |
|         get_adjuster_settings.adjuster_settings = adjuster_settings
 | |
|         get_adjuster_settings.adjust_wanted = adjust_wanted
 | |
|         return romfile, adjusted
 | |
|     return romfile, False
 | |
| 
 | |
| 
 | |
| def get_unique_identifier():
 | |
|     uuid = persistent_load().get("client", {}).get("uuid", None)
 | |
|     if uuid:
 | |
|         return uuid
 | |
| 
 | |
|     import uuid
 | |
|     uuid = uuid.getnode()
 | |
|     persistent_store("client", "uuid", uuid)
 | |
|     return uuid
 | |
| 
 | |
| 
 | |
| safe_builtins = {
 | |
|     'set',
 | |
|     'frozenset',
 | |
| }
 | |
| 
 | |
| 
 | |
| class RestrictedUnpickler(pickle.Unpickler):
 | |
|     def find_class(self, module, name):
 | |
|         if module == "builtins" and name in safe_builtins:
 | |
|             return getattr(builtins, name)
 | |
|         if module == "NetUtils" and name in {"NetworkItem"}:
 | |
|             import NetUtils
 | |
|             return getattr(NetUtils, name)
 | |
|         # Forbid everything else.
 | |
|         raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
 | |
|                                      (module, name))
 | |
| 
 | |
| 
 | |
| def restricted_loads(s):
 | |
|     """Helper function analogous to pickle.loads()."""
 | |
|     return RestrictedUnpickler(io.BytesIO(s)).load() | 
