検索ガイド -Search Guide-

単語と単語を空白で区切ることで AND 検索になります。
例: python デコレータ ('python' と 'デコレータ' 両方を含む記事を検索します)
単語の前に '-' を付けることで NOT 検索になります。
例: python -デコレータ ('python' は含むが 'デコレータ' は含まない記事を検索します)
" (ダブルクオート) で語句を囲むことで 完全一致検索になります。
例: "python data" 実装 ('python data' と '実装' 両方を含む記事を検索します。'python data 実装' の検索とは異なります。)
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
>>
effective_python

【 Effective Python, 2nd Edition + coding challenge 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第2回 - Conway's Game of Life coding challenge の実装例と課題、の巻 投稿一覧へ戻る

Published 2020年8月26日19:04 by mootaro23

SUPPORT UKRAINE

- Your indifference to the act of cruelty can thrive rogue nations like Russia -

今回は、前回 の Conway's Game of Life の実装を追っていきながら、並行処理 ( concurrency ) へ向けた課題を検討したいと思います。


まずは前回の Conway's Game of Life coding challenge の実装例を...


グリッド ( grid ) を象徴するクラスは前回示したとおりです。


Conway's Game of Life では、グリッドの上下、左右がそれぞれつながっている、ある意味「無限空間」となっているため、Grid クラスの get()、set() ではどのような行番号、列番号が渡されてきてもグリッドの範囲内に収まるように、% オペレータを利用しています。


LIVE = '■'
DEAD = '□'


class Grid:
"""
グリッドを象徴する
:param rows: 行数
:param columns: 列数
"""

def __init__(self, rows, columns):
self.rows = rows
self.columns = columns
# 与えられた行数、列数をもつ 2 次元リストを作成する。全ての要素のデフォルト値は DEAD
self.mat = [[DEAD] * self.columns for _ in range(self.rows)]

def get(self, row, column):
"""
対象セルの値 ( 状態: LIVE or DEAD ) を返す
"""

return self.mat[row % self.rows][column % self.columns]

def set(self, row, column, state):
"""
対象セルに値 ( 状態: LIVE or DEAD ) をセットする
"""

self.mat[row % self.rows][column % self.columns] = state

def __str__(self):
"""
このグリッドを象徴する文字列を返す
この文字列は '□□□\n■□□\n□□■\n■■□\n' のように、各列の末尾に '\n' が付加されたもの
"""

result = ''
for row in range(self.rows):
result += ''.join([self.mat[row][column] for column in range(self.columns)]) + '\n'
return result


grid = Grid(5, 9)

grid.set(0, 3, LIVE)
grid.set(1, 4, LIVE)
grid.set(2, 2, LIVE)
grid.set(2, 3, LIVE)
grid.set(2, 4, LIVE)


print(grid)

# □□□■□□□□□
# □□□□■□□□□
# □□■■■□□□□
# □□□□□□□□□
# □□□□□□□□□



次に、対象セルを取り囲む 8 つのセルの状態を取得するためのヘルパー関数 count_neighbors() を作成しました。


やっていることは単純です。


ただ、この関数では、グリッドを象徴するクラスオブジェクト自体を引数として取らずに、情報だけを返してくれる関数を第 3 パラメータとして取っています。


これによって汎用性を高めているんです。


例えば、任意のデータを渡すことが出来る簡易的な get 関数を用意すれば、この関数が期待通りに機能しているかを、様々なデータをセットした複数の Grid オブジェクトを作成せずともテストできたりします。


def count_neighbors(row, column, get_func):
"""
対象セル (row, column) の 8 つの隣り合ったセルのうち生存しているセルの個数を返す

:param row: 対象セルの行番号
:param column: 対象セルの列番号
:param get_func: 行、列 2 つの引数を取り、その場所の状態 ( LIVE か DEAD ) を取得するための関数
:return: 隣り合う 8 つのセルのうち生存しているセルの個数
"""

