【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 6 回 - コルーチン ( coroutines ) を利用して数多くのブロッキング I/O を並列処理する fan-out、fan-in パターンを実現しよう、の巻 投稿一覧へ戻る
SUPPORT UKRAINE
- Your indifference to the act of cruelty can thrive rogue nations like Russia -
第 2 回
第 3 回
第 4 回
第 5 回
第 3 回から前回の第 5 回にかけて、Thread、Thread と Queue の併用、ThreadPoolExecutor を利用したブロッキング I/O ( blocking I/O ) の効率的処理を検証してきましたが、数多くの並列処理をこなすためには今一つ、というのが実情でした。
しかし、高レベルな (処理数の多い) I/O を並列処理するために、Python では コルーチン ( coroutine ) が用意されています。
コルーチンを利用することで、非常に多く (何千、何万) の処理が同時に実行されているように振舞わせることが可能になるんです。
コルーチンはジェネレータ ( generator ) と同様の機能を有していて、async と await キーワード を利用して実装します。
スレッド ( thread ) を開始するコストは非常に高いですが、コルーチンの開始コストは通常の関数呼び出しと何ら変わりません。
また、実行中のコルーチンが占めるメモリスペースも 1 KB に満たない少なさです。
スレッドと同様に、コルーチンも独立した実行体として入力を受け付け、結果を返すことが出来ます。
ただ、await 式で自身の実行を停止し、他の awaitable オブジェクトを実行、その終了を待って再開することが出来るんです (実際にこれらをコントロールしているのは全てのタスクを監視しているイベントループ [ event loop ] です)。
ちょっと yield 式を含むジェネレータの振る舞いと似ていませんか?
事前にスケジュールされた別々の多くのコルーチンは、まるで同時に実行されているように、そう、複数のスレッドが並列処理を行うように振舞います。
ただ、先ほども少し触れましたが、コルーチンはこうした並列処理を、メモリオーバーヘッド ( memory overhead )、開始コスト、コンテキストスイッチコスト ( context switchin cost )、複雑なデータロック機構等の同期処理コードなどを一切必要とせずに実現 します。
こうした素晴らしいメカニズムを実現している源がイベントループで、莫大な数の awaitable オブジェクト間で実行を素早く切り替えながら効率的に I/O を処理しているんです。
シリーズ 6 回目の今回は、このコルーチンを利用して Conway's Game of Life を実装し直してみたいと思います。
目指すところは、ブロッキング I/O を含む game_logic() を処理する際に Thread や Queue を利用した場合に発生した問題を、コルーチンを利用して克服することです。
そのための第一歩として、async def キーワードを使って game_logic() をコルーチン関数に書き換えましょう。
そうすれば、await 式を利用して I/O 処理の待機時間を有効に使えるようになります。
Grid クラスと count_neighbors() 関数に変更はありません。
DEAD = '□'
class Grid:
def __init__(self, rows, columns):
self.rows = rows
self.columns = columns
self.mat = [[DEAD] * self.columns for _ in range(self.rows)]
def get(self, row, column):
return self.mat[row % self.rows][column % self.columns]
def set(self, row, column, state):
self.mat[row % self.rows][column % self.columns] = state
def __str__(self):
result = ''
for row in range(self.rows):
result += ''.join([self.mat[row][column] for column in range(self.columns)]) + '\n'
return result
def count_neighbors(row, column, get_func):
u_ = get_func(row - 1, column + 0) # 上
ur = get_func(row - 1, column + 1) # 右上
r_ = get_func(row + 0, column + 1) # 右
dr = get_func(row + 1, column + 1) # 右下
d_ = get_func(row + 1, column + 0) # 下
dl = get_func(row + 1, column - 1) # 左下
l_ = get_func(row + 0, column - 1) # 左
ul = get_func(row - 1, column - 1) # 左上
neighbor_states = [u_, ur, r_, dr, d_, dl, l_, ul]
count = 0
for state in neighbor_states:
if state == LIVE:
count += 1
return count
async def キーワードを使って game_logic() をコルーチン関数に定義し直します。
await 式で I/O 処理の間一旦制御をイベントループに戻し、他のコルーチンを開始 / 再開することで CPU を有効活用しましょう。
ここでは擬似的な I/O 処理待機時間として 0.1 秒間実行を止めてみます。
async def game_logic(state, live_ct):
...
# ブロッキング入出力処理
# data = await my_socket.read(50)
await asyncio.sleep(0.1) # 模擬的ブロッキング I/O 待機
...
if state == LIVE:
if live_ct < 2 or live_ct > 3:
return DEAD
else:
if live_ct == 3:
return LIVE
return state
同様に、step_cell() もコルーチン関数に書き換え、await 式で game_logic() を開始、終了を待って次ステップのグリッドデータをセットします。
state = cur_get(row, column)
live_ct = count_neighbors(row, column, cur_get)
next_state = await game_logic(state, live_ct) # I/O 処理の終了を待ちます
next_set(row, column, next_state)
simulate() も、グリッドを構成するセルの数だけ step_cell() を呼び出し ( fan-out )、await 式を利用してそれらの完了を待つ ( fan-in ) 必要がありますから、コルーチン関数へ書き換えます。
next_grid = Grid(source_grid.rows, source_grid.columns)
tasks = []
for row in range(source_grid.rows):
for column in range(source_grid.columns):
task = step_cell(row, column, source_grid.get, next_grid.set) # 1: Fan out
tasks.append(task)
await asyncio.gather(*tasks) # 2: Fan in
return next_grid
ここでもう少し simulate() について説明します。
1: step_cell() を呼び出しています。ですが、ここで step_cell() を「実行している」わけではないことに注意してください。
step_cell() は async def で定義されているコルーチン関数です。
コルーチン関数を呼び出すと、その返り値としてコルーチン (正確には「コルーチンオブジェクト」) が返ってきます。
コルーチンは awaitable オブジェクトですから、await 式を利用して実行を開始、終了を待機することが出来ますね。
これはちょうど、ジェネレータ関数を呼び出すと、即座に実行されるのではなく、ジェネレータ (正確にはジェネレータイテレータです) が返ってくるのと似ています。
2: await 式でasyncio.gather() を実行することで、複数のコルーチン ( step_cell() です) をイベントループで並列処理し、それらの終了を待機します。
simulate() に実行権が戻ってきた、ということは step_cell() が全て終了した、すなわち、次ステップのグリッドが完成した、ということになります。
ここまでで、あれっ、と思われた方もいるかもしれません、鋭いです!!
今回もマルチスレッドのときと同様に多くのコルーチンが Grid インスタンスにアクセスしています。
しかし今回は Lock を利用してデータ競合 ( data race ) を防止する処置を施していません。大丈夫なんでしょうか?
はい、大丈夫なんです。
Asynchronous Programming (非同期プログラミング) は OS の干渉がなく、1 つのプロセスの 1 つのスレッド上で実行される からです。
さて、ここまでで準備は完了しました。
グリッドを表示するために利用している StagePrinter クラスに変更はありません。
def __init__(self):
self.stages = []
def append(self, data: str):
self.stages.append(data)
def __str__(self):
row_ct = len(self.stages[0].splitlines())
rows = [''] * row_ct
for m in range(row_ct):
for index, stage in enumerate(self.stages):
line = stage.splitlines()[m]
rows[m] += line
if index < len(self.stages) - 1:
if not m == row_ct // 2:
rows[m] += ' ' * 3
else:
rows[m] += ' → '
return '\n'.join(rows)
import time
grid = Grid(5, 9)
grid.set(0, 3, LIVE)
grid.set(1, 4, LIVE)
grid.set(2, 2, LIVE)
grid.set(2, 3, LIVE)
grid.set(2, 4, LIVE)
最後に 1 ヶ所だけ変更を加えます。
コルーチン関数として用意した simulate() をコルーチンとしてイベントループ内で実行し、そこに含まれる I/O 操作を協調的マルチタスク ( cooperative multi-tasking ) で並列処理するために、asyncio.run() を実行します。
この asyncio.run() は、イベントループを作成し、そこで実行するコルーチンをラップしたタスク ( Task ) オブジェクトを作成、スケジューリングし、その実行終了を待ってコルーチンからの戻り値を返し、最終的にイベントループを終了する、という一連の動作を全てこなしてくれます。
for m in range(1):
stages = StagePrinter()
for i in range(5):
stages.append(str(grid))
grid = asyncio.run(simulate(grid)) # 実際にイベントループ内でコルーチンを実行します
print(stages)
print()
end = time.perf_counter()
print(f"処理時間: {end - start} s")
# □□□■□□□□□ □□□□□□□□□ □□□□□□□□□ □□□□□□□□□ □□□□□□□□□
# □□□□■□□□□ □□■□■□□□□ □□□□■□□□□ □□□■□□□□□ □□□□■□□□□
# □□■■■□□□□ → □□□■■□□□□ → □□■□■□□□□ → □□□□■■□□□ → □□□□□■□□□
# □□□□□□□□□ □□□■□□□□□ □□□■■□□□□ □□□■■□□□□ □□□■■■□□□
# □□□□□□□□□ □□□□□□□□□ □□□□□□□□□ □□□□□□□□□ □□□□□□□□□
#
# 処理時間: 0.539256062 s
実行結果は今まで検証してきた Thread や Queue を利用した場合と変わりませんが、スレッド利用に伴うオーバーヘッド問題はキレイサッパリ解決しています。
また、コルーチンを利用しているこの非同期プログラムは 1 つのスレッド上で動作していますから、使い慣れたデバッガー ( debugger ) を使ってコードを 1 行 1 行追跡していくことも可能です。
もし将来的に設計が変更になり count_neighbors() 関数内でも I/O を処理する必要が生じた場合でも、async def キーワードで count_neighbors() を定義し直し、step_cell() 内での呼び出しの際に await 式を使ってイベントループでコルーチンを実行するようにするだけで済んじゃいます。
u_ = get_func(row - 1, column + 0) # 上
ur = get_func(row - 1, column + 1) # 右上
r_ = get_func(row + 0, column + 1) # 右
dr = get_func(row + 1, column + 1) # 右下
d_ = get_func(row + 1, column + 0) # 下
dl = get_func(row + 1, column - 1) # 左下
l_ = get_func(row + 0, column - 1) # 左
ul = get_func(row - 1, column - 1) # 左上
neighbor_states = [u_, ur, r_, dr, d_, dl, l_, ul]
count = 0
for state in neighbor_states:
if state == LIVE:
count += 1
return count
async def step_cell(row, column, cur_get, next_set):
state = cur_get(row, column)
live_ct = await count_neighbors(row, column, cur_get)
next_state = await game_logic(state, live_ct) # I/O 処理の終了を待ちます
next_set(row, column, next_state)
for m in range(1):
stages = StagePrinter()
for i in range(5):
stages.append(str(grid))
grid = asyncio.run(simulate(grid)) # 実際にイベントループ内でコルーチンを実行します
print(stages)
print()
# □□□■□□□□□ □□□□□□□□□ □□□□□□□□□ □□□□□□□□□ □□□□□□□□□
# □□□□■□□□□ □□■□■□□□□ □□□□■□□□□ □□□■□□□□□ □□□□■□□□□
# □□■■■□□□□ → □□□■■□□□□ → □□■□■□□□□ → □□□□■■□□□ → □□□□□■□□□
# □□□□□□□□□ □□□■□□□□□ □□□■■□□□□ □□□■■□□□□ □□□■■■□□□
# □□□□□□□□□ □□□□□□□□□ □□□□□□□□□ □□□□□□□□□ □□□□□□□□□
コルーチンを利用することで、I/O 処理等に伴う並列処理実行のために頭を悩ますのではなく、本来のプログラムロジックそのものに集中できるようになる、ということがお分かりいただけたでしょうか?
まとめ:
1: async def キーワードで定義された関数はコルーチン関数です。この関数を await 式で呼び出すことで、関数の戻り値であるコルーチンオブジェクトがイベントループで実行され、その返り値を取得することが可能です。
2: コルーチンは非常に多く (何千何万) の関数を同時に実行する (正確には、「しているように見える」) 効率的な方法を提供してくれます。
3: コルーチンを利用して I/O 処理を並行実行する fan-out、fan-in パターンを実装できます。この実装では、スレッドを利用した同様の実装時に生じる様々な問題を一掃出来ます。
補足:
非同期プログラミング ( asynchronous programming ) とシーケンシャルプログラミング ( sequential programming ) の違い、process、thread、async それぞれの特徴を初心者向けに解説している youtube の動画があります。
Miguel Grinberg Asynchronous Python for the Complete Beginner PyCon 2017 というタイトルで、講演は全て英語ですが、非常に分かりやすくためになります。
特に、非同期プログラムとシーケンシャルプログラムの違いを、「チェスのプロ vs 20 人のアマチュア」の指導チェスにおける所要時間の差異、に例えている部分は特に分かりやすいです。
興味があれば覗いてみてください。