mirror of
				https://github.com/MarioSpore/Grinch-AP.git
				synced 2025-10-21 20:21:32 -06:00 
			
		
		
		
	
		
			
	
	
		
			773 lines
		
	
	
		
			29 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			773 lines
		
	
	
		
			29 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | """
 | ||
|  | Application settings / host.yaml interface using type hints. | ||
|  | This is different from player settings. | ||
|  | """
 | ||
|  | 
 | ||
|  | import os.path | ||
|  | import shutil | ||
|  | import sys | ||
|  | import typing | ||
|  | import warnings | ||
|  | from enum import IntEnum | ||
|  | from threading import Lock | ||
|  | from typing import cast, Any, BinaryIO, ClassVar, Dict, Iterator, List, Optional, TextIO, Tuple, Union, TypeVar | ||
|  | import os | ||
|  | 
 | ||
|  | __all__ = [ | ||
|  |     "get_settings", "fmt_doc", "no_gui", | ||
|  |     "Group", "Bool", "Path", "UserFilePath", "UserFolderPath", "LocalFilePath", "LocalFolderPath", | ||
|  |     "OptionalUserFilePath", "OptionalUserFolderPath", "OptionalLocalFilePath", "OptionalLocalFolderPath", | ||
|  |     "GeneralOptions", "ServerOptions", "GeneratorOptions", "SNIOptions", "Settings" | ||
|  | ] | ||
|  | 
 | ||
|  | no_gui = False | ||
|  | _world_settings_name_cache: Dict[str, str] = {}  # TODO: cache on disk and update when worlds change | ||
|  | _world_settings_name_cache_updated = False | ||
|  | _lock = Lock() | ||
|  | 
 | ||
|  | 
 | ||
|  | def _update_cache() -> None: | ||
|  |     """Load all worlds and update world_settings_name_cache""" | ||
|  |     global _world_settings_name_cache_updated | ||
|  |     if _world_settings_name_cache_updated: | ||
|  |         return | ||
|  | 
 | ||
|  |     try: | ||
|  |         from worlds.AutoWorld import AutoWorldRegister | ||
|  |         for world in AutoWorldRegister.world_types.values(): | ||
|  |             annotation = world.__annotations__.get("settings", None) | ||
|  |             if annotation is None or annotation == "ClassVar[Optional['Group']]": | ||
|  |                 continue | ||
|  |             _world_settings_name_cache[world.settings_key] = f"{world.__module__}.{world.__name__}" | ||
|  |     finally: | ||
|  |         _world_settings_name_cache_updated = True | ||
|  | 
 | ||
|  | 
 | ||
|  | def fmt_doc(cls: type, level: int) -> str: | ||
|  |     comment = cls.__doc__ | ||
|  |     assert comment, f"{cls} has no __doc__" | ||
|  |     indent = level * 2 * " " | ||
|  |     return "\n".join(map(lambda s: f"{indent}# {s}", filter(None, map(lambda s: s.strip(), comment.split("\n"))))) | ||
|  | 
 | ||
|  | 
 | ||
|  | class Group: | ||
|  |     _type_cache: ClassVar[Optional[Dict[str, Any]]] = None | ||
|  |     _dumping: bool = False | ||
|  |     _has_attr: bool = False | ||
|  |     _changed: bool = False | ||
|  | 
 | ||
|  |     def __getitem__(self, key: str) -> Any: | ||
|  |         try: | ||
|  |             return getattr(self, key) | ||
|  |         except NameError: | ||
|  |             raise KeyError(key) | ||
|  | 
 | ||
|  |     def __iter__(self) -> Iterator[str]: | ||
|  |         cls_members = dir(self.__class__) | ||
|  |         members = filter(lambda k: not k.startswith("_") and (k not in cls_members or k in self.__annotations__), | ||
|  |                          list(self.__annotations__) + | ||
|  |                          [name for name in dir(self) if name not in self.__annotations__]) | ||
|  |         return members.__iter__() | ||
|  | 
 | ||
|  |     def __contains__(self, key: str) -> bool: | ||
|  |         try: | ||
|  |             self._has_attr = True | ||
|  |             return hasattr(self, key) | ||
|  |         finally: | ||
|  |             self._has_attr = False | ||
|  | 
 | ||
|  |     def __setitem__(self, key: str, value: Any) -> None: | ||
|  |         setattr(self, key, value) | ||
|  | 
 | ||
