247 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			247 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from __future__ import annotations
 | |
| 
 | |
| __version__ = "2.2.1"
 | |
| _version_tuple = tuple(int(piece, 10) for piece in __version__.split("."))
 | |
| 
 | |
| import os
 | |
| import subprocess
 | |
| import sys
 | |
| import typing
 | |
| import functools
 | |
| 
 | |
| from yaml import load, dump
 | |
| 
 | |
| try:
 | |
|     from yaml import CLoader as Loader
 | |
| except ImportError:
 | |
|     from yaml import Loader
 | |
| 
 | |
| 
 | |
| def int16_as_bytes(value):
 | |
|     value = value & 0xFFFF
 | |
|     return [value & 0xFF, (value >> 8) & 0xFF]
 | |
| 
 | |
| 
 | |
| def int32_as_bytes(value):
 | |
|     value = value & 0xFFFFFFFF
 | |
|     return [value & 0xFF, (value >> 8) & 0xFF, (value >> 16) & 0xFF, (value >> 24) & 0xFF]
 | |
| 
 | |
| 
 | |
| def pc_to_snes(value):
 | |
|     return ((value<<1) & 0x7F0000)|(value & 0x7FFF)|0x8000
 | |
| 
 | |
| def snes_to_pc(value):
 | |
|     return ((value & 0x7F0000)>>1)|(value & 0x7FFF)
 | |
| 
 | |
| def parse_player_names(names, players, teams):
 | |
|     names = tuple(n for n in (n.strip() for n in names.split(",")) if n)
 | |
|     ret = []
 | |
|     while names or len(ret) < teams:
 | |
|         team = [n[:16] for n in names[:players]]
 | |
|         # where does the 16 character limit come from?
 | |
|         while len(team) != players:
 | |
|             team.append(f"Player{len(team) + 1}")
 | |
|         ret.append(team)
 | |
| 
 | |
|         names = names[players:]
 | |
|     return ret
 | |
| 
 | |
| def is_bundled():
 | |
|     return getattr(sys, 'frozen', False)
 | |
| 
 | |
| def local_path(path):
 | |
|     if local_path.cached_path:
 | |
|         return os.path.join(local_path.cached_path, path)
 | |
| 
 | |
|     elif is_bundled():
 | |
|         if hasattr(sys, "_MEIPASS"):
 | |
|             # we are running in a PyInstaller bundle
 | |
|             local_path.cached_path = sys._MEIPASS  # pylint: disable=protected-access,no-member
 | |
|         else:
 | |
|             # cx_Freeze
 | |
|             local_path.cached_path = os.path.dirname(os.path.abspath(sys.argv[0]))
 | |
|     else:
 | |
|         # we are running in a normal Python environment
 | |
|         import __main__
 | |
|         local_path.cached_path = os.path.dirname(os.path.abspath(__main__.__file__))
 | |
| 
 | |
|     return os.path.join(local_path.cached_path, path)
 | |
| 
 | |
| local_path.cached_path = None
 | |
| 
 | |
| def output_path(path):
 | |
|     if output_path.cached_path:
 | |
|         return os.path.join(output_path.cached_path, path)
 | |
| 
 | |
|     if not is_bundled() and not hasattr(sys, "_MEIPASS"):
 | |
|         # this should trigger if it's cx_freeze bundling
 | |
|         output_path.cached_path = '.'
 | |
|         return os.path.join(output_path.cached_path, path)
 | |
|     else:
 | |
|         # has been PyInstaller packaged, so cannot use CWD for output.
 | |
|         if sys.platform == 'win32':
 | |
|             # windows
 | |
|             import ctypes.wintypes
 | |
|             CSIDL_PERSONAL = 5  # My Documents
 | |
|             SHGFP_TYPE_CURRENT = 0  # Get current, not default value
 | |
| 
 | |
|             buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
 | |
|             ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_PERSONAL, None, SHGFP_TYPE_CURRENT, buf)
 | |
| 
 | |
|             documents = buf.value
 | |
| 
 | |
|         elif sys.platform == 'darwin':
 | |
|             from AppKit import NSSearchPathForDirectoriesInDomains # pylint: disable=import-error
 | |
|             # http://developer.apple.com/DOCUMENTATION/Cocoa/Reference/Foundation/Miscellaneous/Foundation_Functions/Reference/reference.html#//apple_ref/c/func/NSSearchPathForDirectoriesInDomains
 | |
|             NSDocumentDirectory = 9
 | |
|             NSUserDomainMask = 1
 | |
|             # True for expanding the tilde into a fully qualified path
 | |
|             documents = NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask, True)[0]
 | |
|         else:
 | |
|             raise NotImplementedError('Not supported yet')
 | |
| 
 | |
|         output_path.cached_path = os.path.join(documents, 'ALttPEntranceRandomizer')
 | |
|         if not os.path.exists(output_path.cached_path):
 | |
|             os.mkdir(output_path.cached_path)
 | |
|         return os.path.join(output_path.cached_path, path)
 | |
| 
 | |
| output_path.cached_path = None
 | |
| 
 | |
| def open_file(filename):
 | |
|     if sys.platform == 'win32':
 | |
|         os.startfile(filename)
 | |
|     else:
 | |
|         open_command = 'open' if sys.platform == 'darwin' else 'xdg-open'
 | |
|         subprocess.call([open_command, filename])
 | |
| 
 | |
| def close_console():
 | |
|     if sys.platform == 'win32':
 | |
|         #windows
 | |
|         import ctypes.wintypes
 | |
|         try:
 | |
