SQLAlchemyでAurora MySQLのfailover追従を行う

Aurora MySQLでfailoverが発生した時、primaryから脱落したインスタンスは、通常のMySQLであれば起動後に変更されない innodb_read_only というシステム変数が1になり、更新クエリが 1290 "The MySQL server is running with the --read-only option so it cannot execute this statement" のようなエラーを発生させます。

SQLAlchemyのコネクションプールは pool_recycle で設定した時間まで再利用されてしまうので、failover前のインスタンスへの接続は切断して新規接続しなければなりません。

なお、古い接続を切ってもDNS更新がまだできていなければ新しい接続もfailover前のインスタンスに向かってしまうのでエラーになります。DNS更新は通常30秒以下とAWSのドキュメントに書かれていますが、それを待てない場合は高速failoverを実装した RDS Proxy や ProxySQL のようなプロダクトを利用する必要があります。この記事ではそういったプロダクトは使わず、通常の failover 追従だけを行います。

古い接続を切るにはおおまかにいって2種類のやり方があります。

  • 1290 等の innodb_read_only によって引き起こされるエラーを検出してその接続を再利用不可にする
  • select @@innodb_read_only を実行して 1 ならその接続を使わない

どちらのやり方が良いかはアプリケーションがDBを使う方法に依存します。 autocommit を使って明示的トランザクションをあまり使わず、単発のクエリ=トランザクションをたくさん実行するのであれば、クエリを投げるたびに select @@innodb_read_only クエリを実行するとクエリ数が2倍になってしまうので前者の方が適しています。例えばGoの sql.DB はそういった使われ方をすることが多いので、 go-mysql-driver では rejectReadOnly というオプションを提供しています。

一方でエラーチェックにはデメリットがあります。 Aurora の failover 発生時に起きるエラーは 1290 だけではありませんし、中には failover 以外の条件でも発生するエラーだってあるので、failover検出用に選択したエラーが実際に発生する前に複数回他のエラーが発生する可能性があるのです。最悪の場合はfailover検出にひっかからないエラーでループする状況にもなり得ます。最悪のケースに備える安全策として、コネクションプールの接続再利用時間を短めに設定して検出がうまくいかない場合もその時間でfailover追従できるようにしましょう。 SQLAlchemy の場合は create_engine の pool_recycle と言うオプションがこれに該当するので、1~5分程度に設定するのが良いでしょう。

一方でSQLAlchemy ORMを使う場合は autocommit は使わずに明示的なトランザクションを使うことが多いです。多くの場合はWebアプリケーションへの1リクエストあたり1トランザクションで済むでしょう。トランザクション開始時にコネクションプールから接続を取得し、トランザクション終了時にコネクションプールに接続を返すので、接続取得時に select @@innodb_read_only を実行しても1リクエストあたり1クエリ程度しか追加せずに済みます。またそのチェックで failover を検出して再接続が正しい primary に接続できれば、そのWebアプリケーションのリクエストをエラーにせずに済みます。SQLAlchemy ORMを普通に使っている場合はこちらの方がおすすめです。

サンプルコード

次のサンプルコードは説明した2種類の failover 追従の両方を実装しています。実際に利用するときは併用する意味がない(エラーを検出した接続をプールに戻しても、次回チェックアウト時にちゃんと捨てられる)ので、どちらか片方だけを利用してください。

innodb_read_only チェックをする場合は、 create_engine() のオプションで pool_pre_ping (接続チェックアウト時に select 1 を実行して接続の有効性をチェックする) は無駄になるので、もし使っているようであれば消しましょう。

エラー検出の方を利用する場合は、上で説明したように検出漏れへの安全策として pool_recycle を短めの値にしておきましょう。

from typing import Any

from flask import Flask
from sqlalchemy import create_engine, Column, Integer, engine as sa_engine
from sqlalchemy import event
from sqlalchemy.orm import scoped_session, sessionmaker, declarative_base
from sqlalchemy.exc import DisconnectionError


engine = create_engine(
    "mysql://root@127.0.0.1:3306/test", echo="debug", echo_pool="debug", pool_recycle=300,
)
db_session = scoped_session(sessionmaker(bind=engine))
Base = declarative_base()


# コネクションプールから接続を取り出した時に innodb_read_only をチェックする
# https://docs.sqlalchemy.org/en/21/core/events.html#sqlalchemy.events.PoolEvents.checkout
@event.listens_for(engine, "checkout")
def handle_pool_checkout(
    dbapi_connection: Any, connection_record: Any, connection_proxy: Any
) -> None:
    try:
        cursor = dbapi_connection.cursor()
        cursor.execute("SELECT @@innodb_read_only")
        read_only = cursor.fetchone()[0]
        if read_only:
            raise DisconnectionError("detected innodb_read_only=1")
    except Exception as e:
        # pool_pre_ping 代わり。任意のエラーを再接続させる。
        raise DisconnectionError(str(e))


# 1290等の read_only で発生するエラーで接続を再利用しないようにする
@event.listens_for(engine, "handle_error")
def handle_readonly_error(context) -> None:
    orig = context.original_exception
    if not isinstance(orig, Exception):
        return

    # MySQL Driverの例外は .args[0] がエラーコードになっている。
    if args := getattr(orig, "args", None):
        # read replica で発生するエラー.
        # 1015: Can't lock file (errno: 165 - Table is read only)
        # 1290: The MySQL server is running with the --read-only option so it cannot execute this statement
        # 1836: Running in read-only mode
        if args[0] in (1015, 1290, 1836) and context.connection is not None:
            # connection.invalidate(exception=orig) を使うとconnectionがすぐにcloseされてしまい
            # このあとの cursor.close() でエラーが発生してログを汚してしまう。
            # このコネクションがpoolに戻されないようにdetachだけを行う。
            context.connection.detach()


app = Flask(__name__)


class Counter(Base):
    __tablename__ = "counter"
    id = Column(Integer, primary_key=True)
    counter = Column(Integer, default=0)


def setup():
    Base.metadata.create_all(engine)
    if db_session.query(Counter).first() is None:
        db_session.add(Counter(counter=0))
        db_session.commit()


@app.route("/")
def index():
    if 0:  # insert
        db_session.add(Counter(counter=0))
        db_session.commit()
        return "OK"
    else:  # update
        counter_obj: Counter = db_session.query(Counter).with_for_update().first()
        assert counter_obj is not None
        counter_obj.counter += 1
        db_session.commit()
        return f"Counter: {counter_obj.counter}"


with app.app_context():
    setup()


if __name__ == "__main__":
    app.run(debug=True)
このブログに乗せているコードは引用を除き CC0 1.0 で提供します。