SQLAlchemyでAurora MySQLのfailover追従を行う

Aurora MySQLでfailoverが発生した時、primaryから脱落したインスタンスは、通常のMySQLであれば起動後に変更されない innodb_read_only というシステム変数が1になり、更新クエリが 1290 "The MySQL server is running with the --read-only option so it cannot execute this statement" のようなエラーを発生させます。

SQLAlchemyのコネクションプールは pool_recycle で設定した時間まで再利用されてしまうので、failover前のインスタンスへの接続は切断して新規接続しなければなりません。

なお、古い接続を切ってもDNS更新がまだできていなければ新しい接続もfailover前のインスタンスに向かってしまうのでエラーになります。DNS更新は通常30秒以下とAWSのドキュメントに書かれていますが、それを待てない場合は高速failoverを実装した RDS Proxy や ProxySQL のようなプロダクトを利用する必要があります。この記事ではそういったプロダクトは使わず、通常の failover 追従だけを行います。

古い接続を切るにはおおまかにいって2種類のやり方があります。

  • 1290 等の innodb_read_only によって引き起こされるエラーを検出してその接続を再利用不可にする
  • select @@innodb_read_only を実行して 1 ならその接続を使わない

どちらのやり方が良いかはアプリケーションがDBを使う方法に依存します。 autocommit を使って明示的トランザクションをあまり使わず、単発のクエリ=トランザクションをたくさん実行するのであれば、クエリを投げるたびに select @@innodb_read_only クエリを実行するとクエリ数が2倍になってしまうので前者の方が適しています。例えばGoの sql.DB はそういった使われ方をすることが多いので、 go-mysql-driver では rejectReadOnly というオプションを提供しています。

一方でエラーチェックにはデメリットがあります。 Aurora の failover 発生時に起きるエラーは 1290 だけではありませんし、中には failover 以外の条件でも発生するエラーだってあるので、failover検出用に選択したエラーが実際に発生する前に複数回他のエラーが発生する可能性があるのです。最悪の場合はfailover検出にひっかからないエラーでループする状況にもなり得ます。最悪のケースに備える安全策として、コネクションプールの接続再利用時間を短めに設定して検出がうまくいかない場合もその時間でfailover追従できるようにしましょう。 SQLAlchemy の場合は create_engine の pool_recycle と言うオプションがこれに該当するので、1~5分程度に設定するのが良いでしょう。

一方でSQLAlchemy ORMを使う場合は autocommit は使わずに明示的なトランザクションを使うことが多いです。多くの場合はWebアプリケーションへの1リクエストあたり1トランザクションで済むでしょう。トランザクション開始時にコネクションプールから接続を取得し、トランザクション終了時にコネクションプールに接続を返すので、接続取得時に select @@innodb_read_only を実行しても1リクエストあたり1クエリ程度しか追加せずに済みます。またそのチェックで failover を検出して再接続が正しい primary に接続できれば、そのWebアプリケーションのリクエストをエラーにせずに済みます。SQLAlchemy ORMを普通に使っている場合はこちらの方がおすすめです。

サンプルコード

次のサンプルコードは説明した2種類の failover 追従の両方を実装しています。実際に利用するときは併用する意味がない(エラーを検出した接続をプールに戻しても、次回チェックアウト時にちゃんと捨てられる)ので、どちらか片方だけを利用してください。

innodb_read_only チェックをする場合は、 create_engine() のオプションで pool_pre_ping (接続チェックアウト時に select 1 を実行して接続の有効性をチェックする) は無駄になるので、もし使っているようであれば消しましょう。

エラー検出の方を利用する場合は、上で説明したように検出漏れへの安全策として pool_recycle を短めの値にしておきましょう。

from typing import Any

from flask import Flask
from sqlalchemy import create_engine, Column, Integer, engine as sa_engine
from sqlalchemy import event
from sqlalchemy.orm import scoped_session, sessionmaker, declarative_base
from sqlalchemy.exc import DisconnectionError


engine = create_engine(
    "mysql://root@127.0.0.1:3306/test", echo="debug", echo_pool="debug", pool_recycle=300,
)
db_session = scoped_session(sessionmaker(bind=engine))
Base = declarative_base()