|             ctypes.windll.kernel32.FreeConsole()
 | |
|         except Exception:
 | |
|             pass
 | |
| 
 | |
| def make_new_base2current(old_rom='Zelda no Densetsu - Kamigami no Triforce (Japan).sfc', new_rom='working.sfc'):
 | |
|     from collections import OrderedDict
 | |
|     import json
 | |
|     import hashlib
 | |
|     with open(old_rom, 'rb') as stream:
 | |
|         old_rom_data = bytearray(stream.read())
 | |
|     with open(new_rom, 'rb') as stream:
 | |
|         new_rom_data = bytearray(stream.read())
 | |
|     # extend to 2 mb
 | |
|     old_rom_data.extend(bytearray([0x00]) * (2097152 - len(old_rom_data)))
 | |
| 
 | |
|     out_data = OrderedDict()
 | |
|     for idx, old in enumerate(old_rom_data):
 | |
|         new = new_rom_data[idx]
 | |
|         if old != new:
 | |
|             out_data[idx] = [int(new)]
 | |
|     for offset in reversed(list(out_data.keys())):
 | |
|         if offset - 1 in out_data:
 | |
|             out_data[offset-1].extend(out_data.pop(offset))
 | |
|     with open('data/base2current.json', 'wt') as outfile:
 | |
|         json.dump([{key: value} for key, value in out_data.items()], outfile, separators=(",", ":"))
 | |
| 
 | |
|     basemd5 = hashlib.md5()
 | |
|     basemd5.update(new_rom_data)
 | |
|     return "New Rom Hash: " + basemd5.hexdigest()
 | |
| 
 | |
| 
 | |
| parse_yaml = functools.partial(load, Loader=Loader)
 | |
| 
 | |
| 
 | |
| class Hint(typing.NamedTuple):
 | |
|     receiving_player: int
 | |
|     finding_player: int
 | |
|     location: int
 | |
|     item: int
 | |
|     found: bool
 | |
|     entrance: str = ""
 | |
| 
 | |
|     def re_check(self, ctx, team) -> Hint:
 | |
|         if self.found:
 | |
|             return self
 | |
|         found = self.location in ctx.location_checks[team, self.finding_player]
 | |
|         if found:
 | |
|             return Hint(self.receiving_player, self.finding_player, self.location, self.item, found, self.entrance)
 | |
|         return self
 | |
| 
 | |
|     def as_legacy(self) -> tuple:
 | |
|         return self.receiving_player, self.finding_player, self.location, self.item, self.found
 | |
| 
 | |
|     def __hash__(self):
 | |
|         return hash((self.receiving_player, self.finding_player, self.location, self.item, self.entrance))
 | |
| 
 | |
| def get_public_ipv4() -> str:
 | |
|     import socket
 | |
|     import urllib.request
 | |
|     import logging
 | |
|     ip = socket.gethostbyname(socket.gethostname())
 | |
|     try:
 | |
|         ip = urllib.request.urlopen('https://checkip.amazonaws.com/').read().decode('utf8').strip()
 | |
|     except Exception as e:
 | |
|         try:
 | |
|             ip = urllib.request.urlopen('https://v4.ident.me').read().decode('utf8').strip()
 | |
|         except:
 | |
|             logging.exception(e)
 | |
|             pass  # we could be offline, in a local game, so no point in erroring out
 | |
|     return ip
 | |
| 
 | |
| 
 | |
| def get_options() -> dict:
 | |
|     if not hasattr(get_options, "options"):
 | |
|         locations = ("options.yaml", "host.yaml",
 | |
|                      local_path("options.yaml"), local_path("host.yaml"))
 | |
| 
 | |
|         for location in locations:
 | |
|             if os.path.exists(location):
 | |
|                 with open(location) as f:
 | |
|                     get_options.options = parse_yaml(f.read())
 | |
|                 break
 | |
|         else:
 | |
|             raise FileNotFoundError(f"Could not find {locations[1]} to load options.")
 | |
|     return get_options.options
 | |
| 
 | |
| 
 | |
| def get_item_name_from_id(code):
 | |
|     import Items
 | |
|     return Items.lookup_id_to_name.get(code, f'Unknown item (ID:{code})')
 | |
| 
 | |
| 
 | |
| def get_location_name_from_address(address):
 | |
|     import Regions
 | |
|     return Regions.lookup_id_to_name.get(address, f'Unknown location (ID:{address})')
 | |
| 
 | |
| 
 | |
| def persistent_store(category, key, value):
 | |
|     path = local_path("_persistent_storage.yaml")
 | |
|     storage: dict = persistent_load()
 | |
|     category = storage.setdefault(category, {})
 | |
|     category[key] = value
 | |
|     with open(path, "wt") as f:
 | |
|         f.write(dump(storage))
 | |
| 
 | |
| 
 | |
| def persistent_load() -> typing.Dict[dict]:
 | |
|     path = local_path("_persistent_storage.yaml")
 | |
|     storage: dict = {}
 | |
|     if os.path.exists(path):
 | |
|         try:
 | |
|             with open(path, "r") as f:
 | |
|                 storage = parse_yaml(f.read())
 | |
|         except Exception as e:
 | |
|             import logging
 | |
|             logging.debug(f"Could not read store: {e}")
 | |
|     if storage is None:
 | |
|         storage = {}
 | |
|     return storage
 | |
| 
 | |
| 
 | |
| class ReceivedItem(typing.NamedTuple):
 | |
|     item: int
 | |
|     location: int
 | |
|     player: int
 | 