|  |     def __getattribute__(self, item: str) -> Any: | ||
|  |         attr = super().__getattribute__(item) | ||
|  |         if isinstance(attr, Path) and not super().__getattribute__("_dumping"): | ||
|  |             if attr.required and not attr.exists() and not super().__getattribute__("_has_attr"): | ||
|  |                 # if a file is required, and the one from settings does not exist, ask the user to provide it | ||
|  |                 # unless we are dumping the settings, because that would ask for each entry | ||
|  |                 with _lock:  # lock to avoid opening multiple | ||
|  |                     new = None if no_gui else attr.browse() | ||
|  |                     if new is None: | ||
|  |                         raise FileNotFoundError(f"{attr} does not exist, but " | ||
|  |                                                 f"{self.__class__.__name__}.{item} is required") | ||
|  |                     setattr(self, item, new) | ||
|  |                     self._changed = True | ||
|  |                     attr = new | ||
|  |             # resolve the path immediately when accessing it | ||
|  |             return attr.__class__(attr.resolve()) | ||
|  |         return attr | ||
|  | 
 | ||
|  |     @property | ||
|  |     def changed(self) -> bool: | ||
|  |         return self._changed or any(map(lambda v: isinstance(v, Group) and v.changed, | ||
|  |                                         self.__dict__.values())) | ||
|  | 
 | ||
|  |     @classmethod | ||
|  |     def get_type_hints(cls) -> Dict[str, Any]: | ||
|  |         """Returns resolved type hints for the class""" | ||
|  |         if cls._type_cache is None: | ||
|  |             if not isinstance(next(iter(cls.__annotations__.values())), str): | ||
|  |                 # non-str: assume already resolved | ||
|  |                 cls._type_cache = cls.__annotations__ | ||
|  |             else: | ||
|  |                 # str: build dicts and resolve with eval | ||
|  |                 mod = sys.modules[cls.__module__]  # assume the module wasn't deleted | ||
|  |                 mod_dict = {k: getattr(mod, k) for k in dir(mod)} | ||
|  |                 cls._type_cache = typing.get_type_hints(cls, globalns=mod_dict, localns=cls.__dict__) | ||
|  |         return cls._type_cache | ||
|  | 
 | ||
|  |     def get(self, key: str, default: Any) -> Any: | ||
|  |         if key in self: | ||
|  |             return self[key] | ||
|  |         return default | ||
|  | 
 | ||
|  |     def items(self) -> List[Tuple[str, Any]]: | ||
|  |         return [(key, getattr(self, key)) for key in self] | ||
|  | 
 | ||
|  |     def update(self, dct: Dict[str, Any]) -> None: | ||
|  |         assert isinstance(dct, dict), f"{self.__class__.__name__}.update called with " \ | ||
|  |                                       f"{dct.__class__.__name__} instead of dict." | ||
|  | 
 | ||
|  |         for k in self.__annotations__: | ||
|  |             if not k.startswith("_") and k not in dct: | ||
|  |                 self._changed = True  # key missing from host.yaml | ||
|  | 
 | ||
|  |         for k, v in dct.items(): | ||
|  |             # don't do getattr to stay lazy with world group init/loading | ||
|  |             # instead we assign unknown groups as dicts and a later getattr will upcast them | ||
|  |             attr = self.__dict__[k] if k in self.__dict__ else \ | ||
|  |                 self.__class__.__dict__[k] if k in self.__class__.__dict__ else None | ||
|  |             if isinstance(attr, Group): | ||
|  |                 # update group | ||
|  |                 if k not in self.__dict__: | ||
|  |                     attr = attr.__class__()  # make a copy of default | ||
|  |                     setattr(self, k, attr) | ||
|  |                 attr.update(v) | ||
|  |             elif isinstance(attr, dict): | ||
|  |                 # update dict | ||
|  |                 if k not in self.__dict__: | ||
|  |                     attr = attr.copy()  # make a copy of default | ||
|  |                     setattr(self, k, attr) | ||
|  |                 attr.update(v) | ||
|  |             else: | ||
|  |                 # assign value, try to upcast to type hint | ||
|  |                 annotation = self.get_type_hints().get(k, None) | ||
|  |                 candidates = [] if annotation is None else \ | ||
|  |                     typing.get_args(annotation) if typing.get_origin(annotation) is Union else [annotation] | ||
|  |                 none_type = type(None) | ||
|  |                 for cls in candidates: | ||
|  |                     assert isinstance(cls, type), f"{self.__class__.__name__}.{k}: type {cls} not supported in settings" | ||
|  |                     if v is None and cls is none_type: | ||
|  |                         # assign None, i.e. from Optional | ||
|  |                         setattr(self, k, v) | ||
|  |                         break | ||
|  |                     if cls is bool and isinstance(v, bool): | ||
|  |                         # assign bool - special handling because issubclass(int, bool) is True | ||
|  |                         setattr(self, k, v) | ||
|  |                         break | ||
|  |                     if cls is not bool and issubclass(cls, type(v)): | ||
|  |                         # upcast, i.e. int -> IntEnum, str -> Path | ||
|  |                         setattr(self, k, cls.__call__(v)) | ||
|  |                         break | ||
|  |                 else: | ||
|  |                     # assign scalar and hope for the best | ||
|  |                     setattr(self, k, v) | ||
|  |                     if annotation: | ||
|  |                         warnings.warn(f"{self.__class__.__name__}.{k} " | ||
|  |                                       f"assigned from incompatible type {type(v).__name__}") | ||
|  | 
 | ||
