コネクションプールなしでhttpxを使う場合の高速化

httpx をコネクションプールありで使う場合は client = httpx.Client() して client.get() などを使いますが、コネクションプールなしで使う場合は httpx.get() などを使います。

この httpx.get() のような単発でHTTPリクエストを実行するAPIは実際には内部で httpx.Client()インスタンスを生成して破棄しています。実はこの Client()インスタンス生成が遅いのです。

# x1.py
from httpx import Client
for _ in range(100):
    c = Client()
$ hyperfine '.venv/bin/python x1.py'
Benchmark 1: .venv/bin/python x1.py
  Time (mean ± σ):      1.484 s ±  0.018 s    [User: 1.421 s, System: 0.041 s]
  Range (min … max):    1.455 s …  1.522 s    10 runs

100回クライアントを作るのに1.5秒かかっています。1回あたり15msです。もうちょっとなんとかしたい。

実はこのクライアントの生成時間のほとんどは、SSLContext()を作るのに使われています。コネクションプールを使わない場合もSSLContextだけを使い回すことでhttpxを高速化できます。

from httpx import Client, create_ssl_context
ssl_context = create_ssl_context()
for _ in range(100):
    c = Client(verify=ssl_context)
$ hyperfine '.venv/bin/python x1.py' '.venv/bin/python x2.py'
Benchmark 1: .venv/bin/python x1.py
  Time (mean ± σ):      1.477 s ±  0.024 s    [User: 1.412 s, System: 0.040 s]
  Range (min … max):    1.453 s …  1.532 s    10 runs

Benchmark 2: .venv/bin/python x2.py
  Time (mean ± σ):     120.7 ms ±   5.3 ms    [User: 85.6 ms, System: 15.2 ms]
  Range (min … max):   104.2 ms … 129.6 ms    22 runs

Summary
  .venv/bin/python x2.py ran
   12.23 ± 0.58 times faster than .venv/bin/python x1.py

10倍以上高速化できました。

実際に使う場合には get() などのメソッドが verify キーワード引数を受け取るのでそこに SSLContext を渡します。

import httpx

ssl_context = httpx.create_ssl_context()
res = httpx.get('https://google.com', verify=ssl_context)
print("Status:", res.status_code)
print(res.text[:100])

ThreadPoolExecutorの終了処理

しかし、たとえば標準ライブラリの concurrent.futures.ThreadPoolExecutor はdamonスレッドを使っていません。 そのため、 executor.shutdown(wait=True) を atexit から呼び出すことができません。

atexitで終了させるスレッドはdaemonにしよう - methaneのブログ

と言っていましたが、この話題でThreadPoolExecutorを使うのはちょっとミスリードだった気がしたのと私もThreadPoolExecutorの終了処理をちゃんと把握していなかったので補足します。

まず、ThreadPoolExecutorはデフォルトで終了前にjoinされます。つまりThreadPoolExecutorにsubmitされたタスク全てが終了するのを待ってから終了します。なので自前で atexit を使って ThreadPoolExecutor.shutdown() を呼び出す必要はありませんし、やったとしても特に問題ありません。

この終了前の join はどう実装されているのか確認します。 [_Py_Finalize()] で _PyAtExit_Call() が呼ばれる前に、非daemonスレッドを待つ関数を読んでいます。

cpython/Python/pylifecycle.c at a2ba0a7552580f616f74091f8976410f8a310313 · python/cpython · GitHub

    // Wrap up existing "threading"-module-created, non-daemon threads.
    wait_for_thread_shutdown(tstate);

この関数は threading モジュールの _shutdown 関数を読んでいます。

cpython/Python/pylifecycle.c at a2ba0a7552580f616f74091f8976410f8a310313 · python/cpython · GitHub

