タイプヒントには「実装の最小要件」ではなく「想定範囲」を表す型を書く
リストを受け取ってループで処理する関数を実装するとき、引数のタイプヒントに list ではなく最小の要求として Iterable を書くことを好む人がいる。コードの実装が引数に対して必要としている最小要件(必要十分条件)を表すためだ。
def func(arg: Iterable[int]) -> None: for a in arg: do_work(a)
しかし、その関数でログかトレースにその引数の中身を追加したくなった場合にどうしたらいいだろうか?
OpenTelemetryのAttributeValue型はSequenceには対応しているがIterableには対応していない。
また、Iterableを一度巡回してしまうと再び巡回できる保証はないので、 arg の中身を複数回使うことができない。
引数のタイプヒントをlistかSequenceに修正しようと思っても、他のコードも「最小要件原則」で書かれていると大量の呼び出し元のコードのtype hintも次々に修正しないといけなくなる。もしこの関数のユーザーがチーム外で、後方互換性を保つ必要があるのであれば、そもそもこの修正はできない。
そこで諦めてタイプヒントを修正せずに対応すると次のようになる。
def func(arg: Iterable[int]) -> None: # arg : list[int] = list(arg) # Mypyは再定義をエラーにする。 arg = list(arg) with tracer.start_span("func") as span: span.set_attribute("arg", arg) for a in arg: do_work(a)
ここで3つのコストが発生した。
- 一度修正を試みてから、影響範囲が広いからという理由で修正を断念するまでの作業コスト
- 引数を毎回
list(arg)する実行コスト - argの型が途中で
Iterableからlistに変わることによる認知負荷。(list型に別の名前をつけても2つの変数の認知負荷になるだけである。)
もし、この関数が最初からlistを受け取る用途しか想定していないなら型ヒントには list を使うべきだったし、listかtupleのどちらかを受け取ることを想定していたならlist|tupleかSequenceを使うべきだった。このように、タイプヒントには「(今の)実装が求める最小要件」ではなく「想定している引数の型の範囲」を表すべきである。
しかし、想定する引数の型を最初から完全に決めるのは難しい。試しにこの関数のtype hintがlistだったのにtupleを渡したくなった場合を考えてみよう。上の例と逆に「具体的すぎた」ケースだ。
- タイプヒントを
list | tupleかSequenceに修正する場合、既存の呼び出し元はlist型の値を渡しているので芋づる式に大量の呼び出し元の修正は必要ない。 - タイプヒントを変えずに呼び出し元で
list(arg)に変換する場合も、変換コストがかかるのはその1箇所だけで済む。
このように、「実装の最小要件」を使うポリシーよりも「必要になるまで具体型を使う」ポリシーの方が対応コストが低くなることが多い。どこまでの抽象度の型を受け取るべきか判断を後回しにしたい場合は、とりあえず具体型を使うことにしよう。変更の必要が生じた時は、その関数のあるべき仕様をより正しく理解してタイプヒントを書けるはずだ。
戻り値についても考えてみよう。戻り値を list から Sequence に変更するのは破壊的変更になるので、特にライブラリの公開APIのように利用側コードが別チームで開発されている場合は簡単に変更できない。ただしこれはタイプヒントだけの問題ではない。タイプヒントのないコードでも、 list を返していた関数が tuple 型を返すようになったら破壊的変更になる。だから破壊的変更を恐れてなるべく抽象度の高い型を選ぶ必要性は薄い。そもそも戻り値の型をlistから別のシーケンス型に変えるケースなんてどれくらいあるだろうか?稀に内部処理を変更して処理結果が tuple になることがあったとして、後方互換性を保つ必要があるならlistに変換して返せばいいだけだ。なので、引数よりは少し気を遣うとはいえ、最初から list と書いてしまって良い場合が多い。
結論:
- タイプヒントには「実装の最小要件」ではなく「想定範囲」を表す型を書く。
- 迷ったらとりあえず具体型を書いて、必要になってから抽象型に変える。
atexitで終了させるスレッドはdaemonにしよう
なにかの処理をバックグラウンドスレッドで実行して、アプリケーション終了時にその処理を止めたいことがあります。 たとえばOpenTelemetryのトレースやログを送信するためにスレッドが使われていますが、それらは終了時にバッファリングしているデータを送信してから終了します。
コマンドラインアプリケーションでは、main関数の終了時にバックグラウンドスレッドの終了を呼び出して待つことができます。
# th_main.py import threading import time alive = True def bgmain(): while alive: print("hello") time.sleep(1) th = threading.Thread(target=bgmain) th.start() def main(): global alive try: while True: time.sleep(1) finally: alive = False th.join() main()
実行例:
$ python3 th_main.py
hello
hello
^CTraceback (most recent call last):
File "/Users/inada-n/work/th_main.py", line 27, in <module>
main()
~~~~^^
File "/Users/inada-n/work/th_main.py", line 21, in main
time.sleep(1)
~~~~~~~~~~^^^
KeyboardInterrupt
しかし、wsgiアプリケーションのようにmain関数が無い場合があります。そのような場合に終了処理を実装するための標準ライブラリとして提供されているのがatexitモジュールです。次のようにすればバックグラウンドスレッドを止められそうに見えます。
# th_atexit.py import threading import time import atexit alive = True def bgmain(): while alive: print("hello") time.sleep(1) th = threading.Thread(target=bgmain) th.start() @atexit.register def on_exit(): global alive print("atexit") alive = False th.join() def main(): while True: time.sleep(1) main()
しかし、このプログラムを止めるためにはCtrl-Cを2回押す必要がありました。コマンドラインプログラムだから停止できましたが、wsgiアプリケーションだとしたら強制的に終了されるまで待ち続けることになります。
$ python3 th_atexit.py
hello
hello
hello
^CTraceback (most recent call last):
File "/Users/inada-n/work/th_atexit.py", line 32, in <module>
main()
~~~~^^
File "/Users/inada-n/work/th_atexit.py", line 29, in main
time.sleep(1)
~~~~~~~~~~^^^
KeyboardInterrupt
hello
hello
^CTraceback (most recent call last):
File "/opt/homebrew/Cellar/python@3.13/3.13.5/Frameworks/Python.framework/Versions/3.13/lib/python3.13/threading.py", line 1542, in _shutdown
_thread_shutdown()
KeyboardInterrupt:
atexit
ateixit が表示されているのが二回目のCtrl-Cの後、最後であることがわかります。 これはatexitが呼ばれるタイミングの問題です。
Py_Finalize(PyRuntimeState *runtime)
// Block some operations. tstate->interp->finalizing = 1; // Wrap up existing "threading"-module-created, non-daemon threads. wait_for_thread_shutdown(tstate); // Make any remaining pending calls. _Py_FinishPendingCalls(tstate); /* The interpreter is still entirely intact at this point, and the * exit funcs may be relying on that. In particular, if some thread * or exit func is still waiting to do an import, the import machinery * expects Py_IsInitialized() to return true. So don't say the * runtime is uninitialized until after the exit funcs have run. * Note that Threading.py uses an exit func to do a join on all the * threads created thru it, so this also protects pending imports in * the threads created via Threading. */ _PyAtExit_Call(tstate->interp);
atexitが呼ばれるのは非daemonスレッドが全て終了してからだということがわかります。
Pythonのドキュメントにはdaemonスレッドについてこう書かれています。
スレッドには "デーモンスレッド (daemon thread)" であるというフラグを立てられます。 このフラグには、残っているスレッドがデーモンスレッドだけになった時に Python プログラム全体を終了させるという意味があります。フラグの初期値はスレッドを生成したスレッドから継承します。フラグの値は daemon プロパティまたは daemon コンストラクタ引数を通して設定できます。
注釈 デーモンスレッドは終了時にいきなり停止されます。デーモンスレッドで使われたリソース (開いているファイル、データベースのトランザクションなど) は適切に解放されないかもしれません。きちんと (gracefully) スレッドを停止したい場合は、スレッドを非デーモンスレッドにして、Event のような適切なシグナル送信機構を使用してください。
https://docs.python.org/ja/3.12/library/threading.html#thread-objects
このドキュメントを読むと、アプリケーション終了時にきちんと終了させるスレッドはデーモンスレッドにするべきでないということになります。しかし、atexitから終了処理を実行する場合はデーモンスレッドにする必要があるので注意が必要です。
先ほどのプログラムをデーモンスレッドに修正してみましょう。
# th_atexit2.py import threading import time import atexit alive = True def bgmain(): while alive: print("hello") time.sleep(1) th = threading.Thread(target=bgmain, daemon=True) # daemonスレッドに th.start() @atexit.register def on_exit(): global alive print("atexit") alive = False th.join() def main(): while True: time.sleep(1) main()
$ python3 th_atexit2.py
hello
hello
hello
^CTraceback (most recent call last):
File "/Users/inada-n/work/th_atexit2.py", line 32, in <module>
main()
~~~~^^
File "/Users/inada-n/work/th_atexit2.py", line 29, in main
time.sleep(1)
~~~~~~~~~~^^^
KeyboardInterrupt
atexit
きちんと1回のCtrl-Cでatexitが呼ばれて終了しました。
しかし、たとえば標準ライブラリの concurrent.futures.ThreadPoolExecutor はdamonスレッドを使っていません。
そのため、 executor.shutdown(wait=True) を atexit から呼び出すことができません。
この問題は現在コア開発者で話し合いが進められていますが、まだ解決のためのAPIは実装されていません。 今は非デーモンスレッドをバックグラウンドで使うライブラリにモンキーパッチを当てていくか、WSGIサーバーが提供する独自APIを使う必要があります。
例えば uwsgi には uwsgi.atexit というAPIがあり、これに登録された処理は Py_Finalize() を呼び出す前に実行されるので ThreadPoolExecutorを止めることができます。
9/2追記: ThreadPoolExecutorはatexit実行前に終了待ちされるので、 executor.shutdown() などを atexit から実行する必要はありません。問題になるのはバックグラウンドスレッドで動き続けている処理を止めるためにatexitを使う場合だけで、ThreadPoolExecutorの一般的な使い方であるそれなりの時間で終わる処理をバックグラウンドで実行するだけであれば問題ありません。次の補足記事を参照してください。
コンテナのPythonからMySQLにzstd圧縮を有効にして接続する
mysql-connector-python を使えば簡単なのですが、あえて mysqlclient を使う場合の話です。
まず、Pythonの公式DockerイメージはDebianベースになっていますが、Debianではデフォルトではapt-getでMySQLをインストールできません。 (Debian sid では MySQL が復活していますが、stableになるには時間がかかります)
代わりにmariadbのクライアントライブラリ(libmariadb)はインストールできるのですが、MariaDBはzstd圧縮に対応しておらず、そのせいかクライアントライブラリはzstd圧縮に対応しているもののDebian bookwormのパッケージには入っていないので使えません。
なので標準の apt-get だけでzstd圧縮を使う方法がありません。代わりにライブラリをインストールする必要があります。
まず MySQL なんですが、8.0からMySQL Connector/Cの配布を止めてしまっていて、MySQLを入れろというふうになっていて面倒です。 また apt レポジトリも公開してくれているのですが、 i386 と amd64 はあるのに arm64 がないのでそれも面倒です。
次に MariaDB ですが、次のダウンロードページを見てみると、Debian用のバイナリをarm64とamd64でバイナリパッケージを tar.gz で提供してくれています。
https://mariadb.com/downloads/connectors/
これを使ってmysqlclientを用意してみます。
FROM python:3.13 # 先にデフォルトのlibmariadbを消しておく。 python:3.13-slim の場合は省略可能 RUN apt-get -y purge libmariadb3 libmariadb-dev # MariaDB Connector/C をインストールする WORKDIR /usr/local ADD https://dlm.mariadb.com/4234446/Connectors/c/connector-c-3.4.5/mariadb-connector-c-3.4.5-debian-bookworm-aarch64.tar.gz /usr/local/mariadb-connector.tar.gz # AMD64の場合はこっち # ADD https://dlm.mariadb.com/4234464/Connectors/c/connector-c-3.4.5/mariadb-connector-c-3.4.5-debian-bookworm-amd64.tar.gz /usr/local/mariadb-connector.tar.gz RUN tar xf mariadb-connector.tar.gz --strip-components=1 && rm mariadb-connector.tar.gz # /usr/local/lib/mariadb の下のライブラリはデフォルトで見つけてくれないので、 libmariadb.so(.3) へのsymlink を ld.so.conf で指定されているディレクトリに作る # または下にある方法で rpath を設定しても良い WORKDIR /usr/local/lib RUN ln -s mariadb/libmariadb.so* . && ldconfig # mysqlclient をインストール WORKDIR /home RUN pip install -v mysqlclient # rpath を設定する場合 #RUN MYSQLCLIENT_CFLAGS=$(/usr/local/bin/mariadb_config --cflags) \ # MYSQLCLIENT_LDFLAGS="$(/usr/local/bin/mariadb_config --libs) -Wl,-rpath=/usr/local/lib/mariadb" \ # pip install -v mysqlclient
これで zstd 接続ができているかを確認しましょう。
$ docker run --network host --name mysql -e MYSQL_ROOT_PASSWORD=my-secret-pw -d mysql:8.0
$ docker build -t hoge .
$ docker run --network host -it hoge bash
root@colima:/home# ldd /usr/local/lib/python3.13/site-packages/MySQLdb/_mysql.cpython-313-aarch64-linux-gnu.so
linux-vdso.so.1 (0x0000fc6873027000)
libmariadb.so.3 => /usr/local/lib/libmariadb.so.3 (0x0000fc6872f40000)
libc.so.6 => /lib/aarch64-linux-gnu/libc.so.6 (0x0000fc6872d90000)
libssl.so.3 => /lib/aarch64-linux-gnu/libssl.so.3 (0x0000fc6872cd0000)
libcrypto.so.3 => /lib/aarch64-linux-gnu/libcrypto.so.3 (0x0000fc6872800000)
/lib/ld-linux-aarch64.so.1 (0x0000fc6872fea000)
root@colima:/home# python
Python 3.13.3 (main, Apr 9 2025, 02:20:29) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import MySQLdb
>>> con = MySQLdb.connect(host="127.0.0.1", user="root", password="my-secret-pw", compress=True)
>>> cur = con.cursor()
>>> cur.execute('SHOW STATUS LIKE "Compress%"')
3
>>> for v in cur:
... print(v)
...
('Compression', 'ON')
('Compression_algorithm', 'zstd')
('Compression_level', '3')
>>>
Go 1.24 の testing.B.Loop() が便利
Benchmark関数を書くとき、重い初期化処理が必要になるケースがある。 例えばデータベースから大量のデータを読み込むベンチマークを書きたい場合、まずはその大きいデータを持ったテーブルを作る必要がある。
Goのベンチマーク関数は、普通は実行時間がターゲット(デフォルトで1秒)に近づくように b.N を変えながら複数回呼び出される。
func BenchmarkBigData(b *testing.B) { // 複数回呼び出される // 重いセットアップ b.ResetTimer() for range b.N { // 測りたいコード } b.StopTimer() // クリーンアップ }
このようなコードでは複数回「重いセットアップ」「クリーンアップ」がよばれてしまうので、1秒のベンチマークのためのセットアップ&クリーンアップ時間が数十秒かかる可能性が出てくる。
これを避けるために今まで使っていたのは b.Run() を使う方法だ。 b.Run() を呼び出したら、呼び出した側のベンチマーク関数は1度しか実行されなくなり、b.Run() に渡したベンチマーク関数が複数回よばれる。
func BenchmarkBigData(b *testing.B) { // 1回だけ呼び出される // 重いセットアップ b.Run("go", func(b *testing.B) { // 複数回呼び出される for range b.N { // 測りたいコード } }) // クリーンアップ }
ただ、ベンチマークを実行するときに "BenchmarkBigData/go" みたいにサブテスト名が追加されてしまうのがダサくなってしまう。 Go 1.24 で追加された b.Loop() を使えばサブテストが不要になる。 ただし b.Run() から for b.Loop() に切り替える時は、 b.ResetTimer() や b.StopTimer() を忘れないこと。
func BenchmarkBigData(b *testing.B) { // 1回だけ呼び出される // 重いセットアップ b.ResetTimer() for b.Loop() { // 測りたいコード } b.StopTimer() // クリーンアップ }
Rows.Scan() に渡す変数はループ外で宣言した方が速い
go-mysql-driver のアロケーションを調査していて気づいた小ネタ。
--- a/benchmark_test.go +++ b/benchmark_test.go @@ -423,9 +423,9 @@ func BenchmarkReceiveMassiveRows(b *testing.B) { b.Errorf("failed to select: %v", err) return } + var i int + var s sql.RawBytes for rows.Next() { - var i int - var s sql.RawBytes err = rows.Scan(&i, &s) if err != nil { b.Errorf("failed to scan: %v", err)
こう言うふうに、Scanに渡す変数は rows.Next() ループの外で宣言した方がいい。
rows.Scan() に &i のようにアドレスを渡しているが、Scan() に渡された変数はエスケープしていると判断される。
C言語などではこのアドレスがScan()を呼び出した後に利用されないことをプログラマが保証するのでコンパイラはスタックに変数を宣言できるが、Goではエスケープするならヒープアロケーションしてしまう。
ループの外で宣言することで、ヒープアロケーションをループごとではなく1度だけに限定できる。