# コネクションプールから接続を取り出した時に innodb_read_only をチェックする
# https://docs.sqlalchemy.org/en/21/core/events.html#sqlalchemy.events.PoolEvents.checkout
@event.listens_for(engine, "checkout")
def handle_pool_checkout(
    dbapi_connection: Any, connection_record: Any, connection_proxy: Any
) -> None:
    try:
        cursor = dbapi_connection.cursor()
        cursor.execute("SELECT @@innodb_read_only")
        read_only = cursor.fetchone()[0]
        if read_only:
            raise DisconnectionError("detected innodb_read_only=1")
    except Exception as e:
        # pool_pre_ping 代わり。任意のエラーを再接続させる。
        raise DisconnectionError(str(e))


# 1290等の read_only で発生するエラーで接続を再利用しないようにする
@event.listens_for(engine, "handle_error")
def handle_readonly_error(context) -> None:
    orig = context.original_exception
    if not isinstance(orig, Exception):
        return

    # MySQL Driverの例外は .args[0] がエラーコードになっている。
    if args := getattr(orig, "args", None):
        # read replica で発生するエラー.
        # 1015: Can't lock file (errno: 165 - Table is read only)
        # 1290: The MySQL server is running with the --read-only option so it cannot execute this statement
        # 1836: Running in read-only mode
        if args[0] in (1015, 1290, 1836) and context.connection is not None:
            # connection.invalidate(exception=orig) を使うとconnectionがすぐにcloseされてしまい
            # このあとの cursor.close() でエラーが発生してログを汚してしまう。
            # このコネクションがpoolに戻されないようにdetachだけを行う。
            context.connection.detach()


app = Flask(__name__)


class Counter(Base):
    __tablename__ = "counter"
    id = Column(Integer, primary_key=True)
    counter = Column(Integer, default=0)


def setup():
    Base.metadata.create_all(engine)
    if db_session.query(Counter).first() is None:
        db_session.add(Counter(counter=0))
        db_session.commit()


@app.route("/")
def index():
    if 0:  # insert
        db_session.add(Counter(counter=0))
        db_session.commit()
        return "OK"
    else:  # update
        counter_obj: Counter = db_session.query(Counter).with_for_update().first()
        assert counter_obj is not None
        counter_obj.counter += 1
        db_session.commit()
        return f"Counter: {counter_obj.counter}"


with app.app_context():
    setup()


if __name__ == "__main__":
    app.run(debug=True)

OTLPのspan nameが情報量少なすぎる件

OpenTelemetry 公式の Instrumentation ライブラリを使っていると、Requestsのspanの名前が "GET" だけだったり、 mysqlclient のspanが "SELECT" だけだったりします。

たとえばGrafanaでトレースを見ていると "SELECT" ばかりが何個も並んでいて、一つ一つクリックして attributes の db.statement を確認していかないとどのクエリを実行しているのか分かりません。非常に面倒です。

Flaskのspanであれば "GET /user/<id:int>" のように表示してくれるのに、なぜRequestsのspanの名前はtable名やpathを書いてくれないのでしょうか。それは OpenTelemetry の Semantic conventions でそう推奨されているからです。

HTTP span の Convention:

HTTP span names SHOULD be {method} {target} if there is a (low-cardinality) target available. If there is no (low-cardinality) {target} available, HTTP span names SHOULD be {method}.

https://opentelemetry.io/docs/specs/semconv/http/http-spans/

Tracing API の Spec に書かれている Span name についての説明:

The span name concisely identifies the work represented by the Span, for example, an RPC method name, a function name, or the name of a subtask or stage within a larger computation. The span name SHOULD be the most general string that identifies a (statistically) interesting class of Spans, rather than individual Span instances while still being human-readable.

...

Span Name Guidance
get Too general
get_account/42 Too specific
get_account Good, and account_id=42 would make a nice Span attribute
get_account/{accountId} Also good (using the "HTTP route")

https://github.com/open-telemetry/opentelemetry-specification/blob/v1.54.0/specification/trace/api.md#span

