586 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			586 lines
		
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from __future__ import annotations
 | 
						|
 | 
						|
import shutil
 | 
						|
import typing
 | 
						|
import builtins
 | 
						|
import os
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import pickle
 | 
						|
import functools
 | 
						|
import io
 | 
						|
import collections
 | 
						|
import importlib
 | 
						|
import logging
 | 
						|
 | 
						|
if typing.TYPE_CHECKING:
 | 
						|
    from tkinter import Tk
 | 
						|
else:
 | 
						|
    Tk = typing.Any
 | 
						|
 | 
						|
 | 
						|
def tuplize_version(version: str) -> Version:
    """Parse a dotted version string such as ``"0.3.3"`` into a ``Version``.

    Each dot-separated piece is parsed as a base-10 integer and the pieces
    are passed positionally to ``Version``.
    """
    pieces = [int(piece, 10) for piece in version.split(".")]
    return Version(*pieces)
 | 
						|
 | 
						|
 | 
						|
class Version(typing.NamedTuple):
    """Three-part version number stored as a named tuple of integers."""
    # Fields are listed from most to least significant, so ordinary tuple
    # comparison orders versions correctly.
    major: int
    minor: int
    build: int
 | 
						|
 | 
						|
 | 
						|
# Version of this package as a string, plus the same value parsed into a
# Version tuple for comparisons.
__version__ = "0.3.3"
version_tuple = tuplize_version(__version__)
 | 
						|
 | 
						|
# Platform flags, evaluated once at import time from sys.platform.
is_linux = sys.platform.startswith('linux')
is_macos = sys.platform == 'darwin'
is_windows = sys.platform in ("win32", "cygwin", "msys")
 | 
						|
 | 
						|
import jellyfish
 | 
						|
from yaml import load, load_all, dump, SafeLoader
 | 
						|
 | 
						|
try:
 | 
						|
    from yaml import CLoader as Loader
 | 
						|
except ImportError:
 | 
						|
    from yaml import Loader
 | 
						|
 | 
						|
 | 
						|
def int16_as_bytes(value: int) -> typing.List[int]:
    """Split *value* into two byte values, least significant byte first.

    Only the low 16 bits of *value* are used.
    """
    low16 = value & 0xFFFF
    return [(low16 >> shift) & 0xFF for shift in (0, 8)]
 | 
						|
 | 
						|
 | 
						|
def int32_as_bytes(value: int) -> typing.List[int]:
    """Split *value* into four byte values, least significant byte first.

    Only the low 32 bits of *value* are used.
    """
    low32 = value & 0xFFFFFFFF
    return [(low32 >> shift) & 0xFF for shift in (0, 8, 16, 24)]
 | 
						|
 | 
						|
 | 
						|
def pc_to_snes(value: int) -> int:
    """Convert a PC (file) offset into a SNES address.

    Bits 15..21 of the offset are moved up by one into bits 16..22, the low
    15 bits are kept, and bit 15 of the result is always set.
    """
    bank_bits = (value << 1) & 0x7F0000
    offset_bits = value & 0x7FFF
    return bank_bits | offset_bits | 0x8000
 | 
						|
 | 
						|
 | 
						|
def snes_to_pc(value: int) -> int:
    """Convert a SNES address into a PC (file) offset.

    Bits 16..22 of the address are moved down by one, the low 15 bits are
    kept, and bit 15 of the address is dropped.
    """
    bank_bits = (value & 0x7F0000) >> 1
    offset_bits = value & 0x7FFF
    return bank_bits | offset_bits
 | 
						|
 | 
						|
 | 
						|
RetType = typing.TypeVar("RetType")


