次世代標準非同期I/Oフレームワーク asyncio (Tulip)

Python Advent Calendar 2013 の4日目です。 Python 3.4 で標準ライブラリに追加される asyncio を触ってみます。

なお、 Tulip とは asyncio のリファレンス実装のプロジェクト名です。

背景

Python はよく非同期 I/O プログラミングに使われます。 Twisted, Tornado, gevent, eventlet, pyuv などのフレームワークがあります。

これらのフレームワークの問題点として、ライブラリの再利用性の低さが挙げられます。 たとえば Twisted 用に書かれた XMPP ライブラリは、そのままでは Tornado で 利用することができません。

この問題の解決策として、良くイベントループの乗り入れが行われます。

GUIアプリケーションに組み込む場合などを考えて、多くのフレームワークが最初から イベントループが pluggable になっています。

フレームワークごとに、 Deferred/Future, stream, coroutine/green thread などに違いがあるものの、基板となる event loop は、ソケットにデータが届いたら callback するなど同じような機能を提供していることが多いので、 API を調整する ラッパーを作れば他のフレームワークのイベントループを使うことができます。

実際に、 libev を使った gevent の core を libuv ベースの pyuv に置き換えるとか、 Twisted のループを Tornado で使うといったことが実現されています。

ただし、 N 個のフレームワークがあれば、相互乗り入れのためのラッパーは N * (N-1) 個 必要になってしまいます。どれか1つ標準があれば、相互乗り入れは圧倒的に楽になります。 そこで登場するのが asyncio です。

インストール

それでは asyncio を使ってみましょう。

asyncio を使うには、 Python 3.3 に pip install asyncio するか、 Python 3.4b1 をインストールします。 今回は Python 3.3 を使いました。

イベントループ

import asyncio

def timer_async():
    print('3 sec timer')
    loop = asyncio.get_event_loop()

    def later(msg):
        print(msg)
        loop.stop()

    loop.call_later(3, later, "done")

def timer():
    loop = asyncio.get_event_loop()
    loop.call_soon(timer_async)
    loop.run_forever()

timer()

はい、とくに問題無いですね。

Future

非同期に実行する処理は基本的に asyncio.Future でラップした API が提供されます。 future.add_done_callback() でコールバックを登録、 future.set_result() で結果を登録、 future.result() で結果の取り出しができます。

Future を使って定番の echo サーバーを書いてみましょう。

import asyncio
import socket


def server():
    s = socket.socket()
    s.bind(('127.0.0.1', 5555))
    s.listen(777)
    s.setblocking(False)
    loop = asyncio.get_event_loop()

    def serve():
        fut = loop.sock_accept(s)
        fut.add_done_callback(on_accept)

    def on_accept(fut):
        conn, peer = fut.result()
        handler(conn, peer)
        serve()

    serve()


def handler(conn, peer):
    loop = asyncio.get_event_loop()

    def start_receive(fut=None):
        fut = loop.sock_recv(conn, 128)
        fut.add_done_callback(on_receive)

    def on_receive(fut):
        data = fut.result()
        if not data:
            conn.close()
        fut = loop.sock_sendall(conn, data)
        fut.add_done_callback(start_receive)

    start_receive()


loop = asyncio.get_event_loop()
loop.call_soon(server)
loop.run_forever()

紹介を省いた fd を直接登録するコールバックAPIに比べると、 socket のノンブロッキング 操作を直接する必要がなくなっているのですが、それでもかなりダルいコードです。 Future はコールバックよりも coroutine を使って待ちましょう。

coroutine

Tulip も Tornado のように generator を使った coroutine を使えます。 Tornado と違う点は、 Python 3.3 から導入された yield from 構文を 利用していることです。

これにより、 Tornado に比べて generator を Task でラップする必要が無くなり、 性能アップが見込めます。

先ほどの echo サーバーを coroutine にしてみましょう。

import asyncio
import socket


def server():
    s = socket.socket()
    s.bind(('127.0.0.1', 5555))
    s.listen(777)
    s.setblocking(False)
    loop = asyncio.get_event_loop()
    while True:
        conn, peer = yield from loop.sock_accept(s)
        asyncio.async(handler(conn, peer))


def handler(conn, peer):
    loop = asyncio.get_event_loop()
    while True:
        data = yield from loop.sock_recv(conn, 128)
        if not data:
            conn.close()
            break
        yield from loop.sock_sendall(conn, data)


loop = asyncio.get_event_loop()
loop.run_until_complete(server())

だいぶスッキリしましたね。 ポイントは次の通りです:

  • Future の代わりに generator を返すことができる
  • Future や generator は yeild from で待つ
  • generator を非同期に実行する場合は、 asyncio.async などを使う
  • callback 関数を受け取るAPIと Future/generator を受け取る API が分かれていることに注意

Protocol と Transport

さらに高級な、ネットワークプログラミングのためのAPI として Protocol と Transport が用意されています。

(コネクション型の場合) Protocol は接続ごとに生成され、接続開始時、データ受信時、切断時などに メソッドが呼ばれます。接続開始時のメソッドは Transport を受け取り、データの送信やバッファの制御などを 行います。

echo サーバーを Protocol で実装してみましょう。

import asyncio

class EchoProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)

    def eof_received(self):
        self.transport.close()

    def connection_lost(self, exc):
        if exc is not None:
            print(exc)
            self.transport.abort()
        self.transport = None

loop = asyncio.get_event_loop()
asyncio.async(loop.create_server(EchoProtocol, '127.0.0.1', 5555))
loop.run_forever()

コードは短くなって、切断時やエラー時の処理がしっかりして、いい感じです。

Stream

HTTP などの行志向の部分があるサーバーを書く場合など、指定したバイト数読み込みたいとか、 改行まで読み込みたいということがあります。 Protocol をそのままつかうと、受信したバイト列を自前でバッファリングして頑張らないといけませんが、 その部分をやってくれる stream もあります。

Echo サーバーを改造して、1行ずつ処理するようにしてみましょう。

import asyncio
import asyncio.streams

def echoserver(reader, writer):
    while True:
        line = yield from reader.readline()
        if not line:
            writer.close()
            break
        writer.write(b'>> ' + line)

asyncio.async(asyncio.streams.start_server(echoserver, '127.0.0.1', 5555))
asyncio.get_event_loop().run_forever()

雑感

基本的に Tornado とか使ったことがあるユーザーなら特に問題なく使いはじめることができると思います。 ちなみに、 Tornado はまだリリースされていない開発版で、 asyncio の loop をラップして Torando の IOLoop にすることができます。 (逆に Tornado のループを asyncio で利用することは今のところできません)

今までは WebSocket などの新しいプロトコルの実装や非同期アプリケーションフレームワークは 非同期IOフレームワークごとに登場していたのですが、今後は asyncio ベースで実装することで 複数の非同期IOフレームワークに対応できるので、新しいものが asyncio に集まってきて、それにつられて Python 3.3 以上への移行が加速しそうです。

他の非同期IOフレームワークも、最初はラッパーを利用した相互乗り入れで始まるでしょうが、 asyncio 対応ライブラリが増えてきたら自身の IO ループに直接 asyncio の eventloop API を サポートし、段階的に現在のAPIを廃止していくことになるのかなぁと思います。

個人的には、とりあえず stream の使い方がわかったので PyMySQL の asyncio 対応に挑戦してみます。

このブログに乗せているコードは引用を除き CC0 1.0 で提供します。