Flaskのrouteはほぼworkと1対1対応しているので span name に target を含められるのですが、RequestsやWSGIではPATHのどの部分が work に対応している部分なのかがわからないので target が含められていないのです。

また、mysqlclient の場合は次のようなルールにより target が含められていません。

The span name SHOULD be {db.query.summary} if a summary is available.

If no summary is available, the span name SHOULD be {db.operation.name} {target} provided that a (low-cardinality) db.operation.name is available (see below for the exact definition of the {target} placeholder).

...

The {target} SHOULD describe the entity that the operation is performed against and SHOULD adhere to one of the following values, provided they are accessible:

  • db.collection.name SHOULD be used for operations on a specific database collection.

...

[3] db.collection.name: It is RECOMMENDED to capture the value as provided by the application without attempting to do any case normalization.

The collection name SHOULD NOT be extracted from db.query.text, when the database system supports query text with multiple collections in non-batch operations.

https://opentelemetry.io/docs/specs/semconv/db/database-spans/

つまり、 "SELECT user" のようにテーブル名 (db.collection.name) を target として span name に含めることは Convention で指定されているものの、そのテーブル名をSQLから文字列パースして取り出すことは禁止されているのです。

DB DriverやHTTP Clientのようなアプリケーションレイヤーより下のライブラリではこれらのルールを守りながら使いやすい Span name を提供することは難しいです。どちらも "target" を呼び出し元から受け取るように引数を増やして、 opentelemtry-instrumentation-xxx のようなhookライブラリから利用できるところに保存するか、ライブラリ自身がotel対応する必要が出てくるでしょう。

しかし、自身のアプリケーションのトレースを取得するのであれば、別にこれらのConventionを厳密に守る必要はありません。ログと違いトレースは30日やそこらで削除することが多いので、今使っているTrace Viewerで扱いやすいようにカスタマイズしてしまうのがいいでしょう。

span name のカスタマイズは Agent (OTLP Collector) でもできると思いますが、ここでは Python でカスタマイズする例を書いておきます。 カスタマイズ対象は mysqlclient と urllib3 です。(Reuqestsではなくurllib3を使っているのは、今関わっているプロジェクトではNiquestsを選定して、 opentelemetry-instrumentation-requests が使えなくなったからです。)

# opentelemetry-instrumentation-mysqlclient のカスタマイズ例
# span name にコマンドだけでなくテーブル名も表示する
import re
from opentelemetry.instrumentation.dbapi import CursorTracer
from opentelemetry.instrumentation.mysqlclient import MySQLClientInstrumentor

def get_operation_name(self, cursor, args):
    if args and isinstance(args[0], str):
        # Strip leading comments so we get the operation name.
        sql = self._leading_comment_remover.sub("", args[0])
        command = sql.split(maxsplit=1)[0].upper()
        pattern = r"\b(FROM|INTO|UPDATE)\s+[`]?([\w]+)[`]?\b"
        if m := re.search(pattern, sql, re.IGNORECASE):
            return f"{command} {m.group(2)}"
        else:
            return command
    return ""

CursorTracer.get_operation_name = get_operation_name
MySQLClientInstrumentor().instrument()

# urllib3 (Niquests) のカスタマイズ例
# span 名にpathを含める
import urllib.parse
from opentelemetry.instrumentation.urllib3 import URLLib3Instrumentor

def request_hook(span, pool, request_info):
    url = urllib.parse.urlparse(request_info.url)
    span.update_name(f"{request_info.method} {url.path}")

URLLib3Instrumentor().instrument(request_hook=request_hook)

SQLからtable名を取り出す部分はかなり適当ですが、複雑なクエリを使っていないのでこれでも十分に便利になりました。

NiquestsとRequestsの `pool_maxsize` の違い

前回の記事 ではNiquestsに最近修正された深刻なバグがあることを紹介しましたが、それで HTTP/2 や HTTP/3 の利用を不安に思い HTTP/1.1 を使うことにしていました。そして HTTP/1.1 の場合に Requests と pool_maxsize の振る舞いが大きく異なることに気がつきました。

