しかし、たとえば標準ライブラリの concurrent.futures.ThreadPoolExecutor はdamonスレッドを使っていません。 そのため、 executor.shutdown(wait=True) を atexit から呼び出すことができません。
atexitで終了させるスレッドはdaemonにしよう - methaneのブログ
と言っていましたが、この話題でThreadPoolExecutorを使うのはちょっとミスリードだった気がしたのと私もThreadPoolExecutorの終了処理をちゃんと把握していなかったので補足します。
まず、ThreadPoolExecutorはデフォルトで終了前にjoinされます。つまりThreadPoolExecutorにsubmitされたタスク全てが終了するのを待ってから終了します。なので自前で atexit を使って ThreadPoolExecutor.shutdown() を呼び出す必要はありませんし、やったとしても特に問題ありません。
この終了前の join はどう実装されているのか確認します。 [_Py_Finalize()] で _PyAtExit_Call() が呼ばれる前に、非daemonスレッドを待つ関数を読んでいます。
cpython/Python/pylifecycle.c at a2ba0a7552580f616f74091f8976410f8a310313 · python/cpython · GitHub
// Wrap up existing "threading"-module-created, non-daemon threads. wait_for_thread_shutdown(tstate);
この関数は threading モジュールの _shutdown 関数を読んでいます。
cpython/Python/pylifecycle.c at a2ba0a7552580f616f74091f8976410f8a310313 · python/cpython · GitHub
wait_for_thread_shutdown(PyThreadState *tstate) { PyObject *result; PyObject *threading = PyImport_GetModule(&_Py_ID(threading)); if (threading == NULL) { ... } result = PyObject_CallMethodNoArgs(threading, &_Py_ID(_shutdown));
threading._shutdown() は非daemonスレッドの終了を待つ前に、 atexit モジュールではなく、 threadingモジュールの _threading_atexits に登録された関数を実行します。 _threading_atexits に登録するための関数は _register_atexit() です。
cpython/Lib/threading.py at a2ba0a7552580f616f74091f8976410f8a310313 · python/cpython · GitHub
_threading_atexits = [] _SHUTTING_DOWN = False def _register_atexit(func, *arg, **kwargs): """CPython internal: register *func* to be called before joining threads. The registered *func* is called with its arguments just before all non-daemon threads are joined in `_shutdown()`. It provides a similar purpose to `atexit.register()`, but its functions are called prior to threading shutdown instead of interpreter shutdown. For similarity to atexit, the registered functions are called in reverse. """ if _SHUTTING_DOWN: raise RuntimeError("can't register atexit after shutdown") _threading_atexits.append(lambda: func(*arg, **kwargs)) ... def _shutdown(): """ Wait until the Python thread state of all non-daemon threads get deleted. """ # Obscure: other threads may be waiting to join _main_thread. That's # dubious, but some code does it. We can't wait for it to be marked as done # normally - that won't happen until the interpreter is nearly dead. So # mark it done here. if _main_thread._os_thread_handle.is_done() and _is_main_interpreter(): # _shutdown() was already called return global _SHUTTING_DOWN _SHUTTING_DOWN = True # Call registered threading atexit functions before threads are joined. # Order is reversed, similar to atexit. for atexit_call in reversed(_threading_atexits): atexit_call() if _is_main_interpreter(): _main_thread._os_thread_handle._set_done() # Wait for all non-daemon threads to exit. _thread_shutdown()
ThreadPoolExecutorはこの threading._register_atexit() を使って全てのスレッドプールの終了を待ちます。
def _python_exit(): global _shutdown with _global_shutdown_lock: _shutdown = True items = list(_threads_queues.items()) for t, q in items: q.put(None) for t, q in items: t.join() # Register for `_python_exit()` to be called just before joining all # non-daemon threads. This is used instead of `atexit.register()` for # compatibility with subinterpreters, which no longer support daemon threads. # See bpo-39812 for context. threading._register_atexit(_python_exit)
ということで、uwsgiの中でグローバル変数に executor = ThreadPoolExecutor() とかしていても特別なケアなしに graceful shutdown は実現できます。 uwsgi.atexit とかに頼る必要はありません。確かめてみましょう。
#wsgi.py from concurrent.futures import ThreadPoolExecutor import time executor = ThreadPoolExecutor(max_workers=16) def background(i): print(f"starting {i}") time.sleep(5) print(f"ending {i}") counter = 0 def application(environ, start_response): global counter start_response("200 OK", [("Content-type", "text/plain; charset=utf-8")]) count = counter counter += 1 executor.submit(background, count) return [b"Hello, world"]
#!/bin/bash
# .venv ディレクトリがなかったら作る
if [ ! -d .venv ]; then
uv venv
uv pip install uwsgi
fi
# uwsgiをバックグラウンドで起動
.venv/bin/uwsgi --http-socket :4321 --enable-threads --module wsgi --callable application --lazy-app --threads=4 --die-on-term --master --pidfile uwsgi.pid -d uwsgi.log
for i in {0..50}
do
curl http://127.0.0.1:4321/
echo
done
kill -TERM $(cat uwsgi.pid)
uwsgi.log を tail すると、ちゃんと50番目のタスクを実行してから終了していることがわかります。
ending 43 ending 44 ending 45 ending 46 ending 47 ending 48 ending 49 ending 50 worker 1 buried after 20 seconds goodbye to uWSGI.
めでたしめでたし。