|  |     def as_dict(self, *args: str, downcast: bool = True) -> Dict[str, Any]: | ||
|  |         return { | ||
|  |             name: _to_builtin(cast(object, getattr(self, name))) if downcast else getattr(self, name) | ||
|  |             for name in self if not args or name in args | ||
|  |         } | ||
|  | 
 | ||
|  |     def dump(self, f: TextIO, level: int = 0) -> None: | ||
|  |         from Utils import dump, Dumper as BaseDumper | ||
|  |         from yaml import ScalarNode, MappingNode | ||
|  | 
 | ||
|  |         class Dumper(BaseDumper): | ||
|  |             def represent_mapping(self, tag: str, mapping: Any, flow_style: Any = None) -> MappingNode: | ||
|  |                 res: MappingNode = super().represent_mapping(tag, mapping, flow_style) | ||
|  |                 pairs = cast(List[Tuple[ScalarNode, Any]], res.value) | ||
|  |                 for k, v in pairs: | ||
|  |                     k.style = None  # remove quotes from keys | ||
|  |                 return res | ||
|  | 
 | ||
|  |             def represent_str(self, data: str) -> ScalarNode: | ||
|  |                 # default double quote all strings | ||
|  |                 return self.represent_scalar("tag:yaml.org,2002:str", data, style='"') | ||
|  | 
 | ||
|  |         Dumper.add_representer(str, Dumper.represent_str) | ||
|  | 
 | ||
|  |         self._dumping = True | ||
|  |         try: | ||
|  |             # fetch class to avoid going through getattr | ||
|  |             cls = self.__class__ | ||
|  |             type_hints = cls.get_type_hints() | ||
|  |             # validate group | ||
|  |             for name in cls.__annotations__.keys(): | ||
|  |                 assert hasattr(cls, name), f"{cls}.{name} is missing a default value" | ||
|  |             # dump ordered members | ||
|  |             for name in self: | ||
|  |                 attr = cast(object, getattr(self, name)) | ||
|  |                 attr_cls = type_hints[name] if name in type_hints else attr.__class__ | ||
|  |                 attr_cls_origin = typing.get_origin(attr_cls) | ||
|  |                 while attr_cls_origin is Union:  # resolve to first type for doc string | ||
|  |                     attr_cls = typing.get_args(attr_cls)[0] | ||
|  |                     attr_cls_origin = typing.get_origin(attr_cls) | ||
|  |                 if attr_cls.__doc__ and attr_cls.__module__ != "builtins": | ||
|  |                     f.write(fmt_doc(attr_cls, level=level) + "\n") | ||
|  |                 indent = '  ' * level | ||
|  |                 if isinstance(attr, Group): | ||
|  |                     f.write(f"{indent}{name}:\n") | ||
|  |                     attr.dump(f, level=level+1) | ||
|  |                 elif isinstance(attr, (dict, list, tuple, set)): | ||
|  |                     # TODO: special handling for dicts and iterables | ||
|  |                     raise NotImplementedError() | ||
|  |                 else: | ||
|  |                     yaml_line = dump({name: _to_builtin(attr)}, Dumper=Dumper) | ||
|  |                     f.write(f"{indent}{yaml_line}") | ||
|  |             self._changed = False | ||
|  |         finally: | ||
|  |             self._dumping = False | ||
|  | 
 | ||
|  | 
 | ||
|  | class Bool: | ||
|  |     # can't subclass bool, so we use this and Union or type: ignore | ||
|  |     def __bool__(self) -> bool: | ||
|  |         raise NotImplementedError() | ||
|  | 
 | ||
|  | 
 | ||
|  | # Types for generic settings | ||
|  | T = TypeVar("T", bound="Path") | ||
|  | 
 | ||
|  | 
 | ||
|  | def _resolve_exe(s: str) -> str: | ||
|  |     """Append exe file extension if the file is an executable""" | ||
|  |     if isinstance(s, Path): | ||
|  |         from Utils import is_windows | ||
|  |         if s.is_exe and is_windows and not s.lower().endswith(".exe"): | ||
|  |             return str(s + ".exe") | ||
|  |     return str(s) | ||
|  | 
 | ||
|  | 
 | ||
|  | def _to_builtin(o: object) -> Any: | ||
|  |     """Downcast object to a builtin type for output""" | ||
|  |     if o is None: | ||
|  |         return None | ||
|  |     c = o.__class__ | ||
|  |     while c.__module__ != "builtins": | ||
|  |         c = c.__base__ | ||
|  |     return c.__call__(o) | ||
|  | 
 | ||
|  | 
 | ||
|  | class Path(str): | ||
|  |     # paths in host.yaml are str | ||
|  |     required: bool = True | ||
|  |     """Marks the file as required and opens a file browser when missing""" | ||
|  |     is_exe: bool = False | ||
|  |     """Special cross-platform handling for executables""" | ||
|  |     description: Optional[str] = None | ||
|  |     """Title to display when browsing for the file""" | ||
|  |     copy_to: Optional[str] = None | ||
|  |     """If not None, copy to AP folder instead of linking it""" | ||
|  | 
 | ||