u_ = get_func(row - 1, column + 0) # 上
ur = get_func(row - 1, column + 1) # 右上
r_ = get_func(row + 0, column + 1) # 右
dr = get_func(row + 1, column + 1) # 右下
d_ = get_func(row + 1, column + 0) # 下
dl = get_func(row + 1, column - 1) # 左下
l_ = get_func(row + 0, column - 1) # 左
ul = get_func(row - 1, column - 1) # 左上
neighbor_states = [u_, ur, r_, dr, d_, dl, l_, ul]
count = 0
for state in neighbor_states:
if state == LIVE:
count += 1
return count



game_logic() 関数は、Conway's Game of Life におけるゲームロジック実装部分です。


ルール自体は前回の記事を参考にしてください。


def game_logic(state, live_ct):
"""
次のステップでの状態 ( LIVE か DEAD ) を返す

:param state: 対象セルの現在の状態 ( LIVE か DEAD )
:param live_ct: 対象のセルを取り囲む 8 つのセルのうち生存しているセルの数
:return: 次のステップにおける対象セルの生死 ( LIVE か DEAD )
"""

if state == LIVE:
if live_ct < 2 or live_ct > 3:
return DEAD
else:
if live_ct == 3:
return LIVE
return state



ここまで作成した 2 つのヘルパー関数を利用して、各セルの次のステップにおける状態を新しい Grid オブジェクトにセットしていくのが step_cell() 関数です。


この関数でも、count_neighbors() 関数と同じ理由で、第 3、第 4 パラメータとして Grid オブジェクトではなく関数を受け取っています。


def step_cell(row, column, cur_get, next_set):
"""
現ステップのセルの状態から次ステップのセルの状態を取得しセットする

:param row: 対象セルの行番号
:param column: 対象セルの列番号
:param cur_get: 現ステップのグリッドにおいて 行、列 2 つの引数を取り、その場所の状態 ( LIVE か DEAD ) を取得するための関数
:param next_set: 次ステップのグリッドにおいて 行、列、状態 3 つの引数を取り、該当セルに「状態」の値をセットするための関数
"""

state = cur_get(row, column)
live_ct = count_neighbors(row, column, cur_get)
next_state = game_logic(state, live_ct)
next_set(row, column, next_state)



simulate() 関数は step_cell() 関数を Grid オブジェクトに含まれるセルの数だけ呼び出し、次のステップのグリッドを作成、返すためのものです。


もしかすると実装している際に、元のグリッドに次のステップの結果をどんどん書き込んでいってしまった方はいませんか?


それでは結果が大きく異なってしまうはずです。


この Conway's Game of Life 実装時における重要事項は、元のグリッドからセルの情報を取得し、その結果から特定した次のステップでの状態を新たなグリッドへ書き込んでいく、という作業を黙々と繰り返す ことですね。


そして、その制御をしているのがこの simulate() 関数です。グリッド内の各セルをシーケンシャルに処理しながら次のステップのグリッドを作成します。


他のヘルパー関数は、自分が扱っている情報がどのグリッドから来てどのグリッドへ行くものなのか、ということは一切関知していません。


ただ渡されてきた get 関数から情報を取得し、処理し、渡されてきた set 関数へ渡しているだけです。


こういった関数インターフェース ( function interface ) を利用することで、プログラムの構造自体をシンプルにすることができますね!


def simulate(source_grid: Grid):
"""
現ステップのグリッド内全てのセルの遷移を反映した次ステップのグリッドを返す
次ステップのグリッドの大きさは現ステップのグリッドに準拠する

:param source_grid: 現ステップのグリッド
:return: 次ステップのグリッド
"""

next_grid = Grid(source_grid.rows, source_grid.columns)
for row in range(source_grid.rows):
for column in range(source_grid.columns):
step_cell(row, column, source_grid.get, next_grid.set)
return next_grid