def cache_argsless(function: typing.Callable[[], RetType]) -> typing.Callable[[], RetType]:
    """Decorator that caches the result of a zero-argument function.

    The wrapped function is called at most once; every later call returns the
    stored result. A private sentinel marks "not computed yet", so a result
    of ``None`` (or any other falsy value) is cached like any other.

    The wrapper keeps the wrapped function's ``__name__``, ``__doc__`` and
    other metadata via ``functools.wraps``.

    :param function: callable that takes no positional arguments.
    :return: caching wrapper around *function*.
    :raises AssertionError: if *function* declares positional arguments.
    """
    assert not function.__code__.co_argcount, "Can only cache 0 argument functions with this cache."

    sentinel = object()
    result: typing.Union[object, RetType] = sentinel

    @functools.wraps(function)
    def _wrap() -> RetType:
        nonlocal result
        if result is sentinel:
            result = function()
        return typing.cast(RetType, result)

    return _wrap
 | 
						|
 | 
						|
 | 
						|
def is_frozen() -> bool:
    """Return the value of ``sys.frozen``, or ``False`` when it is not set."""
    frozen = getattr(sys, "frozen", False)
    return typing.cast(bool, frozen)
 | 
						|
 | 
						|
 | 
						|
def local_path(*path: str) -> str:
    """Returns path to a file in the local Archipelago installation or source.

    The base directory is determined on the first call and stored on the
    function object as ``local_path.cached_path``; *path* components are
    joined onto it.
    """
    if not hasattr(local_path, 'cached_path'):
        if not is_frozen():
            import __main__
            if hasattr(__main__, "__file__"):
                # we are running in a normal Python environment
                base = os.path.dirname(os.path.abspath(__main__.__file__))
            else:
                # no main script file to go by: fall back to the working directory
                base = os.path.abspath(".")
        elif hasattr(sys, "_MEIPASS"):
            # we are running in a PyInstaller bundle
            base = sys._MEIPASS  # pylint: disable=protected-access,no-member
        else:
            # cx_Freeze
            base = os.path.dirname(os.path.abspath(sys.argv[0]))
        local_path.cached_path = base

    return os.path.join(local_path.cached_path, *path)
 | 
						|
 | 
						|
 | 
						|
def home_path(*path: str) -> str:
    """Returns path to a file in the user home's Archipelago directory.

    On Linux the base is ``~/Archipelago`` (created with mode 0o700 if
    missing); on other platforms it is ``local_path()``. The base is cached
    on the function object as ``home_path.cached_path``.
    """
    if not hasattr(home_path, 'cached_path'):
        if sys.platform.startswith('linux'):
            home_dir = os.path.expanduser('~/Archipelago')
            home_path.cached_path = home_dir
            os.makedirs(home_dir, 0o700, exist_ok=True)
        else:
            # not implemented
            home_path.cached_path = local_path()  # this will generate the same exceptions we got previously

    return os.path.join(home_path.cached_path, *path)
 | 
						|
 | 
						|
 | 
						|
def user_path(*path: str) -> str:
    """Returns either local_path or home_path based on write permissions.

    If ``local_path()`` is writable it is used as the base; otherwise
    ``home_path()`` is used and, when it has no ``host.yaml`` yet, it is
    populated with a copy of selected files and folders from the local path.
    The chosen base is cached as ``user_path.cached_path``.
    """
    if not hasattr(user_path, 'cached_path'):
        if os.access(local_path(), os.W_OK):
            user_path.cached_path = local_path()
        else:
            user_path.cached_path = home_path()
            # populate home from local - TODO: upgrade feature
            needs_populating = (user_path.cached_path != local_path()
                                and not os.path.exists(user_path('host.yaml')))
            if needs_populating:
                for dn in ('Players', 'data/sprites'):
                    shutil.copytree(local_path(dn), user_path(dn), dirs_exist_ok=True)
                for fn in ('manifest.json', 'host.yaml'):
                    shutil.copy2(local_path(fn), user_path(fn))

    return os.path.join(user_path.cached_path, *path)
 | 
						|
 | 
						|
 | 
						|
def output_path(*path: str):
    """Return a path below the configured output folder.

    On the first call the output folder is resolved from
    ``general_options.output_path`` relative to the user path, and the parent
    directory of the requested path is created. Later calls only join *path*
    onto the cached folder.
    """
    if not hasattr(output_path, 'cached_path'):
        output_path.cached_path = user_path(get_options()["general_options"]["output_path"])
        full_path = os.path.join(output_path.cached_path, *path)
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        return full_path
    return os.path.join(output_path.cached_path, *path)
 | 
						|
 | 
						|
 | 
						|