|  |     @classmethod | ||
|  |     def validate(cls, path: str) -> None: | ||
|  |         """Overload and raise to validate input files from browse""" | ||
|  |         pass | ||
|  | 
 | ||
|  |     def browse(self: T, **kwargs: Any) -> Optional[T]: | ||
|  |         """Opens a file browser to search for the file""" | ||
|  |         raise NotImplementedError(f"Please use a subclass of Path for {self.__class__.__name__}") | ||
|  | 
 | ||
|  |     def resolve(self) -> str: | ||
|  |         return _resolve_exe(self) | ||
|  | 
 | ||
|  |     def exists(self) -> bool: | ||
|  |         return os.path.exists(self.resolve()) | ||
|  | 
 | ||
|  | 
 | ||
|  | class _UserPath(str): | ||
|  |     def resolve(self) -> str: | ||
|  |         if os.path.isabs(self): | ||
|  |             return str(self) | ||
|  |         from Utils import user_path | ||
|  |         return user_path(_resolve_exe(self)) | ||
|  | 
 | ||
|  | 
 | ||
|  | class _LocalPath(str): | ||
|  |     def resolve(self) -> str: | ||
|  |         if os.path.isabs(self): | ||
|  |             return str(self) | ||
|  |         from Utils import local_path | ||
|  |         return local_path(_resolve_exe(self)) | ||
|  | 
 | ||
|  | 
 | ||
|  | class FilePath(Path): | ||
|  |     # path to a file | ||
|  | 
 | ||
|  |     md5s: ClassVar[List[Union[str, bytes]]] = [] | ||
|  |     """MD5 hashes for default validator.""" | ||
|  | 
 | ||
|  |     def browse(self: T, | ||
|  |                filetypes: Optional[typing.Sequence[typing.Tuple[str, typing.Sequence[str]]]] = None, **kwargs: Any)\ | ||
|  |             -> Optional[T]: | ||
|  |         from Utils import open_filename, is_windows | ||
|  |         if not filetypes: | ||
|  |             if self.is_exe: | ||
|  |                 name, ext = "Program", ".exe" if is_windows else "" | ||
|  |             else: | ||
|  |                 ext = os.path.splitext(self)[1] | ||
|  |                 name = ext[1:] if ext else "File" | ||
|  |             filetypes = [(name, [ext])] | ||
|  |         res = open_filename(f"Select {self.description or self.__class__.__name__}", filetypes, self) | ||
|  |         if res: | ||
|  |             self.validate(res) | ||
|  |             if self.copy_to: | ||
|  |                 # instead of linking the file, copy it | ||
|  |                 dst = self.__class__(self.copy_to).resolve() | ||
|  |                 shutil.copy(res, dst, follow_symlinks=True) | ||
|  |                 res = dst | ||
|  |             try: | ||
|  |                 rel = os.path.relpath(res, self.__class__("").resolve()) | ||
|  |                 if not rel.startswith(".."): | ||
|  |                     res = rel | ||
|  |             except ValueError: | ||
|  |                 pass | ||
|  |             return self.__class__(res) | ||
|  |         return None | ||
|  | 
 | ||
|  |     @classmethod | ||
|  |     def _validate_stream_hashes(cls, f: BinaryIO) -> None: | ||
|  |         """Helper to efficiently validate stream against hashes""" | ||
|  |         if not cls.md5s: | ||
|  |             return  # no hashes to validate against | ||
|  | 
 | ||
|  |         pos = f.tell() | ||
|  |         try: | ||
|  |             from hashlib import md5 | ||
|  |             file_md5 = md5() | ||
|  |             block = bytearray(64*1024) | ||
|  |             view = memoryview(block) | ||
|  |             while n := f.readinto(view):  # type: ignore | ||
|  |                 file_md5.update(view[:n]) | ||
|  |             file_md5_hex = file_md5.hexdigest() | ||
|  |             for valid_md5 in cls.md5s: | ||
|  |                 if isinstance(valid_md5, str): | ||
|  |                     if valid_md5.lower() == file_md5_hex: | ||
|  |                         break | ||
|  |                 elif valid_md5 == file_md5.digest(): | ||
|  |                     break | ||
|  |             else: | ||
|  |                 raise ValueError(f"Hashes do not match for {cls.__name__}") | ||
|  |         finally: | ||
|  |             f.seek(pos) | ||
|  | 
 | ||
|  |     @classmethod | ||
|  |     def validate(cls, path: str) -> None: | ||
|  |         """Try to open and validate file against hashes""" | ||
|  |         with open(path, "rb", buffering=0) as f: | ||
|  |             try: | ||
|  |                 cls._validate_stream_hashes(f) | ||
|  |             except ValueError: | ||
|  |                 raise ValueError(f"File hash does not match for {path}") | ||
|  | 
 | ||
