WebHost: fix gen timeout/exception resource handling (#5540)

* WebHost: reset Generator proc title on error

* WebHost: fix shutting down autogen

This is still not perfect but solves some of the issues.

* WebHost: properly propagate JOB_TIME

* WebHost: handle autogen shutdown
This commit is contained in:
black-sliver
2025-10-20 07:16:29 +00:00
committed by GitHub
parent 11d18db452
commit 914a534a3b
4 changed files with 93 additions and 17 deletions

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import asyncio
import concurrent.futures
import json
import typing
import builtins
@@ -1138,3 +1139,40 @@ def is_iterable_except_str(obj: object) -> TypeGuard[typing.Iterable[typing.Any]
if isinstance(obj, str):
return False
return isinstance(obj, typing.Iterable)
class DaemonThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
"""
ThreadPoolExecutor that uses daemonic threads that do not keep the program alive.
NOTE: use this with caution because killed threads will not properly clean up.
"""
def _adjust_thread_count(self):
# see upstream ThreadPoolExecutor for details
import threading
import weakref
from concurrent.futures.thread import _worker
if self._idle_semaphore.acquire(timeout=0):
return
def weakref_cb(_, q=self._work_queue):
q.put(None)
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = f"{self._thread_name_prefix or self}_{num_threads}"
t = threading.Thread(
name=thread_name,
target=_worker,
args=(
weakref.ref(self, weakref_cb),
self._work_queue,
self._initializer,
self._initargs,
),
daemon=True,
)
t.start()
self._threads.add(t)
# NOTE: don't add to _threads_queues so we don't block on shutdown

View File

@@ -36,25 +36,39 @@ def handle_generation_failure(result: BaseException):
logging.exception(e)
def _mp_gen_game(gen_options: dict, meta: dict[str, Any] | None = None, owner=None, sid=None) -> PrimaryKey | None:
def _mp_gen_game(
gen_options: dict,
meta: dict[str, Any] | None = None,
owner=None,
sid=None,
timeout: int|None = None,
) -> PrimaryKey | None:
from setproctitle import setproctitle
setproctitle(f"Generator ({sid})")
res = gen_game(gen_options, meta=meta, owner=owner, sid=sid)
try:
return gen_game(gen_options, meta=meta, owner=owner, sid=sid, timeout=timeout)
finally:
setproctitle(f"Generator (idle)")
return res
def launch_generator(pool: multiprocessing.pool.Pool, generation: Generation):
def launch_generator(pool: multiprocessing.pool.Pool, generation: Generation, timeout: int|None) -> None:
try:
meta = json.loads(generation.meta)
options = restricted_loads(generation.options)
logging.info(f"Generating {generation.id} for {len(options)} players")
pool.apply_async(_mp_gen_game, (options,),
{"meta": meta,
pool.apply_async(
_mp_gen_game,
(options,),
{
"meta": meta,
"sid": generation.id,
"owner": generation.owner},
handle_generation_success, handle_generation_failure)
"owner": generation.owner,
"timeout": timeout,
},
handle_generation_success,
handle_generation_failure,
)
except Exception as e:
generation.state = STATE_ERROR
commit()
@@ -135,6 +149,7 @@ def autogen(config: dict):
with multiprocessing.Pool(config["GENERATORS"], initializer=init_generator,
initargs=(config,), maxtasksperchild=10) as generator_pool:
job_time = config["JOB_TIME"]
with db_session:
to_start = select(generation for generation in Generation if generation.state == STATE_STARTED)
@@ -145,7 +160,7 @@ def autogen(config: dict):
if sid:
generation.delete()
else:
launch_generator(generator_pool, generation)
launch_generator(generator_pool, generation, timeout=job_time)
commit()
select(generation for generation in Generation if generation.state == STATE_ERROR).delete()
@@ -157,7 +172,7 @@ def autogen(config: dict):
generation for generation in Generation
if generation.state == STATE_QUEUED).for_update()
for generation in to_start:
launch_generator(generator_pool, generation)
launch_generator(generator_pool, generation, timeout=job_time)
except AlreadyRunningException:
logging.info("Autogen reports as already running, not starting another.")

View File

@@ -14,7 +14,7 @@ from pony.orm import commit, db_session
from BaseClasses import get_seed, seeddigits
from Generate import PlandoOptions, handle_name, mystery_argparse
from Main import main as ERmain
from Utils import __version__, restricted_dumps
from Utils import __version__, restricted_dumps, DaemonThreadPoolExecutor
from WebHostLib import app
from settings import ServerOptions, GeneratorOptions
from .check import get_yaml_data, roll_options
@@ -107,7 +107,7 @@ def start_generation(options: dict[str, dict | str], meta: dict[str, Any]):
else:
try:
seed_id = gen_game({name: vars(options) for name, options in gen_options.items()},
meta=meta, owner=session["_id"].int)
meta=meta, owner=session["_id"].int, timeout=app.config["JOB_TIME"])
except BaseException as e:
from .autolauncher import handle_generation_failure
handle_generation_failure(e)
@@ -118,7 +118,7 @@ def start_generation(options: dict[str, dict | str], meta: dict[str, Any]):
return redirect(url_for("view_seed", seed=seed_id))
def gen_game(gen_options: dict, meta: dict[str, Any] | None = None, owner=None, sid=None):
def gen_game(gen_options: dict, meta: dict[str, Any] | None = None, owner=None, sid=None, timeout: int|None = None):
if meta is None:
meta = {}
@@ -172,11 +172,12 @@ def gen_game(gen_options: dict, meta: dict[str, Any] | None = None, owner=None,
ERmain(args, seed, baked_server_options=meta["server_options"])
return upload_to_db(target.name, sid, owner, race)
thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
thread_pool = DaemonThreadPoolExecutor(max_workers=1)
thread = thread_pool.submit(task)
try:
return thread.result(app.config["JOB_TIME"])
return thread.result(timeout)
except concurrent.futures.TimeoutError as e:
if sid:
with db_session:
@@ -189,6 +190,9 @@ def gen_game(gen_options: dict, meta: dict[str, Any] | None = None, owner=None,
format_exception(e))
gen.meta = json.dumps(meta)
commit()
except (KeyboardInterrupt, SystemExit):
# don't update db, retry next time
raise
except BaseException as e:
if sid:
with db_session:
@@ -200,6 +204,11 @@ def gen_game(gen_options: dict, meta: dict[str, Any] | None = None, owner=None,
gen.meta = json.dumps(meta)
commit()
raise
finally:
# free resources claimed by thread pool, if possible
# NOTE: Timeout depends on the process being killed at some point
# since we can't actually cancel a running gen at the moment.
thread_pool.shutdown(wait=False, cancel_futures=True)
@app.route('/wait/<suuid:seed>')

View File

@@ -0,0 +1,14 @@
import unittest
from Utils import DaemonThreadPoolExecutor
class DaemonThreadPoolExecutorTest(unittest.TestCase):
def test_is_daemon(self) -> None:
def run() -> None:
pass
with DaemonThreadPoolExecutor(1) as executor:
executor.submit(run)
self.assertTrue(next(iter(executor._threads)).daemon)