def open_file(filename):
    """Open *filename* with the platform's opener.

    Uses ``os.startfile`` on win32; elsewhere runs the ``open`` command on
    darwin or ``xdg-open`` on any other platform.
    """
    if sys.platform == 'win32':
        os.startfile(filename)
        return
    if sys.platform == 'darwin':
        open_command = 'open'
    else:
        open_command = 'xdg-open'
    subprocess.call([open_command, filename])
 | 
						|
 | 
						|
 | 
						|
# from https://gist.github.com/pypt/94d747fe5180851196eb#gistcomment-4015118 with some changes
 | 
						|
class UniqueKeyLoader(SafeLoader):
    """SafeLoader variant that refuses mappings containing a duplicate key."""

    def construct_mapping(self, node, deep=False):
        """Check *node* for duplicate keys, then build the mapping as usual.

        :raises KeyError: if the same key is constructed twice within *node*.
        """
        seen_keys = set()
        for key_node, _value_node in node.value:
            key = self.construct_object(key_node, deep=deep)
            if key not in seen_keys:
                seen_keys.add(key)
                continue
            logging.error(f"YAML duplicates sanity check failed{key_node.start_mark}")
            raise KeyError(f"Duplicate key {key} found in YAML. Already found keys: {seen_keys}.")
        return super().construct_mapping(node, deep)
 | 
						|
 | 
						|
 | 
						|
# parse_yaml loads a single document and parse_yamls loads every document of a
# stream; both go through UniqueKeyLoader.
parse_yaml = functools.partial(load, Loader=UniqueKeyLoader)
parse_yamls = functools.partial(load_all, Loader=UniqueKeyLoader)
# unsafe_parse_yaml uses the full (C)Loader imported at the top of the file
# instead of the SafeLoader-based UniqueKeyLoader.
unsafe_parse_yaml = functools.partial(load, Loader=Loader)
 | 
						|
 | 
						|
 | 
						|
def get_cert_none_ssl_context():
    """Create a default SSL context with hostname checking and certificate verification turned off."""
    import ssl
    context = ssl.create_default_context()
    # check_hostname is switched off first, before verify_mode is set to CERT_NONE
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    return context
 | 
						|
 | 
						|
 | 
						|
@cache_argsless
def get_public_ipv4() -> str:
    """Return this machine's public IPv4 address as reported by a web service.

    Starts from the address the local hostname resolves to, then asks
    checkip.amazonaws.com and, if that fails, v4.ident.me. If both lookups
    fail, the first failure is logged and the locally resolved address is
    returned.
    """
    import socket
    import urllib.request
    ip = socket.gethostbyname(socket.gethostname())
    ctx = get_cert_none_ssl_context()
    try:
        ip = urllib.request.urlopen('https://checkip.amazonaws.com/', context=ctx).read().decode('utf8').strip()
    except Exception as e:
        try:
            ip = urllib.request.urlopen('https://v4.ident.me', context=ctx).read().decode('utf8').strip()
        except Exception:
            # Only ordinary errors are swallowed here; a bare ``except`` would
            # also hide KeyboardInterrupt and SystemExit.
            # we could be offline, in a local game, so no point in erroring out
            logging.exception(e)
    return ip
 | 
						|
 | 
						|
 | 
						|
@cache_argsless
def get_public_ipv6() -> str:
    """Return this machine's public IPv6 address as reported by v6.ident.me.

    If the lookup fails, the error is logged and the address the local
    hostname resolves to (via ``socket.gethostbyname``) is returned instead.
    """
    import socket
    import urllib.request
    ip = socket.gethostbyname(socket.gethostname())
    ctx = get_cert_none_ssl_context()
    try:
        response = urllib.request.urlopen('https://v6.ident.me', context=ctx)
        ip = response.read().decode('utf8').strip()
    except Exception as e:
        # we could be offline, in a local game, or ipv6 may not be available
        logging.exception(e)
    return ip
 | 
						|
 | 
						|
 | 
						|