あとはシュミレーションを出力するだけです。


各ステップごとに Grid オブジェクトを表示すれば済む話ですが ( Grid クラスには __str__() を実装してありますから )、


複数のステップを 1 行に出来るようにすればパターンの遷移がより見やすくなるかな?ということで、


StagePrinter クラスを用意しました。


class StagePrinter:
"""
ステップの遷移を出力する
"""

def __init__(self):
self.stages = []

def append(self, data: str):
"""
この関数を呼び出して追加したグリッド象徴文字列が、このクラスの __str__() を呼び出した際に 1 行に表示される
例えば、3 つのグリッド象徴文字列を追加した後 StagePringer.__str__() を呼び出すと、

■■□   □■□   ■□□
□■□ → ■■□ → □□□
■■□   □■□   ■■□ のように表示される

:param data: あるステップのグリッドを象徴する文字列 ( Grid.__str__() の返り値)
"""

self.stages.append(data)

def __str__(self):
row_ct = len(self.stages[0].splitlines())

rows = [''] * row_ct
for m in range(row_ct):
for index, stage in enumerate(self.stages):
line = stage.splitlines()[m]
rows[m] += line

if index < len(self.stages) - 1:
if not m == row_ct // 2:
rows[m] += ' ' * 3
else:
rows[m] += ' → '

return '\n'.join(rows)



StagePrinter オブジェクトを利用して、1 行に 4 ステップ、3 行分の合計 12 ステップを表示してみましょう。


for m in range(3):
stages = StagePrinter()
for i in range(4):
stages.append(str(grid))
grid = simulate(grid)
print(stages)
print()


# □□□■□□□□□   □□□□□□□□□   □□□□□□□□□   □□□□□□□□□
# □□□□■□□□□   □□■□■□□□□   □□□□■□□□□   □□□■□□□□□
# □□■■■□□□□ → □□□■■□□□□ → □□■□■□□□□ → □□□□■■□□□
# □□□□□□□□□   □□□■□□□□□   □□□■■□□□□   □□□■■□□□□
# □□□□□□□□□   □□□□□□□□□   □□□□□□□□□   □□□□□□□□□
#
# □□□□□□□□□   □□□□□□□□□   □□□□□□□□□   □□□□□□□□□
# □□□□■□□□□   □□□□□□□□□   □□□□□□□□□   □□□□□□□□□
# □□□□□■□□□ → □□□■□■□□□ → □□□□□■□□□ → □□□□■□□□□
# □□□■■■□□□   □□□□■■□□□   □□□■□■□□□   □□□□□■■□□
# □□□□□□□□□   □□□□■□□□□   □□□□■■□□□   □□□□■■□□□
#
# □□□□□□□□□   □□□□□■□□□   □□□□□■■□□   □□□□□■■□□
# □□□□□□□□□   □□□□□□□□□   □□□□□□□□□   □□□□□□□□□
# □□□□□■□□□ → □□□□□□□□□ → □□□□□□□□□ → □□□□□□□□□
# □□□□□□■□□   □□□□■□■□□   □□□□□□■□□   □□□□□■□□□
# □□□□■■■□□   □□□□□■■□□   □□□□■□■□□   □□□□□□■■□



ちゃんと動作していますね、よかったよかった。


さて、実はここからが今回のシリーズ掲載の本番です。


今このプログラムは、1 台のマシン上の 1 つのスレッドで動作しています。


ところが、このロジックを元に、このゲームを大規模なマルチプレーヤーオンラインゲームへ展開しよう、というプロジェクトが持ち上がりました。


するとどういうことに対処する必要が出てくるでしょうか?


game_logic() 関数において次のステップの状態を決定する際に、各プレーヤーのグリッド状態を考慮する必要が出てきますし、結果も各プレーヤーへフィードバックしなければいけません。


必然的に game_logic() 関数内で、socket のやり取りをするための I/O 操作が生じることになります。