|  | 
 | ||
|  | class FolderPath(Path): | ||
|  |     # path to a folder | ||
|  | 
 | ||
|  |     def browse(self: T, **kwargs: Any) -> Optional[T]: | ||
|  |         from Utils import open_directory | ||
|  |         res = open_directory(f"Select {self.description or self.__class__.__name__}", self) | ||
|  |         if res: | ||
|  |             try: | ||
|  |                 rel = os.path.relpath(res, self.__class__("").resolve()) | ||
|  |                 if not rel.startswith(".."): | ||
|  |                     res = rel | ||
|  |             except ValueError: | ||
|  |                 pass | ||
|  |             return self.__class__(res) | ||
|  |         return None | ||
|  | 
 | ||
|  | 
 | ||
|  | class UserFilePath(_UserPath, FilePath): | ||
|  |     pass | ||
|  | 
 | ||
|  | 
 | ||
|  | class UserFolderPath(_UserPath, FolderPath): | ||
|  |     pass | ||
|  | 
 | ||
|  | 
 | ||
|  | class OptionalUserFilePath(UserFilePath): | ||
|  |     required = False | ||
|  | 
 | ||
|  | 
 | ||
|  | class OptionalUserFolderPath(UserFolderPath): | ||
|  |     required = False | ||
|  | 
 | ||
|  | 
 | ||
|  | class LocalFilePath(_LocalPath, FilePath): | ||
|  |     pass | ||
|  | 
 | ||
|  | 
 | ||
|  | class LocalFolderPath(_LocalPath, FolderPath): | ||
|  |     pass | ||
|  | 
 | ||
|  | 
 | ||
|  | class OptionalLocalFilePath(LocalFilePath): | ||
|  |     required = False | ||
|  | 
 | ||
|  | 
 | ||
|  | class OptionalLocalFolderPath(LocalFolderPath): | ||
|  |     required = False | ||
|  | 
 | ||
|  | 
 | ||
|  | class SNESRomPath(UserFilePath): | ||
|  |     # Special UserFilePath that ignores an optional header when validating | ||
|  | 
 | ||
|  |     @classmethod | ||
|  |     def validate(cls, path: str) -> None: | ||
|  |         """Try to open and validate file against hashes""" | ||
|  |         with open(path, "rb", buffering=0) as f: | ||
|  |             f.seek(0, os.SEEK_END) | ||
|  |             size = f.tell() | ||
|  |             if size % 1024 == 512: | ||
|  |                 f.seek(512)  # skip header | ||
|  |             elif size % 1024 == 0: | ||
|  |                 f.seek(0)  # header-less | ||
|  |             else: | ||
|  |                 raise ValueError(f"Unexpected file size for {path}") | ||
|  | 
 | ||
|  |             try: | ||
|  |                 cls._validate_stream_hashes(f) | ||
|  |             except ValueError: | ||
|  |                 raise ValueError(f"File hash does not match for {path}") | ||
|  | 
 | ||
|  | 
 | ||
|  | # World-independent setting groups | ||
|  | 
 | ||
|  | class GeneralOptions(Group): | ||
|  |     class OutputPath(OptionalUserFolderPath): | ||
|  |         """
 | ||
|  |         Where to place output files | ||
|  |         """
 | ||
|  |         # created on demand, so marked as optional | ||
|  | 
 | ||
|  |     output_path: OutputPath = OutputPath("output") | ||
|  | 
 | ||
|  | 
 | ||
|  | class ServerOptions(Group): | ||
|  |     """
 | ||
|  |     Options for MultiServer | ||
|  |     Null means nothing, for the server this means to default the value | ||
|  |     These overwrite command line arguments! | ||
|  |     """
 | ||
|  | 
 | ||
|  |     class ServerPassword(str): | ||
|  |         """
 | ||
|  |         Allows for clients to log on and manage the server.  If this is null, no remote administration is possible. | ||
|  |         """
 | ||
|  | 
 | ||
|  |     class DisableItemCheat(Bool): | ||
|  |         """Disallow !getitem""" | ||
|  | 
 | ||
|  |     class LocationCheckPoints(int): | ||
|  |         """
 | ||
|  |         Client hint system | ||
|  |         Points given to a player for each acquired item in their world | ||
|  |         """
 | ||
|  | 
 | ||
|  |     class HintCost(int): | ||
|  |         """
 | ||
|  |         Relative point cost to receive a hint via !hint for players | ||
|  |         so for example hint_cost: 20 would mean that for every 20% of available checks, you get the ability to hint, | ||
|  |         for a total of 5 | ||
|  |         """
 | ||
|  | 
 | ||