@cache_argsless
def get_default_options() -> dict:
    """Return the built-in default option values, grouped by section."""
    # Refer to host.yaml for comments as to what all these options mean.
    general_options = {
        "output_path": "output",
    }
    factorio_options = {
        "executable": os.path.join("factorio", "bin", "x64", "factorio"),
    }
    sm_options = {
        "rom_file": "Super Metroid (JU).sfc",
        "sni": "SNI",
        "rom_start": True,
    }
    soe_options = {
        "rom_file": "Secret of Evermore (USA).sfc",
    }
    lttp_options = {
        "rom_file": "Zelda no Densetsu - Kamigami no Triforce (Japan).sfc",
        "sni": "SNI",
        "rom_start": True,
    }
    server_options = {
        "host": None,
        "port": 38281,
        "password": None,
        "multidata": None,
        "savefile": None,
        "disable_save": False,
        "loglevel": "info",
        "server_password": None,
        "disable_item_cheat": False,
        "location_check_points": 1,
        "hint_cost": 10,
        "forfeit_mode": "goal",
        "collect_mode": "disabled",
        "remaining_mode": "goal",
        "auto_shutdown": 0,
        "compatibility": 2,
        "log_network": 0,
    }
    generator = {
        "teams": 1,
        "enemizer_path": os.path.join("EnemizerCLI", "EnemizerCLI.Core"),
        "player_files_path": "Players",
        "players": 0,
        "weights_file_path": "weights.yaml",
        "meta_file_path": "meta.yaml",
        "spoiler": 2,
        "glitch_triforce_room": 1,
        "race": 0,
        "plando_options": "bosses",
    }
    minecraft_options = {
        "forge_directory": "Minecraft Forge server",
        "max_heap_size": "2G",
        "release_channel": "release",
    }
    oot_options = {
        "rom_file": "The Legend of Zelda - Ocarina of Time.z64",
    }

    # Section order is kept the same as before.
    return {
        "general_options": general_options,
        "factorio_options": factorio_options,
        "sm_options": sm_options,
        "soe_options": soe_options,
        "lttp_options": lttp_options,
        "server_options": server_options,
        "generator": generator,
        "minecraft_options": minecraft_options,
        "oot_options": oot_options,
    }
 | 
						|
 | 
						|
 | 
						|
def update_options(src: dict, dest: dict, filename: str, keys: list) -> dict:
    """Fill entries missing from *dest* with the values from *src*, recursively.

    Values already present in *dest* are kept, with one exception: when *src*
    holds a dict for a key and *dest* holds a non-dict, the *src* dict
    replaces it. Nested dicts are merged the same way.

    :param src: options to fill in from.
    :param dest: options to update in place.
    :param filename: name of the file *dest* came from; messages are only
        logged when it ends with ``options.yaml``.
    :param keys: key path leading to *src*, used to build dotted option names.
    :return: *dest*.
    """
    for key, value in src.items():
        new_keys = [*keys, key]
        option_name = '.'.join(new_keys)

        if key not in dest:
            dest[key] = value
            if filename.endswith("options.yaml"):
                logging.info(f"Warning: {filename} is missing {option_name}")
            continue

        if not isinstance(value, dict):
            # plain value that is already set in dest: leave it alone
            continue

        if isinstance(dest.get(key, None), dict):
            dest[key] = update_options(value, dest[key], filename, new_keys)
        else:
            if filename.endswith("options.yaml"):
                logging.info(f"Warning: {filename} has {option_name}, but it is not a dictionary. overwriting.")
            dest[key] = value
    return dest
 | 
						|
 | 
						|
 | 
						|