wait_for_thread_shutdown(PyThreadState *tstate)
{
    PyObject *result;
    PyObject *threading = PyImport_GetModule(&_Py_ID(threading));
    if (threading == NULL) {
        ...
    }
    result = PyObject_CallMethodNoArgs(threading, &_Py_ID(_shutdown));

threading._shutdown() は非daemonスレッドの終了を待つ前に、 atexit モジュールではなく、 threadingモジュールの _threading_atexits に登録された関数を実行します。 _threading_atexits に登録するための関数は _register_atexit() です。

cpython/Lib/threading.py at a2ba0a7552580f616f74091f8976410f8a310313 · python/cpython · GitHub

_threading_atexits = []
_SHUTTING_DOWN = False


def _register_atexit(func, *arg, **kwargs):
    """CPython internal: register *func* to be called before joining threads.

    The registered *func* is called with its arguments just before all
    non-daemon threads are joined in `_shutdown()`. It provides a similar
    purpose to `atexit.register()`, but its functions are called prior to
    threading shutdown instead of interpreter shutdown.

    For similarity to atexit, the registered functions are called in reverse.
    """
    if _SHUTTING_DOWN:
        raise RuntimeError("can't register atexit after shutdown")

    _threading_atexits.append(lambda: func(*arg, **kwargs))

...

def _shutdown():
    """
    Wait until the Python thread state of all non-daemon threads get deleted.
    """
    # Obscure: other threads may be waiting to join _main_thread.  That's
    # dubious, but some code does it. We can't wait for it to be marked as done
    # normally - that won't happen until the interpreter is nearly dead. So
    # mark it done here.
    if _main_thread._os_thread_handle.is_done() and _is_main_interpreter():
        # _shutdown() was already called
        return

    global _SHUTTING_DOWN
    _SHUTTING_DOWN = True

    # Call registered threading atexit functions before threads are joined.
    # Order is reversed, similar to atexit.
    for atexit_call in reversed(_threading_atexits):
        atexit_call()

    if _is_main_interpreter():
        _main_thread._os_thread_handle._set_done()

    # Wait for all non-daemon threads to exit.
    _thread_shutdown()

ThreadPoolExecutorはこの threading._register_atexit() を使って全てのスレッドプールの終了を待ちます。

cpython/Lib/concurrent/futures/thread.py at a2ba0a7552580f616f74091f8976410f8a310313 · python/cpython · GitHub

def _python_exit():
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()


# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

ということで、uwsgiの中でグローバル変数executor = ThreadPoolExecutor() とかしていても特別なケアなしに graceful shutdown は実現できます。 uwsgi.atexit とかに頼る必要はありません。確かめてみましょう。

#wsgi.py
from concurrent.futures import ThreadPoolExecutor
import time

executor = ThreadPoolExecutor(max_workers=16)

def background(i):
    print(f"starting {i}")
    time.sleep(5)
    print(f"ending {i}")

counter = 0

def application(environ, start_response):
    global counter
    start_response("200 OK", [("Content-type", "text/plain; charset=utf-8")])
    count = counter
    counter += 1
    executor.submit(background, count)
    return [b"Hello, world"]
#!/bin/bash

# .venv ディレクトリがなかったら作る
if [ ! -d .venv ]; then
    uv venv
    uv pip install uwsgi
fi

# uwsgiをバックグラウンドで起動
.venv/bin/uwsgi --http-socket :4321 --enable-threads --module wsgi --callable application --lazy-app --threads=4 --die-on-term --master --pidfile uwsgi.pid -d uwsgi.log

for i in {0..50}
do
    curl http://127.0.0.1:4321/
    echo
done

kill -TERM $(cat uwsgi.pid)

uwsgi.log を tail すると、ちゃんと50番目のタスクを実行してから終了していることがわかります。

ending 43
ending 44
ending 45
ending 46
ending 47
ending 48
ending 49
ending 50
worker 1 buried after 20 seconds
goodbye to uWSGI.

めでたしめでたし。

タイプヒントには「実装の最小要件」ではなく「想定範囲」を表す型を書く

リストを受け取ってループで処理する関数を実装するとき、引数のタイプヒントに list ではなく最小の要求として Iterable を書くことを好む人がいる。コードの実装が引数に対して必要としている最小要件(必要十分条件)を表すためだ。

def func(arg: Iterable[int]) -> None:
    for a in arg:
        do_work(a)

しかし、その関数でログかトレースにその引数の中身を追加したくなった場合にどうしたらいいだろうか? OpenTelemetryのAttributeValue型Sequenceには対応しているがIterableには対応していない。 また、Iterableを一度巡回してしまうと再び巡回できる保証はないので、 arg の中身を複数回使うことができない。

引数のタイプヒントをlistSequenceに修正しようと思っても、他のコードも「最小要件原則」で書かれていると大量の呼び出し元のコードのtype hintも次々に修正しないといけなくなる。もしこの関数のユーザーがチーム外で、後方互換性を保つ必要があるのであれば、そもそもこの修正はできない。

そこで諦めてタイプヒントを修正せずに対応すると次のようになる。

def func(arg: Iterable[int]) -> None:
    # arg : list[int] = list(arg) # Mypyは再定義をエラーにする。
    arg = list(arg)
    with tracer.start_span("func") as span:
        span.set_attribute("arg", arg)
        for a in arg:
            do_work(a)

ここで3つのコストが発生した。

  • 一度修正を試みてから、影響範囲が広いからという理由で修正を断念するまでの作業コスト
  • 引数を毎回 list(arg) する実行コスト
  • argの型が途中で Iterable から list に変わることによる認知負荷。(list型に別の名前をつけても2つの変数の認知負荷になるだけである。)

もし、この関数が最初からlistを受け取る用途しか想定していないなら型ヒントには list を使うべきだったし、listかtupleのどちらかを受け取ることを想定していたならlist|tupleSequenceを使うべきだった。このように、タイプヒントには「(今の)実装が求める最小要件」ではなく「想定している引数の型の範囲」を表すべきである。

しかし、想定する引数の型を最初から完全に決めるのは難しい。試しにこの関数のtype hintがlistだったのにtupleを渡したくなった場合を考えてみよう。上の例と逆に「具体的すぎた」ケースだ。

  • タイプヒントを list | tupleSequence に修正する場合、既存の呼び出し元は list 型の値を渡しているので芋づる式に大量の呼び出し元の修正は必要ない。
  • タイプヒントを変えずに呼び出し元で list(arg) に変換する場合も、変換コストがかかるのはその1箇所だけで済む。

このように、「実装の最小要件」を使うポリシーよりも「必要になるまで具体型を使う」ポリシーの方が対応コストが低くなることが多い。どこまでの抽象度の型を受け取るべきか判断を後回しにしたい場合は、とりあえず具体型を使うことにしよう。変更の必要が生じた時は、その関数のあるべき仕様をより正しく理解してタイプヒントを書けるはずだ。

戻り値についても考えてみよう。戻り値を list から Sequence に変更するのは破壊的変更になるので、特にライブラリの公開APIのように利用側コードが別チームで開発されている場合は簡単に変更できない。ただしこれはタイプヒントだけの問題ではない。タイプヒントのないコードでも、 list を返していた関数が tuple 型を返すようになったら破壊的変更になる。だから破壊的変更を恐れてなるべく抽象度の高い型を選ぶ必要性は薄い。そもそも戻り値の型をlistから別のシーケンス型に変えるケースなんてどれくらいあるだろうか?稀に内部処理を変更して処理結果が tuple になることがあったとして、後方互換性を保つ必要があるならlistに変換して返せばいいだけだ。なので、引数よりは少し気を遣うとはいえ、最初から list と書いてしまって良い場合が多い。

結論:

  • タイプヒントには「実装の最小要件」ではなく「想定範囲」を表す型を書く。
  • 迷ったらとりあえず具体型を書いて、必要になってから抽象型に変える。

atexitで終了させるスレッドはdaemonにしよう

なにかの処理をバックグラウンドスレッドで実行して、アプリケーション終了時にその処理を止めたいことがあります。 たとえばOpenTelemetryのトレースやログを送信するためにスレッドが使われていますが、それらは終了時にバッファリングしているデータを送信してから終了します。

コマンドラインアプリケーションでは、main関数の終了時にバックグラウンドスレッドの終了を呼び出して待つことができます。

# th_main.py
import threading
import time

alive = True


def bgmain():
    while alive:
        print("hello")
        time.sleep(1)


th = threading.Thread(target=bgmain)
th.start()


def main():
    global alive
    try:
        while True:
            time.sleep(1)
    finally:
        alive = False
        th.join()


main()

実行例:

$ python3 th_main.py
hello
hello
^CTraceback (most recent call last):
  File "/Users/inada-n/work/th_main.py", line 27, in <module>
    main()
    ~~~~^^
  File "/Users/inada-n/work/th_main.py", line 21, in main
    time.sleep(1)
    ~~~~~~~~~~^^^
KeyboardInterrupt

しかし、wsgiアプリケーションのようにmain関数が無い場合があります。そのような場合に終了処理を実装するための標準ライブラリとして提供されているのがatexitモジュールです。次のようにすればバックグラウンドスレッドを止められそうに見えます。

# th_atexit.py
import threading
import time
import atexit

alive = True


def bgmain():
    while alive:
        print("hello")
        time.sleep(1)


th = threading.Thread(target=bgmain)
th.start()


@atexit.register
def on_exit():
    global alive
    print("atexit")
    alive = False
    th.join()


def main():
    while True:
        time.sleep(1)


main()

しかし、このプログラムを止めるためにはCtrl-Cを2回押す必要がありました。コマンドラインプログラムだから停止できましたが、wsgiアプリケーションだとしたら強制的に終了されるまで待ち続けることになります。

$ python3 th_atexit.py
hello
hello
hello
^CTraceback (most recent call last):
  File "/Users/inada-n/work/th_atexit.py", line 32, in <module>
    main()
    ~~~~^^
  File "/Users/inada-n/work/th_atexit.py", line 29, in main
    time.sleep(1)
    ~~~~~~~~~~^^^
KeyboardInterrupt
hello
hello
^CTraceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.13/3.13.5/Frameworks/Python.framework/Versions/3.13/lib/python3.13/threading.py", line 1542, in _shutdown
    _thread_shutdown()
KeyboardInterrupt:
atexit

ateixit が表示されているのが二回目のCtrl-Cの後、最後であることがわかります。 これはatexitが呼ばれるタイミングの問題です。

Py_Finalize(PyRuntimeState *runtime)

    // Block some operations.
    tstate->interp->finalizing = 1;


    // Wrap up existing "threading"-module-created, non-daemon threads.
    wait_for_thread_shutdown(tstate);


    // Make any remaining pending calls.
    _Py_FinishPendingCalls(tstate);


    /* The interpreter is still entirely intact at this point, and the
     * exit funcs may be relying on that.  In particular, if some thread
     * or exit func is still waiting to do an import, the import machinery
     * expects Py_IsInitialized() to return true.  So don't say the
     * runtime is uninitialized until after the exit funcs have run.
     * Note that Threading.py uses an exit func to do a join on all the
     * threads created thru it, so this also protects pending imports in
     * the threads created via Threading.
     */


    _PyAtExit_Call(tstate->interp);

atexitが呼ばれるのは非daemonスレッドが全て終了してからだということがわかります。

Pythonのドキュメントにはdaemonスレッドについてこう書かれています。

スレッドには "デーモンスレッド (daemon thread)" であるというフラグを立てられます。 このフラグには、残っているスレッドがデーモンスレッドだけになった時に Python プログラム全体を終了させるという意味があります。フラグの初期値はスレッドを生成したスレッドから継承します。フラグの値は daemon プロパティまたは daemon コンストラクタ引数を通して設定できます。

注釈 デーモンスレッドは終了時にいきなり停止されます。デーモンスレッドで使われたリソース (開いているファイル、データベースのトランザクションなど) は適切に解放されないかもしれません。きちんと (gracefully) スレッドを停止したい場合は、スレッドを非デーモンスレッドにして、Event のような適切なシグナル送信機構を使用してください。

https://docs.python.org/ja/3.12/library/threading.html#thread-objects

このドキュメントを読むと、アプリケーション終了時にきちんと終了させるスレッドはデーモンスレッドにするべきでないということになります。しかし、atexitから終了処理を実行する場合はデーモンスレッドにする必要があるので注意が必要です。

先ほどのプログラムをデーモンスレッドに修正してみましょう。

# th_atexit2.py
import threading
import time
import atexit

alive = True


def bgmain():
    while alive:
        print("hello")
        time.sleep(1)


th = threading.Thread(target=bgmain, daemon=True)  # daemonスレッドに
th.start()


@atexit.register
def on_exit():
    global alive
    print("atexit")
    alive = False
    th.join()


def main():
    while True:
        time.sleep(1)


main()
$ python3 th_atexit2.py
hello
hello
hello
^CTraceback (most recent call last):
  File "/Users/inada-n/work/th_atexit2.py", line 32, in <module>
    main()
    ~~~~^^
  File "/Users/inada-n/work/th_atexit2.py", line 29, in main
    time.sleep(1)
    ~~~~~~~~~~^^^
KeyboardInterrupt
atexit

きちんと1回のCtrl-Cでatexitが呼ばれて終了しました。

しかし、たとえば標準ライブラリの concurrent.futures.ThreadPoolExecutor はdamonスレッドを使っていません。 そのため、 executor.shutdown(wait=True) を atexit から呼び出すことができません。

この問題は現在コア開発者で話し合いが進められていますが、まだ解決のためのAPIは実装されていません。 今は非デーモンスレッドをバックグラウンドで使うライブラリにモンキーパッチを当てていくか、WSGIサーバーが提供する独自APIを使う必要があります。

例えば uwsgi には uwsgi.atexit というAPIがあり、これに登録された処理は Py_Finalize() を呼び出す前に実行されるので ThreadPoolExecutorを止めることができます。

9/2追記: ThreadPoolExecutorはatexit実行前に終了待ちされるので、 executor.shutdown() などを atexit から実行する必要はありません。問題になるのはバックグラウンドスレッドで動き続けている処理を止めるためにatexitを使う場合だけで、ThreadPoolExecutorの一般的な使い方であるそれなりの時間で終わる処理をバックグラウンドで実行するだけであれば問題ありません。次の補足記事を参照してください。

methane.hatenablog.jp

コンテナのPythonからMySQLにzstd圧縮を有効にして接続する

mysql-connector-python を使えば簡単なのですが、あえて mysqlclient を使う場合の話です。

まず、Pythonの公式DockerイメージはDebianベースになっていますが、Debianではデフォルトではapt-getでMySQLをインストールできません。 (Debian sid では MySQL が復活していますが、stableになるには時間がかかります)

代わりにmariadbのクライアントライブラリ(libmariadb)はインストールできるのですが、MariaDBはzstd圧縮に対応しておらず、そのせいかクライアントライブラリはzstd圧縮に対応しているもののDebian bookwormのパッケージには入っていないので使えません。

なので標準の apt-get だけでzstd圧縮を使う方法がありません。代わりにライブラリをインストールする必要があります。

まず MySQL なんですが、8.0からMySQL Connector/Cの配布を止めてしまっていて、MySQLを入れろというふうになっていて面倒です。 また apt レポジトリも公開してくれているのですが、 i386amd64 はあるのに arm64 がないのでそれも面倒です。

次に MariaDB ですが、次のダウンロードページを見てみると、Debian用のバイナリをarm64とamd64でバイナリパッケージを tar.gz で提供してくれています。

https://mariadb.com/downloads/connectors/

これを使ってmysqlclientを用意してみます。

FROM python:3.13

# 先にデフォルトのlibmariadbを消しておく。 python:3.13-slim の場合は省略可能
RUN apt-get -y purge libmariadb3 libmariadb-dev

# MariaDB Connector/C をインストールする
WORKDIR /usr/local
ADD https://dlm.mariadb.com/4234446/Connectors/c/connector-c-3.4.5/mariadb-connector-c-3.4.5-debian-bookworm-aarch64.tar.gz /usr/local/mariadb-connector.tar.gz
# AMD64の場合はこっち
# ADD https://dlm.mariadb.com/4234464/Connectors/c/connector-c-3.4.5/mariadb-connector-c-3.4.5-debian-bookworm-amd64.tar.gz /usr/local/mariadb-connector.tar.gz

RUN tar xf mariadb-connector.tar.gz --strip-components=1 && rm mariadb-connector.tar.gz

# /usr/local/lib/mariadb の下のライブラリはデフォルトで見つけてくれないので、 libmariadb.so(.3) へのsymlink を ld.so.conf で指定されているディレクトリに作る
# または下にある方法で rpath を設定しても良い
WORKDIR /usr/local/lib
RUN ln -s mariadb/libmariadb.so* . && ldconfig

# mysqlclient をインストール
WORKDIR /home
RUN pip install -v mysqlclient

# rpath を設定する場合
#RUN MYSQLCLIENT_CFLAGS=$(/usr/local/bin/mariadb_config --cflags) \
#    MYSQLCLIENT_LDFLAGS="$(/usr/local/bin/mariadb_config --libs) -Wl,-rpath=/usr/local/lib/mariadb" \
#    pip install -v mysqlclient

これで zstd 接続ができているかを確認しましょう。

$ docker run --network host --name mysql -e MYSQL_ROOT_PASSWORD=my-secret-pw -d mysql:8.0

$ docker build -t hoge .
$ docker run --network host -it hoge bash

root@colima:/home# ldd /usr/local/lib/python3.13/site-packages/MySQLdb/_mysql.cpython-313-aarch64-linux-gnu.so
        linux-vdso.so.1 (0x0000fc6873027000)
        libmariadb.so.3 => /usr/local/lib/libmariadb.so.3 (0x0000fc6872f40000)
        libc.so.6 => /lib/aarch64-linux-gnu/libc.so.6 (0x0000fc6872d90000)
        libssl.so.3 => /lib/aarch64-linux-gnu/libssl.so.3 (0x0000fc6872cd0000)
        libcrypto.so.3 => /lib/aarch64-linux-gnu/libcrypto.so.3 (0x0000fc6872800000)
        /lib/ld-linux-aarch64.so.1 (0x0000fc6872fea000)

root@colima:/home# python
Python 3.13.3 (main, Apr  9 2025, 02:20:29) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import MySQLdb
>>> con = MySQLdb.connect(host="127.0.0.1", user="root", password="my-secret-pw", compress=True)
>>> cur = con.cursor()
>>> cur.execute('SHOW STATUS LIKE "Compress%"')
3
>>> for v in cur:
...     print(v)
...
('Compression', 'ON')
('Compression_algorithm', 'zstd')
('Compression_level', '3')
>>>
このブログに乗せているコードは引用を除き CC0 1.0 で提供します。