今回は、
前回 の Conway's Game of Life の実装を追っていきながら、並行処理 ( concurrency ) へ向けた課題を検討したいと思います。
まずは前回の Conway's Game of Life coding challenge の実装例を...
グリッド ( grid ) を象徴するクラスは前回示したとおりです。
Conway's Game of Life では、グリッドの上下、左右がそれぞれつながっている、ある意味「無限空間」となっているため、Grid クラスの get()、set() ではどのような行番号、列番号が渡されてきてもグリッドの範囲内に収まるように、% オペレータを利用しています。
LIVE = '■'
DEAD = '□'
class Grid:
def __init__(self, rows, columns):
self.rows = rows
self.columns = columns
self.mat = [[DEAD] * self.columns for _ in range(self.rows)]
def get(self, row, column):
return self.mat[row % self.rows][column % self.columns]
def set(self, row, column, state):
self.mat[row % self.rows][column % self.columns] = state
def __str__(self):
result = ''
for row in range(self.rows):
result += ''.join([self.mat[row][column] for column in range(self.columns)]) + '\n'
return result
grid = Grid(5, 9)
grid.set(0, 3, LIVE)
grid.set(1, 4, LIVE)
grid.set(2, 2, LIVE)
grid.set(2, 3, LIVE)
grid.set(2, 4, LIVE)
print(grid)
# □□□■□□□□□
# □□□□■□□□□
# □□■■■□□□□
# □□□□□□□□□
# □□□□□□□□□
次に、対象セルを取り囲む 8 つのセルの状態を取得するためのヘルパー関数 count_neighbors() を作成しました。
やっていることは単純です。
ただ、この関数では、グリッドを象徴するクラスオブジェクト自体を引数として取らずに、情報だけを返してくれる関数を第 3 パラメータとして取っています。
これによって汎用性を高めているんです。
例えば、任意のデータを渡すことが出来る簡易的な get 関数を用意すれば、この関数が期待通りに機能しているかを、様々なデータをセットした複数の Grid オブジェクトを作成せずともテストできたりします。
def count_neighbors(row, column, get_func):
u_ = get_func(row - 1, column + 0)
ur = get_func(row - 1, column + 1)
r_ = get_func(row + 0, column + 1)
dr = get_func(row + 1, column + 1)
d_ = get_func(row + 1, column + 0)
dl = get_func(row + 1, column - 1)
l_ = get_func(row + 0, column - 1)
ul = get_func(row - 1, column - 1)
neighbor_states = [u_, ur, r_, dr, d_, dl, l_, ul]
count = 0
for state in neighbor_states:
if state == LIVE:
count += 1
return count
game_logic() 関数は、Conway's Game of Life におけるゲームロジック実装部分です。
ルール自体は前回の記事を参考にしてください。
def game_logic(state, live_ct):
if state == LIVE:
if live_ct < 2 or live_ct > 3:
return DEAD
else:
if live_ct == 3:
return LIVE
return state
ここまで作成した 2 つのヘルパー関数を利用して、各セルの次のステップにおける状態を新しい Grid オブジェクトにセットしていくのが step_cell() 関数です。
この関数でも、count_neighbors() 関数と同じ理由で、第 3、第 4 パラメータとして Grid オブジェクトではなく関数を受け取っています。
def step_cell(row, column, cur_get, next_set):
state = cur_get(row, column)
live_ct = count_neighbors(row, column, cur_get)
next_state = game_logic(state, live_ct)
next_set(row, column, next_state)
simulate() 関数は step_cell() 関数を Grid オブジェクトに含まれるセルの数だけ呼び出し、次のステップのグリッドを作成、返すためのものです。
もしかすると実装している際に、元のグリッドに次のステップの結果をどんどん書き込んでいってしまった方はいませんか?
それでは結果が大きく異なってしまうはずです。
この
Conway's Game of Life 実装時における重要事項は、元のグリッドからセルの情報を取得し、その結果から特定した次のステップでの状態を新たなグリッドへ書き込んでいく、という作業を黙々と繰り返す ことですね。
そして、その制御をしているのがこの simulate() 関数です。グリッド内の各セルをシーケンシャルに処理しながら次のステップのグリッドを作成します。
他のヘルパー関数は、自分が扱っている情報がどのグリッドから来てどのグリッドへ行くものなのか、ということは一切関知していません。
ただ渡されてきた get 関数から情報を取得し、処理し、渡されてきた set 関数へ渡しているだけです。
こういった関数インターフェース ( function interface ) を利用することで、プログラムの構造自体をシンプルにすることができますね!
def simulate(source_grid: Grid):
next_grid = Grid(source_grid.rows, source_grid.columns)
for row in range(source_grid.rows):
for column in range(source_grid.columns):
step_cell(row, column, source_grid.get, next_grid.set)
return next_grid
あとはシュミレーションを出力するだけです。
各ステップごとに Grid オブジェクトを表示すれば済む話ですが ( Grid クラスには __str__() を実装してありますから )、
複数のステップを 1 行に出来るようにすればパターンの遷移がより見やすくなるかな?ということで、
StagePrinter クラスを用意しました。
class StagePrinter:
def __init__(self):
self.stages = []
def append(self, data: str):
self.stages.append(data)
def __str__(self):
row_ct = len(self.stages[0].splitlines())
rows = [''] * row_ct
for m in range(row_ct):
for index, stage in enumerate(self.stages):
line = stage.splitlines()[m]
rows[m] += line
if index < len(self.stages) - 1:
if not m == row_ct // 2:
rows[m] += ' ' * 3
else:
rows[m] += ' → '
return '\n'.join(rows)
StagePrinter オブジェクトを利用して、1 行に 4 ステップ、3 行分の合計 12 ステップを表示してみましょう。
for m in range(3):
stages = StagePrinter()
for i in range(4):
stages.append(str(grid))
grid = simulate(grid)
print(stages)
print()
# □□□■□□□□□ □□□□□□□□□ □□□□□□□□□ □□□□□□□□□
# □□□□■□□□□ □□■□■□□□□ □□□□■□□□□ □□□■□□□□□
# □□■■■□□□□ → □□□■■□□□□ → □□■□■□□□□ → □□□□■■□□□
# □□□□□□□□□ □□□■□□□□□ □□□■■□□□□ □□□■■□□□□
# □□□□□□□□□ □□□□□□□□□ □□□□□□□□□ □□□□□□□□□
#
# □□□□□□□□□ □□□□□□□□□ □□□□□□□□□ □□□□□□□□□
# □□□□■□□□□ □□□□□□□□□ □□□□□□□□□ □□□□□□□□□
# □□□□□■□□□ → □□□■□■□□□ → □□□□□■□□□ → □□□□■□□□□
# □□□■■■□□□ □□□□■■□□□ □□□■□■□□□ □□□□□■■□□
# □□□□□□□□□ □□□□■□□□□ □□□□■■□□□ □□□□■■□□□
#
# □□□□□□□□□ □□□□□■□□□ □□□□□■■□□ □□□□□■■□□
# □□□□□□□□□ □□□□□□□□□ □□□□□□□□□ □□□□□□□□□
# □□□□□■□□□ → □□□□□□□□□ → □□□□□□□□□ → □□□□□□□□□
# □□□□□□■□□ □□□□■□■□□ □□□□□□■□□ □□□□□■□□□
# □□□□■■■□□ □□□□□■■□□ □□□□■□■□□ □□□□□□■■□
ちゃんと動作していますね、よかったよかった。
さて、実はここからが今回のシリーズ掲載の本番です。
今このプログラムは、1 台のマシン上の 1 つのスレッドで動作しています。
ところが、このロジックを元に、このゲームを大規模なマルチプレーヤーオンラインゲームへ展開しよう、というプロジェクトが持ち上がりました。
するとどういうことに対処する必要が出てくるでしょうか?
game_logic() 関数において次のステップの状態を決定する際に、各プレーヤーのグリッド状態を考慮する必要が出てきますし、結果も各プレーヤーへフィードバックしなければいけません。
必然的に game_logic() 関数内で、socket のやり取りをするための I/O 操作が生じることになります。
一番直線的な解決方法は、game_logic() に直接ブロッキング I/O ( blocking I/O ) 操作を追加してしまうことでしょう。
def game_logic(state, live_ct):
...
data = my_socket.recv(128)
...
この実装方法の問題点は、プログラム全体の処理速度を低下させてしまうことです。
インターネット経由でデータをやり取りする際の待機時間 ( latency ) がほぼ 0.1 秒 ( 100 milliseconds ) と考えると、1 つのグリッドが 45 セルから成り立っている場合に、あるステップから次のステップへグリッド全体が移行するのに要する時間は 0.1 * 45 = 4.5 秒となります。
これは、simulate() においてグリッド内の各セルを 1 つずつシーケンシャルに処理しているためです。
遅すぎますよね、次の画面が表示されるまで 4.5 秒、待ってられますか?
もし 1 つのグリッドが 10,000 セルから出来ていたら、ステップが遷移するまで 15,6 分かかっちゃいます、もうコーヒー 1 杯淹れて飲み終わります。
この問題の解決策は、ブロッキング I/O を並列処理することです。
Python における並列処理はブロッキング I/O に関する限りは並行実行 ( parallelism ) と同意ですから、どんなにグリッドが大きくなろうが処理速度はほぼ 0.1 秒です。
ある仕事単位 ( unit of work, ここでは「セル」です ) を並列処理するためにワーカースレッドをドバッと増やす過程を
fan-out と呼んでいます。
一方、次の処理 (ここでは次のステップのグリッド取得、表示です) へ進む前に、それらの並列処理ラインの結果を集約するために各ラインの終了を待つことを
fan-in と呼びます。
Python では fan-out、fan-in を実現するための組み込みツールが多数提供されていますが、それぞれに長所短所を有しています。
今回作成した Conway's Game of Life を利用して、今後数回に分けて、Thread、Queue、ThreadPoolExecutor、Coroutine といったツールを取り上げていくつもりです。
まとめ:
1: プログラムが大きくなるに従ってスコープと複雑さが拡大し、それに伴って並列処理が必要になってくることがあります。
2: 最も典型的な並列処理パターンは、新たな並列処理ラインの生成と、各ラインの処理終了待機からなる fan-out、fan-in です。
3: Python には fan-out、fan-in を実現するための多くの方法が用意されています。