ちなみに pool_maxsize について初めて触れたのは Requests の設定についての記事でした。

urllib3 では PoolManager の maxsize で同時接続数を制限できてデフォルトで1なのですが、 requests ではこれを10に置き換えてしまっており、これはほとんどのアプリケーションにとっては過剰でしょう。最大接続数を超えてもデフォルトの設定ではブロックせずに新規接続してくれるので、基本的には maxsize=1 を使い、それで足りないような場合にだけ増やすのがいいと思います。

-- requestsで長時間Sessionを使う場合はidle_timeoutに注意

Niquestsはデフォルトで HTTP/2 は HTTP/3 に対応しており、それらを使った場合は1本の接続で複数のリクエストを多重化して送信します。なので何も設定しなくても大量の接続ができることはありません。

問題は HTTP/1.1 のときで、Niquestsは「デフォルトの設定ではブロックせずに新規接続して」くれません。 HTTPAdapterに pool_block=False を指定しても変わりません。どうやら一つのコネクションプールクラスでHTTP/1.1の接続とHTTP/2,3の接続を混ぜて使えるようにする上で生まれた制約のようです。

結果として、Niquestsで pool_maxsize=1 を設定してしまうと、本当に同時にリクエストを送信したくなった場合に接続が空くのを待つようになってしまい、WebアプリケーションからWeb APIを呼んでいるケースではレスポンスタイムの低下につながります。

対策は pool_maxsize を大きい値に設定することです。デフォルト値が10なので、10スレッド以上で並行アクセスするのでなければデフォルト値で構いません。しかしそうすると、一時的に多めに並行アクセスが発生した後に落ち着いても、たくさん作った接続がずっと解放されないままになってしまいます。

さて、RequestsからNiquestsに移行した最大の理由は keepalive_delay でHTTP/1.1接続の寿命を設定できることでした。 そしてNiquestsはHTTP/2やHTTP/3の接続の管理のためにバックグラウンドスレッドでコネクションプールのメンテ(PING送信)をしています。 そのバックグラウンドスレッドで寿命が来たHTTP/1.1接続のcloseもやってほしいと頼んだらすぐに実装してもらえました。 urllib3-future 2.17.902で実装されたので、Niquestsを使っていてkeepalive接続の繋ぎっぱなしが気になる人はこのバージョン以降にアップデートしてください。

ちなみに、 pool_maxsize が小さいときに接続待ちをしているリクエストがいる状態で、先に接続を使っているリクエストが接続エラーに遭遇すると、待っていたまだ送信していないリクエストまで道連れにエラーになってしまうというバグも発見して報告しています。この問題の修正には少し時間がかかるようですが、 pool_maxsize を大きめにして接続待ちを無くせばこのバグも回避できています。

https://github.com/jawah/urllib3.future/issues/323

Niquestsの深刻なバグについての注意 (urllib3-future 2.15.902 で修正済み)

最近何度か紹介していた Niquests に致命的なバグがあり、問題の大きさの割にアナウンスが小さいので解説しておきます。

なお、このバグは urllib3-future 2.14.906 (2025-11-06) から発生し、2.15.902 (2026-02-03) で修正されました。 Niquests を使っている人は利用している urllib3-future のバージョンの確認を強く推奨します。

バグの内容

niquestsのSessionを使って同一ホストに対して並行にリクエストを送信したとき、その接続が HTTP/2 か HTTP/3 だと、レスポンスの取り違えが発生します。

再現コード

import niquests
import threading
import time
import sys
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("main")

session = niquests.Session(pool_maxsize=1, keepalive_delay=60)

stop = False


def get_worker(name):
    while not stop:
        response = session.get(f"https://httpbin.org/delay/1?name={name}")
        if response.status_code != 200:
            logger.error("bad status. code=%s, %s, body=%s", response.status_code, response.status, response.text)
        json = response.json()
        if json["args"]["name"] != name:
            logger.error(f"wrong name! got {json['args']['name']}, expected {name}")