|  |     class ReleaseMode(str): | ||
|  |         """
 | ||
|  |         Release modes | ||
|  |         A Release sends out the remaining items *from* a world that releases | ||
|  |         "disabled" -> clients can't release, | ||
|  |         "enabled" -> clients can always release | ||
|  |         "auto" -> automatic release on goal completion | ||
|  |         "auto-enabled" -> automatic release on goal completion and manual release is also enabled | ||
|  |         "goal" -> release is allowed after goal completion | ||
|  |         """
 | ||
|  | 
 | ||
|  |     class CollectMode(str): | ||
|  |         """
 | ||
|  |         Collect modes | ||
|  |         A Collect sends the remaining items *to* a world that collects | ||
|  |         "disabled" -> clients can't collect, | ||
|  |         "enabled" -> clients can always collect | ||
|  |         "auto" -> automatic collect on goal completion | ||
|  |         "auto-enabled" -> automatic collect on goal completion and manual collect is also enabled | ||
|  |         "goal" -> collect is allowed after goal completion | ||
|  |         """
 | ||
|  | 
 | ||
|  |     class RemainingMode(str): | ||
|  |         """
 | ||
|  |         Remaining modes | ||
|  |         !remaining handling, that tells a client which items remain in their pool | ||
|  |         "enabled" -> Client can always ask for remaining items | ||
|  |         "disabled" -> Client can never ask for remaining items | ||
|  |         "goal" -> Client can ask for remaining items after goal completion | ||
|  |         """
 | ||
|  | 
 | ||
|  |     class AutoShutdown(int): | ||
|  |         """Automatically shut down the server after this many seconds without new location checks, 0 to keep running""" | ||
|  | 
 | ||
|  |     class Compatibility(IntEnum): | ||
|  |         """
 | ||
|  |         Compatibility handling | ||
|  |         2 -> Recommended for casual/cooperative play, attempt to be compatible with everything across all versions | ||
|  |         1 -> No longer in use, kept reserved in case of future use | ||
|  |         0 -> Recommended for tournaments to force a level playing field, only allow an exact version match | ||
|  |         """
 | ||
|  |         OFF = 0 | ||
|  |         ON = 1 | ||
|  |         FULL = 2 | ||
|  | 
 | ||
|  |     class LogNetwork(IntEnum): | ||
|  |         """log all server traffic, mostly for dev use""" | ||
|  |         OFF = 0 | ||
|  |         ON = 1 | ||
|  | 
 | ||
|  |     host: Optional[str] = None | ||
|  |     port: int = 38281 | ||
|  |     password: Optional[str] = None | ||
|  |     multidata: Optional[str] = None | ||
|  |     savefile: Optional[str] = None | ||
|  |     disable_save: bool = False | ||
|  |     loglevel: str = "info" | ||
|  |     server_password: Optional[ServerPassword] = None | ||
|  |     disable_item_cheat: Union[DisableItemCheat, bool] = False | ||
|  |     location_check_points: LocationCheckPoints = LocationCheckPoints(1) | ||
|  |     hint_cost: HintCost = HintCost(10) | ||
|  |     release_mode: ReleaseMode = ReleaseMode("goal") | ||
|  |     collect_mode: CollectMode = CollectMode("goal") | ||
|  |     remaining_mode: RemainingMode = RemainingMode("goal") | ||
|  |     auto_shutdown: AutoShutdown = AutoShutdown(0) | ||
|  |     compatibility: Compatibility = Compatibility(2) | ||
|  |     log_network: LogNetwork = LogNetwork(0) | ||
|  | 
 | ||
|  | 
 | ||
|  | class GeneratorOptions(Group): | ||
|  |     """Options for Generation""" | ||
|  | 
 | ||
|  |     class EnemizerPath(LocalFilePath): | ||
|  |         """Location of your Enemizer CLI, available here: https://github.com/Ijwu/Enemizer/releases""" | ||
|  |         is_exe = True | ||
|  | 
 | ||
|  |     class PlayerFilesPath(OptionalUserFolderPath): | ||
|  |         """Folder from which the player yaml files are pulled from""" | ||
|  |         # created on demand, so marked as optional | ||
|  | 
 | ||
|  |     class Players(int): | ||
|  |         """amount of players, 0 to infer from player files""" | ||
|  | 
 | ||
|  |     class WeightsFilePath(str): | ||
|  |         """
 | ||
|  |         general weights file, within the stated player_files_path location | ||
|  |         gets used if players is higher than the amount of per-player files found to fill remaining slots | ||
|  |         """
 | ||
|  |         # this is special because the path is relative to player_files_path | ||
|  | 
 | ||
|  |     class MetaFilePath(str): | ||
|  |         """Meta file name, within the stated player_files_path location""" | ||
|  |         # this is special because the path is relative to player_files_path | ||
|  | 
 | ||
|  |     class Spoiler(IntEnum): | ||
|  |         """
 | ||
|  |         Create a spoiler file | ||
|  |         0 -> None | ||
|  |         1 -> Spoiler without playthrough or paths to playthrough required items | ||
|  |         2 -> Spoiler with playthrough (viable solution to goals) | ||
|  |         3 -> Spoiler with playthrough and traversal paths towards items | ||
|  |         """
 | ||
