From 914a534a3b11cf2ddb0a25a8023b6207299b34bf Mon Sep 17 00:00:00 2001 From: black-sliver <59490463+black-sliver@users.noreply.github.com> Date: Mon, 20 Oct 2025 07:16:29 +0000 Subject: [PATCH] WebHost: fix gen timeout/exception resource handling (#5540) * WebHost: reset Generator proc title on error * WebHost: fix shutting down autogen This is still not perfect but solves some of the issues. * WebHost: properly propagate JOB_TIME * WebHost: handle autogen shutdown --- Utils.py | 38 ++++++++++++++++++++++++++ WebHostLib/autolauncher.py | 39 ++++++++++++++++++--------- WebHostLib/generate.py | 19 +++++++++---- test/utils/test_daemon_thread_pool.py | 14 ++++++++++ 4 files changed, 93 insertions(+), 17 deletions(-) create mode 100644 test/utils/test_daemon_thread_pool.py diff --git a/Utils.py b/Utils.py index 4fe9c1b4..e79e5418 100644 --- a/Utils.py +++ b/Utils.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import concurrent.futures import json import typing import builtins @@ -1138,3 +1139,40 @@ def is_iterable_except_str(obj: object) -> TypeGuard[typing.Iterable[typing.Any] if isinstance(obj, str): return False return isinstance(obj, typing.Iterable) + + +class DaemonThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor): + """ + ThreadPoolExecutor that uses daemonic threads that do not keep the program alive. + NOTE: use this with caution because killed threads will not properly clean up. + """ + + def _adjust_thread_count(self): + # see upstream ThreadPoolExecutor for details + import threading + import weakref + from concurrent.futures.thread import _worker + + if self._idle_semaphore.acquire(timeout=0): + return + + def weakref_cb(_, q=self._work_queue): + q.put(None) + + num_threads = len(self._threads) + if num_threads < self._max_workers: + thread_name = f"{self._thread_name_prefix or self}_{num_threads}" + t = threading.Thread( + name=thread_name, + target=_worker, + args=( + weakref.ref(self, weakref_cb), + self._work_queue, + self._initializer, + self._initargs, + ), + daemon=True, + ) + t.start() + self._threads.add(t) + # NOTE: don't add to _threads_queues so we don't block on shutdown diff --git a/WebHostLib/autolauncher.py b/WebHostLib/autolauncher.py index 719963e3..fb1f91ae 100644 --- a/WebHostLib/autolauncher.py +++ b/WebHostLib/autolauncher.py @@ -36,25 +36,39 @@ def handle_generation_failure(result: BaseException): logging.exception(e) -def _mp_gen_game(gen_options: dict, meta: dict[str, Any] | None = None, owner=None, sid=None) -> PrimaryKey | None: +def _mp_gen_game( + gen_options: dict, + meta: dict[str, Any] | None = None, + owner=None, + sid=None, + timeout: int|None = None, +) -> PrimaryKey | None: from setproctitle import setproctitle setproctitle(f"Generator ({sid})") - res = gen_game(gen_options, meta=meta, owner=owner, sid=sid) - setproctitle(f"Generator (idle)") - return res + try: + return gen_game(gen_options, meta=meta, owner=owner, sid=sid, timeout=timeout) + finally: + setproctitle(f"Generator (idle)") -def launch_generator(pool: multiprocessing.pool.Pool, generation: Generation): +def launch_generator(pool: multiprocessing.pool.Pool, generation: Generation, timeout: int|None) -> None: try: meta = json.loads(generation.meta) options = restricted_loads(generation.options) logging.info(f"Generating {generation.id} for {len(options)} players") - pool.apply_async(_mp_gen_game, (options,), - {"meta": meta, - "sid": generation.id, - "owner": generation.owner}, - handle_generation_success, handle_generation_failure) + pool.apply_async( + 
_mp_gen_game, + (options,), + { + "meta": meta, + "sid": generation.id, + "owner": generation.owner, + "timeout": timeout, + }, + handle_generation_success, + handle_generation_failure, + ) except Exception as e: generation.state = STATE_ERROR commit() @@ -135,6 +149,7 @@ def autogen(config: dict): with multiprocessing.Pool(config["GENERATORS"], initializer=init_generator, initargs=(config,), maxtasksperchild=10) as generator_pool: + job_time = config["JOB_TIME"] with db_session: to_start = select(generation for generation in Generation if generation.state == STATE_STARTED) @@ -145,7 +160,7 @@ def autogen(config: dict): if sid: generation.delete() else: - launch_generator(generator_pool, generation) + launch_generator(generator_pool, generation, timeout=job_time) commit() select(generation for generation in Generation if generation.state == STATE_ERROR).delete() @@ -157,7 +172,7 @@ def autogen(config: dict): generation for generation in Generation if generation.state == STATE_QUEUED).for_update() for generation in to_start: - launch_generator(generator_pool, generation) + launch_generator(generator_pool, generation, timeout=job_time) except AlreadyRunningException: logging.info("Autogen reports as already running, not starting another.") diff --git a/WebHostLib/generate.py b/WebHostLib/generate.py index 1bde8f78..cb61d144 100644 --- a/WebHostLib/generate.py +++ b/WebHostLib/generate.py @@ -14,7 +14,7 @@ from pony.orm import commit, db_session from BaseClasses import get_seed, seeddigits from Generate import PlandoOptions, handle_name, mystery_argparse from Main import main as ERmain -from Utils import __version__, restricted_dumps +from Utils import __version__, restricted_dumps, DaemonThreadPoolExecutor from WebHostLib import app from settings import ServerOptions, GeneratorOptions from .check import get_yaml_data, roll_options @@ -107,7 +107,7 @@ def start_generation(options: dict[str, dict | str], meta: dict[str, Any]): else: try: seed_id = gen_game({name: vars(options) for name, options in gen_options.items()}, - meta=meta, owner=session["_id"].int) + meta=meta, owner=session["_id"].int, timeout=app.config["JOB_TIME"]) except BaseException as e: from .autolauncher import handle_generation_failure handle_generation_failure(e) @@ -118,7 +118,7 @@ def start_generation(options: dict[str, dict | str], meta: dict[str, Any]): return redirect(url_for("view_seed", seed=seed_id)) -def gen_game(gen_options: dict, meta: dict[str, Any] | None = None, owner=None, sid=None): +def gen_game(gen_options: dict, meta: dict[str, Any] | None = None, owner=None, sid=None, timeout: int|None = None): if meta is None: meta = {} @@ -172,11 +172,12 @@ def gen_game(gen_options: dict, meta: dict[str, Any] | None = None, owner=None, ERmain(args, seed, baked_server_options=meta["server_options"]) return upload_to_db(target.name, sid, owner, race) - thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + + thread_pool = DaemonThreadPoolExecutor(max_workers=1) thread = thread_pool.submit(task) try: - return thread.result(app.config["JOB_TIME"]) + return thread.result(timeout) except concurrent.futures.TimeoutError as e: if sid: with db_session: @@ -189,6 +190,9 @@ def gen_game(gen_options: dict, meta: dict[str, Any] | None = None, owner=None, format_exception(e)) gen.meta = json.dumps(meta) commit() + except (KeyboardInterrupt, SystemExit): + # don't update db, retry next time + raise except BaseException as e: if sid: with db_session: @@ -200,6 +204,11 @@ def gen_game(gen_options: dict, meta: dict[str, 
Any] | None = None, owner=None, gen.meta = json.dumps(meta) commit() raise + finally: + # free resources claimed by thread pool, if possible + # NOTE: Timeout depends on the process being killed at some point + # since we can't actually cancel a running gen at the moment. + thread_pool.shutdown(wait=False, cancel_futures=True) @app.route('/wait/') diff --git a/test/utils/test_daemon_thread_pool.py b/test/utils/test_daemon_thread_pool.py new file mode 100644 index 00000000..b8702492 --- /dev/null +++ b/test/utils/test_daemon_thread_pool.py @@ -0,0 +1,14 @@ +import unittest + +from Utils import DaemonThreadPoolExecutor + + +class DaemonThreadPoolExecutorTest(unittest.TestCase): + def test_is_daemon(self) -> None: + def run() -> None: + pass + + with DaemonThreadPoolExecutor(1) as executor: + executor.submit(run) + + self.assertTrue(next(iter(executor._threads)).daemon)
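
Taken together, the Utils.py and generate.py hunks implement one pattern: run the generation in a single daemon worker, bound the wait with Future.result(timeout), and release the executor in a finally block so a timed-out job cannot hold resources or block shutdown. Below is a minimal, self-contained sketch of that pattern; slow_job and the literal timeout value are illustrative stand-ins rather than WebHost code, and the real gen_game additionally records the timeout or exception in the Generation row before returning.

    # Sketch of the timeout pattern used in gen_game (illustrative only;
    # slow_job and the 5-second timeout are made up for this example).
    import concurrent.futures
    import time

    from Utils import DaemonThreadPoolExecutor


    def slow_job() -> str:
        time.sleep(60)  # stands in for a long-running generation
        return "done"


    def run_with_timeout(timeout: float | None = 5.0) -> str | None:
        pool = DaemonThreadPoolExecutor(max_workers=1)
        future = pool.submit(slow_job)
        try:
            return future.result(timeout)
        except concurrent.futures.TimeoutError:
            # The worker keeps running, but as a daemon thread it will not
            # keep the process alive once the process is told to exit.
            return None
        finally:
            # Free the executor's bookkeeping; a stuck worker itself can only
            # be reclaimed when the pool process is recycled.
            pool.shutdown(wait=False, cancel_futures=True)

Because the worker thread is daemonic and is never added to concurrent.futures' _threads_queues, a job that outlives its timeout no longer blocks interpreter shutdown; as the NOTE in the generate.py hunk says, its resources are only fully reclaimed once the pool process is killed or recycled (maxtasksperchild=10 in autogen). The new test_daemon_thread_pool.py test asserts exactly the daemon property this relies on.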
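
On the autolauncher side, the reformatted launch_generator call still relies on the positional callback arguments of multiprocessing.pool.Pool.apply_async: the fourth argument is the result callback and the fifth the error callback, which is how handle_generation_success and handle_generation_failure are wired up while the new timeout is forwarded through the keyword dict. A hedged sketch of that shape, with made-up worker and callback names in place of the WebHost functions:

    # Illustrative only: worker, on_done and on_error are not WebHost names.
    import multiprocessing


    def worker(x: int, factor: int = 2) -> int:
        return x * factor


    def on_done(result: int) -> None:
        print("result:", result)


    def on_error(exc: BaseException) -> None:
        print("failed:", exc)


    if __name__ == "__main__":
        with multiprocessing.Pool(processes=1) as pool:
            # args, kwds, callback, error_callback - same order as in launch_generator
            pool.apply_async(worker, (21,), {"factor": 2}, on_done, on_error)
            pool.close()
            pool.join()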