def main():
    threads = []
    for i in range(4):
        t = threading.Thread(target=get_worker, args=(f"worker-{i}",))
        t.start()
        threads.append(t)

    try:
        for t in threads:
            t.join()
    except KeyboardInterrupt:
        global stop
        stop = True

        sys.stdout.write("\nStopping threads...\n")
        for t in threads:
            t.join()


if __name__ == "__main__":
    main()

実行結果

2026-03-05 18:09:24,262 ERROR wrong name! got worker-3, expected worker-2
2026-03-05 18:09:24,263 ERROR wrong name! got worker-2, expected worker-3
2026-03-05 18:09:25,716 ERROR wrong name! got worker-1, expected worker-0
2026-03-05 18:09:25,718 ERROR wrong name! got worker-0, expected worker-2
2026-03-05 18:09:25,717 ERROR wrong name! got worker-3, expected worker-1
2026-03-05 18:09:25,720 ERROR wrong name! got worker-2, expected worker-3
2026-03-05 18:09:27,687 ERROR wrong name! got worker-1, expected worker-0
2026-03-05 18:09:28,786 ERROR wrong name! got worker-0, expected worker-1

発生条件と影響

この問題は HTTP/1.1 では発生しません。1つの接続に複数のリクエストを多重化しているときにのみレスポンスの取り違えが発生します。

このバグを抱えていると、たとえば稀にあるユーザーに別のユーザーのデータを返してしまう、みたいな深刻な問題が発生する恐れがあります。

2025-11-06 に混入したバグが、 2026-02-03 に報告され、2026-02-04 に修正がリリースされました。メンテナの対応は素晴らしく早いものの、バグが混入してから見つかるまでに3ヶ月かかってしまっているのはユーザー数の少なさというデメリットが現れているのだと思います。

もともと requests で長時間 keep-alive すると稀にエラーになるのをなんとかしたいというモチベーションで niquests を使っていたので、社内で niquests を使おうとしていたあるサービスでは urllib3-future をバージョンアップするだけでなく HTTP/2 と HTTP/3 を無効にして運用することにしました。無効にするのは Session の引数で可能です。

# pool_maxsize は1ホストあたり接続をいくつまでプールするか。
# デフォルトはmaxsizeでも接続が足りなくなったら新規接続する(pool_block=False)ので、
# よっぽどのことがなければ pool_maxsize=1 で十分。
#
# keepalive_delayはHTTP/2用のオプションだが、HTTP/1.1では接続の寿命(新規接続して
# 何秒後に再利用をやめるか)として扱われる。
# 今の所idle_timeoutがないので、サーバー側のidle_timeoutにぶつかってエラーになる
# のを避けるためにkeepalive_delayを使っている。
# 60を採用したのは、nginxのデフォルトのidle_timeout 75sなのでそれより短い値として。
#
# requestsとの大きな違いとして、デフォルトでシステムのルート証明書を利用することに注意。
# ルート証明書のアップデートにはPythonのcertifiパッケージではなく Debian/Ubuntuの
# ca-certificates パッケージをアップデートすること。
#
# https://github.com/jawah/urllib3.future/issues/309
# 1つの接続に複数のリクエストを並行して処理できるHTTP2 or 3 で、並行にアクセスするとレスポンスが他の
# リクエストに行ってしまうという致命的な問題があった。 urllib3-future 2.15.902 で解決済みだが、
# 怖いのでHTTP/1.1に限定して利用する。
session = niquests.Session(pool_maxsize=1, keepalive_delay=60, disable_http2=True, disable_http3=True)

`functools.cache` や `functools.lru_cache` をメソッドに使うメモリリークはruffで検出できる

functools.cacheをメソッドに使う - methaneのブログ で紹介した、普通に functools.cache をメソッドに使うとメモリリークになってしまう問題ですが、半年ぶり2回目遭遇したので再発防止しないとなと思ったらすでに静的チェックがありました。

docs.astral.sh

次のように pyproject.toml に追加しておくと良いでしょう。

# pyproject.toml

[tool.ruff.lint]
extend-select = [
  "I",  # enable isort
  ...
  "B019",  # https://docs.astral.sh/ruff/rules/cached-instance-method/
]
このブログに乗せているコードは引用を除き CC0 1.0 で提供します。