|  |         NONE = 0 | ||
|  |         BASIC = 1 | ||
|  |         PLAYTHROUGH = 2 | ||
|  |         FULL = 3 | ||
|  | 
 | ||
|  |     class GlitchTriforceRoom(IntEnum): | ||
|  |         """
 | ||
|  |         Glitch to Triforce room from Ganon | ||
|  |         When disabled, you have to have a weapon that can hurt ganon (master sword or swordless/easy item functionality | ||
|  |         + hammer) and have completed the goal required for killing ganon to be able to access the triforce room. | ||
|  |         1 -> Enabled. | ||
|  |         0 -> Disabled (except in no-logic) | ||
|  |         """
 | ||
|  |         OFF = 0 | ||
|  |         ON = 1 | ||
|  | 
 | ||
|  |     class PlandoOptions(str): | ||
|  |         """
 | ||
|  |         List of options that can be plando'd. Can be combined, for example "bosses, items" | ||
|  |         Available options: bosses, items, texts, connections | ||
|  |         """
 | ||
|  | 
 | ||
|  |     class Race(IntEnum): | ||
|  |         """Create encrypted race roms and flag games as race mode""" | ||
|  |         OFF = 0 | ||
|  |         ON = 1 | ||
|  | 
 | ||
|  |     enemizer_path: EnemizerPath = EnemizerPath("EnemizerCLI/EnemizerCLI.Core")  # + ".exe" is implied on Windows | ||
|  |     player_files_path: PlayerFilesPath = PlayerFilesPath("Players") | ||
|  |     players: Players = Players(0) | ||
|  |     weights_file_path: WeightsFilePath = WeightsFilePath("weights.yaml") | ||
|  |     meta_file_path: MetaFilePath = MetaFilePath("meta.yaml") | ||
|  |     spoiler: Spoiler = Spoiler(3) | ||
|  |     glitch_triforce_room: GlitchTriforceRoom = GlitchTriforceRoom(1)  # why is this here? | ||
|  |     race: Race = Race(0) | ||
|  |     plando_options: PlandoOptions = PlandoOptions("bosses") | ||
|  | 
 | ||
|  | 
 | ||
|  | class SNIOptions(Group): | ||
|  |     class SNIPath(LocalFolderPath): | ||
|  |         """
 | ||
|  |         Set this to your SNI folder location if you want the MultiClient to attempt an auto start, \ | ||
|  | does nothing if not found | ||
|  |         """
 | ||
|  | 
 | ||
|  |     class SnesRomStart(str): | ||
|  |         """
 | ||
|  |         Set this to false to never autostart a rom (such as after patching) | ||
|  |         True for operating system default program | ||
|  |         Alternatively, a path to a program to open the .sfc file with | ||
|  |         """
 | ||
|  | 
 | ||
|  |     sni_path: SNIPath = SNIPath("SNI") | ||
|  |     snes_rom_start: Union[SnesRomStart, bool] = True | ||
|  | 
 | ||
|  | 
 | ||
|  | # Top-level group with lazy loading of worlds | ||
|  | 
 | ||
|  | class Settings(Group): | ||
|  |     general_options: GeneralOptions = GeneralOptions() | ||
|  |     server_options: ServerOptions = ServerOptions() | ||
|  |     generator: GeneratorOptions = GeneratorOptions() | ||
|  |     sni_options: SNIOptions = SNIOptions() | ||
|  | 
 | ||
|  |     _filename: Optional[str] = None | ||
|  | 
 | ||