@cache_argsless
def get_options() -> dict:
    """Load the options file and return it with defaults filled in.

    Candidate files are ``options.yaml`` and ``host.yaml``, looked up first
    in the current working directory (only if that differs from
    ``local_path()``) and then in the user path. The first file that exists
    is parsed, and ``get_default_options()`` supplies any missing entries.
    The result is stored as ``get_options.options``.

    An options file that parses to ``None`` (for example an empty file) is
    treated as an empty mapping, so every option falls back to its default.

    :raises FileNotFoundError: if none of the candidate files exists.
    """
    if not hasattr(get_options, "options"):
        filenames = ("options.yaml", "host.yaml")
        locations = []
        if os.path.join(os.getcwd()) != local_path():
            locations += filenames  # use files from cwd only if it's not the local_path
        locations += [user_path(filename) for filename in filenames]

        for location in locations:
            if os.path.exists(location):
                with open(location) as f:
                    options = parse_yaml(f.read())
                if options is None:
                    # nothing to parse: start from an empty mapping instead of
                    # failing inside update_options
                    options = {}

                get_options.options = update_options(get_default_options(), options, location, list())
                break
        else:
            raise FileNotFoundError(f"Could not find {filenames[1]} to load options.")
    return get_options.options
 | 
						|
 | 
						|
 | 
						|
def get_item_name_from_id(code: int) -> str:
    """Look up the item name for *code*, or return an ``Unknown item`` placeholder containing the ID."""
    from worlds import lookup_any_item_id_to_name
    fallback = f'Unknown item (ID:{code})'
    return lookup_any_item_id_to_name.get(code, fallback)
 | 
						|
 | 
						|
 | 
						|
def get_location_name_from_id(code: int) -> str:
    """Look up the location name for *code*, or return an ``Unknown location`` placeholder containing the ID."""
    from worlds import lookup_any_location_id_to_name
    fallback = f'Unknown location (ID:{code})'
    return lookup_any_location_id_to_name.get(code, fallback)
 | 
						|
 | 
						|
 | 
						|
def persistent_store(category: str, key: typing.Any, value: typing.Any):
    """Store *value* under *category* / *key* and write the whole store to disk.

    The store lives in ``_persistent_storage.yaml`` in the user path. The
    dict returned by ``persistent_load()`` is updated in place.
    """
    path = user_path("_persistent_storage.yaml")
    storage: dict = persistent_load()
    entries = storage.setdefault(category, {})
    entries[key] = value
    with open(path, "wt") as storage_file:
        storage_file.write(dump(storage))
 | 
						|
 | 
						|
 | 
						|
def persistent_load() -> typing.Dict[str, dict]:
    """Return the persistent store, reading it from disk on first use.

    The store is read from ``_persistent_storage.yaml`` in the user path and
    cached as ``persistent_load.storage``. A missing, unreadable or empty
    file yields an empty dict. Once cached, the same dict object is returned
    on every call, including when it is empty.

    :return: mapping of category name to that category's dict.
    """
    cached = getattr(persistent_load, "storage", None)
    if cached is not None:
        return cached

    path = user_path("_persistent_storage.yaml")
    storage: dict = {}
    if os.path.exists(path):
        try:
            with open(path, "r") as f:
                # NOTE(review): this uses the full (non-safe) YAML loader, so
                # the file must never come from an untrusted source.
                storage = unsafe_parse_yaml(f.read())
        except Exception as e:
            logging.debug(f"Could not read store: {e}")
    if storage is None:
        # the file parsed to nothing
        storage = {}
    persistent_load.storage = storage
    return storage
 | 
						|
 | 
						|
 | 
						|
def get_adjuster_settings(gameName: str):
    """Return the stored adjuster settings for *gameName*, or an empty dict if there are none."""
    all_adjuster_settings = persistent_load().get("adjuster", {})
    return all_adjuster_settings.get(gameName, {})
 | 
						|
 | 
						|
 | 
						|
@cache_argsless
def get_unique_identifier():
    """Return the client's stored identifier, creating and storing one if needed.

    A truthy ``client.uuid`` value from the persistent store is returned as
    is. Otherwise ``uuid.getnode()`` provides a new value, which is saved to
    the persistent store before being returned.
    """
    stored_id = persistent_load().get("client", {}).get("uuid", None)
    if stored_id:
        return stored_id

    import uuid
    node_id = uuid.getnode()
    persistent_store("client", "uuid", node_id)
    return node_id
 | 
						|
 | 
						|
 | 
						|
# Names from the builtins module that pickled data may reference.
safe_builtins = {
    'set',
    'frozenset',
}


