ThreadPoolExecutorの終了処理

しかし、たとえば標準ライブラリの concurrent.futures.ThreadPoolExecutor はdamonスレッドを使っていません。 そのため、 executor.shutdown(wait=True) を atexit から呼び出すことができません。

atexitで終了させるスレッドはdaemonにしよう - methaneのブログ

と言っていましたが、この話題でThreadPoolExecutorを使うのはちょっとミスリードだった気がしたのと私もThreadPoolExecutorの終了処理をちゃんと把握していなかったので補足します。

まず、ThreadPoolExecutorはデフォルトで終了前にjoinされます。つまりThreadPoolExecutorにsubmitされたタスク全てが終了するのを待ってから終了します。なので自前で atexit を使って ThreadPoolExecutor.shutdown() を呼び出す必要はありませんし、やったとしても特に問題ありません。

この終了前の join はどう実装されているのか確認します。 [_Py_Finalize()] で _PyAtExit_Call() が呼ばれる前に、非daemonスレッドを待つ関数を読んでいます。

cpython/Python/pylifecycle.c at a2ba0a7552580f616f74091f8976410f8a310313 · python/cpython · GitHub

    // Wrap up existing "threading"-module-created, non-daemon threads.
    wait_for_thread_shutdown(tstate);

この関数は threading モジュールの _shutdown 関数を読んでいます。

cpython/Python/pylifecycle.c at a2ba0a7552580f616f74091f8976410f8a310313 · python/cpython · GitHub

wait_for_thread_shutdown(PyThreadState *tstate)
{
    PyObject *result;
    PyObject *threading = PyImport_GetModule(&_Py_ID(threading));
    if (threading == NULL) {
        ...
    }
    result = PyObject_CallMethodNoArgs(threading, &_Py_ID(_shutdown));

threading._shutdown() は非daemonスレッドの終了を待つ前に、 atexit モジュールではなく、 threadingモジュールの _threading_atexits に登録された関数を実行します。 _threading_atexits に登録するための関数は _register_atexit() です。

cpython/Lib/threading.py at a2ba0a7552580f616f74091f8976410f8a310313 · python/cpython · GitHub

_threading_atexits = []
_SHUTTING_DOWN = False


def _register_atexit(func, *arg, **kwargs):
    """CPython internal: register *func* to be called before joining threads.

    The registered *func* is called with its arguments just before all
    non-daemon threads are joined in `_shutdown()`. It provides a similar
    purpose to `atexit.register()`, but its functions are called prior to
    threading shutdown instead of interpreter shutdown.

    For similarity to atexit, the registered functions are called in reverse.
    """
    if _SHUTTING_DOWN:
        raise RuntimeError("can't register atexit after shutdown")

    _threading_atexits.append(lambda: func(*arg, **kwargs))

...

def _shutdown():
    """
    Wait until the Python thread state of all non-daemon threads get deleted.
    """
    # Obscure: other threads may be waiting to join _main_thread.  That's
    # dubious, but some code does it. We can't wait for it to be marked as done
    # normally - that won't happen until the interpreter is nearly dead. So
    # mark it done here.
    if _main_thread._os_thread_handle.is_done() and _is_main_interpreter():
        # _shutdown() was already called
        return

    global _SHUTTING_DOWN
    _SHUTTING_DOWN = True

    # Call registered threading atexit functions before threads are joined.
    # Order is reversed, similar to atexit.
    for atexit_call in reversed(_threading_atexits):
        atexit_call()

    if _is_main_interpreter():
        _main_thread._os_thread_handle._set_done()

    # Wait for all non-daemon threads to exit.
    _thread_shutdown()

ThreadPoolExecutorはこの threading._register_atexit() を使って全てのスレッドプールの終了を待ちます。

cpython/Lib/concurrent/futures/thread.py at a2ba0a7552580f616f74091f8976410f8a310313 · python/cpython · GitHub

def _python_exit():
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()


# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

ということで、uwsgiの中でグローバル変数executor = ThreadPoolExecutor() とかしていても特別なケアなしに graceful shutdown は実現できます。 uwsgi.atexit とかに頼る必要はありません。確かめてみましょう。

#wsgi.py
from concurrent.futures import ThreadPoolExecutor
import time

executor = ThreadPoolExecutor(max_workers=16)

def background(i):
    print(f"starting {i}")
    time.sleep(5)
    print(f"ending {i}")

counter = 0

def application(environ, start_response):
    global counter
    start_response("200 OK", [("Content-type", "text/plain; charset=utf-8")])
    count = counter
    counter += 1
    executor.submit(background, count)
    return [b"Hello, world"]
#!/bin/bash

# .venv ディレクトリがなかったら作る
if [ ! -d .venv ]; then
    uv venv
    uv pip install uwsgi
fi

# uwsgiをバックグラウンドで起動
.venv/bin/uwsgi --http-socket :4321 --enable-threads --module wsgi --callable application --lazy-app --threads=4 --die-on-term --master --pidfile uwsgi.pid -d uwsgi.log

for i in {0..50}
do
    curl http://127.0.0.1:4321/
    echo
done

kill -TERM $(cat uwsgi.pid)

uwsgi.log を tail すると、ちゃんと50番目のタスクを実行してから終了していることがわかります。

ending 43
ending 44
ending 45
ending 46
ending 47
ending 48
ending 49
ending 50
worker 1 buried after 20 seconds
goodbye to uWSGI.

めでたしめでたし。

このブログに乗せているコードは引用を除き CC0 1.0 で提供します。