一番直線的な解決方法は、game_logic() に直接ブロッキング I/O ( blocking I/O ) 操作を追加してしまうことでしょう。


def game_logic(state, live_ct):
...
# ブロッキング入出力処理
data = my_socket.recv(128)
...



この実装方法の問題点は、プログラム全体の処理速度を低下させてしまうことです。


インターネット経由でデータをやり取りする際の待機時間 ( latency ) がほぼ 0.1 秒 ( 100 milliseconds ) と考えると、1 つのグリッドが 45 セルから成り立っている場合に、あるステップから次のステップへグリッド全体が移行するのに要する時間は 0.1 * 45 = 4.5 秒となります。


これは、simulate() においてグリッド内の各セルを 1 つずつシーケンシャルに処理しているためです。


遅すぎますよね、次の画面が表示されるまで 4.5 秒、待ってられますか?


もし 1 つのグリッドが 10,000 セルから出来ていたら、ステップが遷移するまで 15,6 分かかっちゃいます、もうコーヒー 1 杯淹れて飲み終わります。


この問題の解決策は、ブロッキング I/O を並列処理することです。


Python における並列処理はブロッキング I/O に関する限りは並行実行 ( parallelism ) と同意ですから、どんなにグリッドが大きくなろうが処理速度はほぼ 0.1 秒です。


ある仕事単位 ( unit of work, ここでは「セル」です ) を並列処理するためにワーカースレッドをドバッと増やす過程を fan-out と呼んでいます。


一方、次の処理 (ここでは次のステップのグリッド取得、表示です) へ進む前に、それらの並列処理ラインの結果を集約するために各ラインの終了を待つことを fan-in と呼びます。


Python では fan-out、fan-in を実現するための組み込みツールが多数提供されていますが、それぞれに長所短所を有しています。


今回作成した Conway's Game of Life を利用して、今後数回に分けて、Thread、Queue、ThreadPoolExecutor、Coroutine といったツールを取り上げていくつもりです。


まとめ:

1: プログラムが大きくなるに従ってスコープと複雑さが拡大し、それに伴って並列処理が必要になってくることがあります。

2: 最も典型的な並列処理パターンは、新たな並列処理ラインの生成と、各ラインの処理終了待機からなる fan-out、fan-in です。

3: Python には fan-out、fan-in を実現するための多くの方法が用意されています。

この記事に興味のある方は次の記事にも関心を持っているようです...
- People who read this article may also be interested in following articles ... -
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 5 回 - 並列処理 ( concurrency ) のためにスレッド ( thread ) を利用する場合は concurrent.futures モジュールの ThreadPoolExecutor の導入を検討しましょう、の巻
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 3 回 - Thread インスタンスの頻繁な start / join による fan-out / fan-in パターン実装は避けるべし、の巻
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 6 回 - コルーチン ( coroutines ) を利用して数多くのブロッキング I/O を並列処理する fan-out、fan-in パターンを実現しよう、の巻
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 4 回 - 並列処理 ( concurrency ) 実現のために queue を利用するとリファクタリング ( refactoring ) 作業が大変です、の巻
【 Effective Python, 2nd Edition + coding challenge 】プログラム開発のどの段階で並列処理 ( concurrency ) が必要になるのだろう? そのときどのようにリファクタリング ( refactoring ) していけばいいのだろう? を考えてみるシリーズ ( のはず ) 第1回
【 Effective Python, 2nd Edition 】Queue クラスを利用した producer-consumer パイプライン ( pipelines ) を構築して、マルチスレッドシーケンス処理をエレガントに管理しよう! 並行実行 ( parallelism ) と並列処理 ( concurrency ) もついでにちゃんとイメージしよう!
【 Effective Python, 2nd Edition 】スレッド ( thread ) とコルーチン ( coroutine ) を混在させながら、asyncio を利用した非同期プログラムへ段階的に移行させよう!