class RestrictedUnpickler(pickle.Unpickler):
    """Unpickler that only resolves an allow-list of globals.

    Anything not explicitly allowed by ``find_class`` is rejected with
    ``pickle.UnpicklingError``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # modules whose classes may be referenced by pickled data
        self.options_module = importlib.import_module("Options")
        self.net_utils_module = importlib.import_module("NetUtils")
        self.generic_properties_module = importlib.import_module("worlds.generic")

    def find_class(self, module, name):
        """Resolve the global ``module.name`` if it is on the allow-list.

        :raises pickle.UnpicklingError: for every global that is not allowed,
            including names in an ``*Options`` module that are missing or are
            not subclasses of ``Options.Option``.
        """
        if module == "builtins" and name in safe_builtins:
            return getattr(builtins, name)
        # used by MultiServer -> savegame/multidata
        if module == "NetUtils" and name in {"NetworkItem", "ClientStatus", "Hint", "SlotType", "NetworkSlot"}:
            return getattr(self.net_utils_module, name)
        # Options and Plando are unpickled by WebHost -> Generate
        if module == "worlds.generic" and name in {"PlandoItem", "PlandoConnection"}:
            return getattr(self.generic_properties_module, name)
        if module.endswith("Options"):
            if module == "Options":
                mod = self.options_module
            else:
                # NOTE(review): this imports (and therefore executes) any
                # importable module whose name ends in "Options"; the name
                # comes from the pickled data.
                mod = importlib.import_module(module)
            obj = getattr(mod, name, None)
            # Only real classes can be passed to issubclass(); a missing name
            # or a non-class attribute falls through to the rejection below
            # instead of raising AttributeError/TypeError.
            if isinstance(obj, type) and issubclass(obj, self.options_module.Option):
                return obj
        # Forbid everything else.
        raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
                                     (module, name))
 | 
						|
 | 
						|
 | 
						|
def restricted_loads(s):
    """Helper function analogous to pickle.loads(), using RestrictedUnpickler."""
    buffer = io.BytesIO(s)
    unpickler = RestrictedUnpickler(buffer)
    return unpickler.load()
 | 
						|
 | 
						|
 | 
						|
class KeyedDefaultDict(collections.defaultdict):
    """defaultdict whose ``default_factory`` is called with the missing key."""

    def __missing__(self, key):
        """Create, store and return the value for a missing *key*."""
        value = self.default_factory(key)
        self[key] = value
        return value
 | 
						|
 | 
						|
 | 
						|
def get_text_between(text: str, start: str, end: str) -> str:
    """Return the part of *text* after the first *start* and before the last *end*.

    :raises ValueError: if *start* or *end* does not occur in *text*.
    """
    begin_index = text.index(start) + len(start)
    end_index = text.rindex(end)
    return text[begin_index:end_index]
 | 
						|
 | 
						|
 | 
						|
# Maps the log level names accepted by init_logging to logging module levels.
loglevel_mapping = {
    'error': logging.ERROR,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'debug': logging.DEBUG,
}
 | 
						|
 | 
						|
 | 
						|
def init_logging(name: str, loglevel: typing.Union[str, int] = logging.INFO, write_mode: str = "w",
                 log_format: str = "[%(name)s at %(asctime)s]: %(message)s",
                 exception_logger: typing.Optional[str] = None):
    """Set up root logging to a file in the user ``logs`` folder and to stdout.

    All handlers already attached to the root logger are removed and closed
    first. Unless ``sys.excepthook`` has already been wrapped, a hook is
    installed that logs uncaught exceptions before calling the previous hook.

    :param name: base name of the log file (``<name>.txt``).
    :param loglevel: a key of ``loglevel_mapping`` or a level usable by
        ``Logger.setLevel``.
    :param write_mode: file mode used to open the log file.
    :param log_format: format string for the file handler.
    :param exception_logger: name of the logger that reports uncaught
        exceptions; passed to ``logging.getLogger``.
    """
    level = loglevel_mapping.get(loglevel, loglevel)
    log_folder = user_path("logs")
    os.makedirs(log_folder, exist_ok=True)

    root_logger = logging.getLogger()
    # iterate over a copy, since handlers are removed while looping
    for old_handler in list(root_logger.handlers):
        root_logger.removeHandler(old_handler)
        old_handler.close()
    root_logger.setLevel(level)

    log_file = os.path.join(log_folder, f"{name}.txt")
    file_handler = logging.FileHandler(log_file, write_mode, encoding="utf-8-sig")
    file_handler.setFormatter(logging.Formatter(log_format))
    root_logger.addHandler(file_handler)
    if sys.stdout:
        stdout_handler = logging.StreamHandler(sys.stdout)
        root_logger.addHandler(stdout_handler)

    # Relay unhandled exceptions to logger.
    if not getattr(sys.excepthook, "_wrapped", False):  # skip if already modified
        orig_hook = sys.excepthook

        def handle_exception(exc_type, exc_value, exc_traceback):
            if not issubclass(exc_type, KeyboardInterrupt):
                logging.getLogger(exception_logger).exception("Uncaught exception",
                                                              exc_info=(exc_type, exc_value, exc_traceback))
                return orig_hook(exc_type, exc_value, exc_traceback)
            # keyboard interrupts go to the interpreter's default hook and are not logged
            sys.__excepthook__(exc_type, exc_value, exc_traceback)
            return

        handle_exception._wrapped = True

        sys.excepthook = handle_exception

    logging.info(f"Archipelago ({__version__}) logging initialized.")
 | 
						|
 | 
						|
 | 
						|
def stream_input(stream, queue):
    """Start a daemon thread that forwards non-empty lines from *stream* to *queue*.

    Each line is stripped of surrounding whitespace; lines that are empty
    after stripping are skipped.

    :return: the started thread.
    """
    from threading import Thread

    def queuer():
        # NOTE(review): if readline() returns "" at end of stream, this loop
        # never exits and keeps polling — confirm this is intended.
        while True:
            text = stream.readline().strip()
            if not text:
                continue
            queue.put_nowait(text)

    thread = Thread(target=queuer, name=f"Stream handler for {stream.name}", daemon=True)
    thread.start()
    return thread
 | 
						|
 | 
						|
 | 
						|
def tkinter_center_window(window: Tk):
    """Move *window* so that its requested size is centered on the screen."""
    window.update()
    screen_width = window.winfo_screenwidth()
    screen_height = window.winfo_screenheight()
    x_pos = int(screen_width / 2 - window.winfo_reqwidth() / 2)
    y_pos = int(screen_height / 2 - window.winfo_reqheight() / 2)
    window.geometry(f"+{x_pos}+{y_pos}")
 | 
						|
 | 
						|
 | 
						|
class VersionException(Exception):
    """Exception type for version-related errors."""
 | 
						|
 | 
						|
 | 
						|
# noinspection PyPep8Naming
def format_SI_prefix(value, power=1000, power_labels=('', 'k', 'M', 'G', 'T', "P", "E", "Z", "Y")) -> str:
    """Format *value* as a number followed by a space and a prefix label.

    *value* is divided by *power* until it is smaller than *power* or the
    last label is reached. A value equal to *power* is scaled too, so
    ``1000`` becomes ``"1.000 k"``. Values too large for the last label
    keep that label instead of raising ``IndexError``.

    :param value: number to format.
    :param power: ratio between two neighbouring labels.
    :param power_labels: labels, from unscaled upwards.
    :return: ``"<value> <label>"``; an ``int`` that was never divided is
        printed as is, anything else with three decimals.
    """
    n = 0
    last_label = len(power_labels) - 1

    while value >= power and n < last_label:
        value /= power
        n += 1
    if type(value) is int:
        return f"{value} {power_labels[n]}"
    return f"{value:0.3f} {power_labels[n]}"
 | 
						|
 | 
						|
 | 
						|
def get_fuzzy_ratio(word1: str, word2: str) -> float:
    """Return a case-insensitive similarity ratio for two words.

    The ratio is ``1 - distance / length``, where *distance* is the
    Damerau-Levenshtein distance between the lower-cased words and *length*
    is the length of the longer word. Two empty words count as identical
    and give ``1.0``.
    """
    longest = max(len(word1), len(word2))
    if not longest:
        # both words are empty; this also avoids dividing by zero below
        return 1.0
    distance = jellyfish.damerau_levenshtein_distance(word1.lower(), word2.lower())
    return 1 - distance / longest
 | 
						|
 | 
						|
 | 
						|
def get_fuzzy_results(input_word: str, wordlist: typing.Sequence[str], limit: typing.Optional[int] = None) \
        -> typing.List[typing.Tuple[str, int]]:
    """Rank the words of *wordlist* by similarity to *input_word*.

    :param limit: maximum number of results; a falsy value returns all of them.
    :return: ``(word, percent)`` tuples, best match first, where *percent* is
        the fuzzy ratio times 100 truncated to an int.
    """
    max_results: int = limit if limit else len(wordlist)
    scored = [(candidate, get_fuzzy_ratio(input_word, candidate)) for candidate in wordlist]
    scored.sort(key=lambda element: element[1], reverse=True)
    # convert up to limit to int %
    return [(word, int(ratio * 100)) for word, ratio in scored[0:max_results]]
 | 
						|
 | 
						|
 | 
						|
def open_filename(title: str, filetypes: typing.Sequence[typing.Tuple[str, typing.Sequence[str]]]) \
        -> typing.Optional[str]:
    """Ask the user to pick a file and return the chosen path.

    On Linux, ``kdialog`` or ``zenity`` is used when one of them is found;
    otherwise a tkinter file dialog is shown.

    :param title: dialog title.
    :param filetypes: ``(description, extensions)`` pairs used as file filters.
    :raises Exception: re-raises whatever importing tkinter raised, after
        logging an error.
    """
    def run(*args: str):
        # the first line of the tool's stdout is the result; empty output means None
        completed = subprocess.run(args, capture_output=True, text=True)
        first_line = completed.stdout.split('\n', 1)[0]
        return first_line or None

    if is_linux:
        # prefer native dialog
        kdialog = shutil.which('kdialog')
        if kdialog:
            k_filters = '|'.join([f'{text} (*{" *".join(ext)})' for (text, ext) in filetypes])
            return run(kdialog, f'--title={title}', '--getopenfilename', '.', k_filters)
        zenity = shutil.which('zenity')
        if zenity:
            z_filters = [f'--file-filter={text} ({", ".join(ext)}) | *{" *".join(ext)}' for (text, ext) in filetypes]
            return run(zenity, f'--title={title}', '--file-selection', *z_filters)

    # fall back to tk
    try:
        import tkinter
        import tkinter.filedialog
    except Exception as e:
        logging.error('Could not load tkinter, which is likely not installed. '
                      f'This attempt was made because open_filename was used for "{title}".')
        raise e

    root = tkinter.Tk()
    root.withdraw()
    tk_filetypes = ((t[0], ' '.join(t[1])) for t in filetypes)
    return tkinter.filedialog.askopenfilename(title=title, filetypes=tk_filetypes)
 | 
						|
 | 
						|
 | 
						|
def messagebox(title: str, text: str, error: bool = False) -> None:
    """Show a message box with *title* and *text*.

    If a kivy app is running, a ``kvui.MessageBox`` is opened; otherwise a
    tkinter error or info box is shown, depending on *error*.

    :raises Exception: re-raises whatever importing tkinter raised, after
        logging an error.
    """
    def is_kivy_running():
        # kivy is only queried if something else has already imported it
        if 'kivy' not in sys.modules:
            return False
        from kivy.app import App
        return App.get_running_app() is not None

    if is_kivy_running():
        from kvui import MessageBox
        MessageBox(title, text, error).open()
        return

    # fall back to tk
    try:
        import tkinter
        from tkinter.messagebox import showerror, showinfo
    except Exception as e:
        logging.error('Could not load tkinter, which is likely not installed. '
                      f'This attempt was made because messagebox was used for "{title}".')
        raise e

    root = tkinter.Tk()
    root.withdraw()
    if error:
        showerror(title, text)
    else:
        showinfo(title, text)
    root.update()
 |