|  |     def __getattribute__(self, key: str) -> Any: | ||
|  |         if key.startswith("_") or key in self.__class__.__dict__: | ||
|  |             # not a group or a hard-coded group | ||
|  |             pass | ||
|  |         elif key not in dir(self) or isinstance(super().__getattribute__(key), dict): | ||
|  |             # settings class not loaded yet | ||
|  |             if key not in _world_settings_name_cache: | ||
|  |                 # find world that provides the settings class | ||
|  |                 _update_cache() | ||
|  |                 # check for missing keys to update _changed | ||
|  |                 for world_settings_name in _world_settings_name_cache: | ||
|  |                     if world_settings_name not in dir(self): | ||
|  |                         self._changed = True | ||
|  |             if key not in _world_settings_name_cache: | ||
|  |                 # not a world group | ||
|  |                 return super().__getattribute__(key) | ||
|  |             # directly import world and grab settings class | ||
|  |             world_mod, world_cls_name = _world_settings_name_cache[key].rsplit(".", 1) | ||
|  |             world = cast(type, getattr(__import__(world_mod, fromlist=[world_cls_name]), world_cls_name)) | ||
|  |             assert getattr(world, "settings_key") == key | ||
|  |             try: | ||
|  |                 cls_or_name = world.__annotations__["settings"] | ||
|  |             except KeyError: | ||
|  |                 import warnings | ||
|  |                 warnings.warn(f"World {world_cls_name} does not define settings. Please consider upgrading the world.") | ||
|  |                 return super().__getattribute__(key) | ||
|  |             if isinstance(cls_or_name, str): | ||
|  |                 # Try to resolve type. Sadly we can't use get_type_hints, see https://bugs.python.org/issue43463 | ||
|  |                 cls_name = cls_or_name | ||
|  |                 if "[" in cls_name:  # resolve ClassVar[] | ||
|  |                     cls_name = cls_name.split("[", 1)[1].rsplit("]", 1)[0] | ||
|  |                 cls = cast(type, getattr(__import__(world_mod, fromlist=[cls_name]), cls_name)) | ||
|  |             else: | ||
|  |                 type_args = typing.get_args(cls_or_name)  # resolve ClassVar[] | ||
|  |                 cls = type_args[0] if type_args else cast(type, cls_or_name) | ||
|  |             impl: Group = cast(Group, cls()) | ||
|  |             assert isinstance(impl, Group), f"{world_cls_name}.settings has to inherit from settings.Group. " \ | ||
|  |                                             "If that's already the case, please avoid recursive partial imports." | ||
|  |             # above assert fails for recursive partial imports | ||
|  |             # upcast loaded data to settings class | ||
|  |             try: | ||
|  |                 dct = super().__getattribute__(key) | ||
|  |                 if isinstance(dct, dict): | ||
|  |                     impl.update(dct) | ||
|  |                 else: | ||
|  |                     self._changed = True  # key is a class var -> new section | ||
|  |             except AttributeError: | ||
|  |                 self._changed = True  # key is unknown -> new section | ||
|  |             setattr(self, key, impl) | ||
|  | 
 | ||
|  |         return super().__getattribute__(key) | ||
|  | 
 | ||
|  |     def __init__(self, location: Optional[str]):  # change to PathLike[str] once we drop 3.8? | ||
|  |         super().__init__() | ||
|  |         if location: | ||
|  |             from Utils import parse_yaml | ||
|  |             with open(location, encoding="utf-8-sig") as f: | ||
|  |                 options = parse_yaml(f.read()) | ||
|  |                 # TODO: detect if upgrade is required | ||
|  |                 # TODO: once we have a cache for _world_settings_name_cache, detect if any game section is missing | ||
|  |                 self.update(options or {}) | ||
|  |             self._filename = location | ||
|  | 
 | ||
|  |         def autosave() -> None: | ||
|  |             if self._filename and self.changed: | ||
|  |                 self.save() | ||
|  | 
 | ||
|  |         import atexit | ||
|  |         atexit.register(autosave) | ||
|  | 
 | ||
|  |     def save(self, location: Optional[str] = None) -> None:  # as above | ||
|  |         location = location or self._filename | ||
|  |         assert location, "No file specified" | ||
|  |         # can't use utf-8-sig because it breaks backward compat: pyyaml on Windows with bytes does not strip the BOM | ||
|  |         with open(location, "w", encoding="utf-8") as f: | ||
|  |             self.dump(f) | ||
|  |         self._filename = location | ||
|  | 
 | ||
|  |     def dump(self, f: TextIO, level: int = 0) -> None: | ||
|  |         # load all world setting classes | ||
|  |         _update_cache() | ||
|  |         for key in _world_settings_name_cache: | ||
|  |             self.__getattribute__(key)  # load all worlds | ||
|  |         super().dump(f, level) | ||
|  | 
 | ||
|  |     @property | ||
|  |     def filename(self) -> Optional[str]: | ||
|  |         return self._filename | ||
|  | 
 | ||
|  | 
 | ||
|  | # host.yaml loader | ||
|  | 
 | ||
|  | def get_settings() -> Settings: | ||
|  |     """Returns settings from the default host.yaml""" | ||
|  |     with _lock:  # make sure we only have one instance | ||
|  |         res = getattr(get_settings, "_cache", None) | ||
|  |         if not res: | ||
|  |             import os | ||
|  |             from Utils import user_path, local_path | ||
|  |             filenames = ("options.yaml", "host.yaml") | ||
|  |             locations: List[str] = [] | ||
|  |             if os.path.join(os.getcwd()) != local_path(): | ||
|  |                 locations += filenames  # use files from cwd only if it's not the local_path | ||
|  |             locations += [user_path(filename) for filename in filenames] | ||
|  |             for location in locations: | ||
|  |                 try: | ||
|  |                     res = Settings(location) | ||
|  |                     break | ||
|  |                 except FileNotFoundError: | ||
|  |                     continue | ||
|  |             else: | ||
|  |                 warnings.warn(f"Could not find {filenames[1]} to load options. Creating a new one.") | ||
|  |                 res = Settings(None) | ||
|  |                 res.save(user_path(filenames[1])) | ||
|  |             setattr(get_settings, "_cache", res) | ||
|  |         return res |