検索ガイド -Search Guide-

単語と単語を空白で区切ることで AND 検索になります。
例: python デコレータ ('python' と 'デコレータ' 両方を含む記事を検索します)
単語の前に '-' を付けることで NOT 検索になります。
例: python -デコレータ ('python' は含むが 'デコレータ' は含まない記事を検索します)
" (ダブルクオート) で語句を囲むことで 完全一致検索になります。
例: "python data" 実装 ('python data' と '実装' 両方を含む記事を検索します。'python data 実装' の検索とは異なります。)
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
>>
effective_python

【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 5 回 - 並列処理 ( concurrency ) のためにスレッド ( thread ) を利用する場合は concurrent.futures モジュールの ThreadPoolExecutor の導入を検討しましょう、の巻 投稿一覧へ戻る

SUPPORT UKRAINE

- Your indifference to the act of cruelty can thrive rogue nations like Russia -

さて、えっちらおっちらとやってきまして、このシリーズも第 5 回になっております。


取り敢えず整理も兼ねまして、過去 4 回のポストへのリンクをまとめておきます。


このシリーズで並列処理 ( concurrency ) ならびに並行実行 ( parallelism ) への検討を重ねている Conway's Game of Life の大元のコードは第 1 回と第 2 回で作成したものですので、関心がある方はそこから読んで頂いたほうが理解が早いと思います。


第 1 回
第 2 回
第 3 回
第 4 回


大元の Conway's Game of Life のコードに I/O 処理を追加する、そこで fan-out、fan-in パターンを実装してブロッキング I/O 処理を効率的に実行しよう、という目標の元、第 3 回では Thread だけを利用した場合、第 4 回では Thread と Queue を利用して producer-consumer pipeline を構築した場合を検討してきましたが、どちらも「大満足」という結果にはなりませんでした。


そこで今回は、Thread と Queue の「いいとこ取り!」を目指して、concurrent.futures 組み込みモジュールの ThreadPoolExecutor クラスの利用を検討します。


Grid クラスは複数のスレッドがアクセスするため、データ競合 ( data race ) を防止しデータの一貫性を保持するために Lock を利用するサブクラスを定義します。


これは第 3 回の Thread を利用した場合と同じです。


from threading import Lock


LIVE = '■'
DEAD = '□'


class Grid:
def __init__(self, rows, columns):
self.rows = rows
self.columns = columns
self.mat = [[DEAD] * self.columns for _ in range(self.rows)]

def get(self, row, column):
return self.mat[row % self.rows][column % self.columns]

def set(self, row, column, state):
self.mat[row % self.rows][column % self.columns] = state

def __str__(self):
result = ''
for row in range(self.rows):
result += ''.join([self.mat[row][column] for column in range(self.columns)]) + '\n'
return result


class LockingGrid(Grid):
def __init__(self, rows, columns):
super().__init__(rows, columns)
self.lock = Lock()

def get(self, row, column):
with self.lock:
return super().get(row, column)

def set(self, row, column, state):
with self.lock:
super().set(row, column, state)

def __str__(self):
with self.lock:
return super().__str__()



以下の 3 つの関数に変更はありません。


import select
import socket
import time


def count_neighbors(row, column, get_func):
u_ = get_func(row - 1, column + 0) # 上
ur = get_func(row - 1, column + 1) # 右上
r_ = get_func(row + 0, column + 1) # 右
dr = get_func(row + 1, column + 1) # 右下
d_ = get_func(row + 1, column + 0) # 下
dl = get_func(row + 1, column - 1) # 左下
l_ = get_func(row + 0, column - 1) # 左
ul = get_func(row - 1, column - 1) # 左上
neighbor_states = [u_, ur, r_, dr, d_, dl, l_, ul]
count = 0
for state in neighbor_states:
if state == LIVE:
count += 1
return count


def game_logic(state, live_ct):
...
# ブロッキング入出力処理
data = my_socket.recv(128)
select.select([socket.socket()], [], [], 0.1) # 模擬的ブロッキング I/O 処理
...

if state == LIVE:
if live_ct < 2 or live_ct > 3:
return DEAD
else:
if live_ct == 3:
return LIVE
return state


def step_cell(row, column, cur_get, next_set):
state = cur_get(row, column)
live_ct = count_neighbors(row, column, cur_get)
next_state = game_logic(state, live_ct)
next_set(row, column, next_state)



今回の ThreadPoolExecutor を利用した fan-out、fan-in パターンの実装では、ステップごとのグリッドを作成するたびにスレッドを作成、終了させるのではなく、executor に対して実行する関数 ( step_cell() 関数 ) を渡すことで fan-out を、その終了を待つことで fan-in を実現します。


from concurrent.futures import ThreadPoolExecutor


def simulate_pool(pool, source_grid: LockingGrid):
next_grid = LockingGrid(source_grid.rows, source_grid.columns)

futures = []
for row in range(source_grid.rows):
for column in range(source_grid.columns):
args = (row, column, source_grid.get, next_grid.set)
future = pool.submit(step_cell, *args) # Fan out
futures.append(future)

for future in futures:
future.result() # Fan in

return next_grid


class StagePrinter:
def __init__(self):
self.stages = []

def append(self, data: str):
self.stages.append(data)

def __str__(self):
row_ct = len(self.stages[0].splitlines())

rows = [''] * row_ct
for m in range(row_ct):
for index, stage in enumerate(self.stages):
line = stage.splitlines()[m]
rows[m] += line

if index < len(self.stages) - 1:
if not m == row_ct // 2:
rows[m] += ' ' * 3
else:
rows[m] += ' → '

return '\n'.join(rows)


grid = LockingGrid(5, 9)

grid.set(0, 3, LIVE)
grid.set(1, 4, LIVE)
grid.set(2, 2, LIVE)
grid.set(2, 3, LIVE)
grid.set(2, 4, LIVE)



executor の実行に割り当てられるスレッドは前もって作成しておきます。


これによって、スレッドの作成、終了を繰り返すことで生じるオーバーヘッドとスレッド開始時のコスト低減を図ると共に、スレッドプール ( thread pool ) で利用するスレッド数の上限を max_workers パラメータで指定することで、ブロッキング I/O 並行実行時のスレッド使用時に発生する可能性のあるメモリ不足によるプログラムクラッシュを予防します。


start = time.perf_counter()


with ThreadPoolExecutor(max_workers=10) as pool:
for m in range(1):
stages = StagePrinter()
for i in range(5):
stages.append(str(grid))
grid = simulate_pool(pool, grid)
print(stages)
print()


end = time.perf_counter()


print(f"処理時間: {end - start} s")

# □□□■□□□□□   □□□□□□□□□   □□□□□□□□□   □□□□□□□□□   □□□□□□□□□
# □□□□■□□□□   □□■□■□□□□   □□□□■□□□□   □□□■□□□□□   □□□□■□□□□
# □□■■■□□□□ → □□□■■□□□□ → □□■□■□□□□ → □□□□■■□□□ → □□□□□■□□□
# □□□□□□□□□   □□□■□□□□□   □□□■■□□□□   □□□■■□□□□   □□□■■■□□□
# □□□□□□□□□   □□□□□□□□□   □□□□□□□□□   □□□□□□□□□   □□□□□□□□□
#
# 処理時間: 2.719415541 s



ThreadPoolExecutor を利用する利点の 1 つは、submit() メソッドで作成した Future オブジェクトの result() メソッドが呼ばれた際に、発生した例外が呼び出し元まで自動的に伝播される、ということです。


これによって、第 3 回のように例外をスレッドがキャプチャしてあとは知らん顔されたり、第 4 回のように「スレッド → キュー → メインスレッド」と例外を伝播させるための実装を自分で行う必要があったり、ということがなくなります。


def game_logic(state, live_ct):
...
raise OSError('I/O で問題が発生しました')
...

# Traceback (most recent call last):
# ...
# grid = simulate_pool(pool, grid)
# ...
# future.result() # Fan in
# ...
# ...
# next_state = game_logic(state, live_ct)
# ...
# raise OSError('I/O で問題が発生しました')
# OSError: I/O で問題が発生しました



しかも、game_logic() だけではなく例えば count_neighbors() 関数で I/O 操作を組み込む必要が生じた場合でも、今回の実装であれば変更を加えなければいけない箇所は 1 ヶ所もないんです!


ThreadPoolExecutor を利用して並列処理しているのは step_cell() 関数ですし、その一部として game_logic() も count_neighbors() も動作していますから、ブロッキング I/O 処理は自動的に並行実行されるんですね。


いいこと尽くめの ThreadPoolExecutor クラスを利用した fan-out、fan-in パターンの実装ですが、残念ながらまだ問題が残ってしまっています。


それは第 4 回の課題と同じく、前もって利用可能なスレッドの数を決め打ちしてしまっている、ということです。


max_workers パラメータに渡す数を大きくすればいいじゃん、となりそうですが、スレッド開始コストとメモリクラッシュの狭間で最適値を探し出すのは容易なことではありません。


しかし、安全性を見積もって少なめに指定すれば、グリッドを構成するセルの数が非常に多い場合などには十分なメリットを得られないことになってしまいます。


まとめ:

1: ThreadPoolExecutor クラスを利用すると、最小限のリファクタリング ( refactoring ) 作業でブロッキング I/O の並行実行を実現することができます。これによって fan-out、fan-in パターン実装に伴うスレッドの開始、終了反復コストを容易に回避可能です。

2: ThreadPoolExecutor を max_workers パラメータを指定して利用することで、第 3 回のようにスレッドを直接使用した際に問題となったメモリクラッシュ ( memory blow-up issues ) の可能性を低減することが可能ですが、I/O 並行実行ラインの数をあらかじめ指定することで最大のパフォーマンスが得られなくなることも事実です。

この記事に興味のある方は次の記事にも関心を持っているようです...
- People who read this article may also be interested in following articles ... -
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 4 回 - 並列処理 ( concurrency ) 実現のために queue を利用するとリファクタリング ( refactoring ) 作業が大変です、の巻
【 Effective Python, 2nd Edition + coding challenge 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第2回 - Conway's Game of Life coding challenge の実装例と課題、の巻
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 6 回 - コルーチン ( coroutines ) を利用して数多くのブロッキング I/O を並列処理する fan-out、fan-in パターンを実現しよう、の巻
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 3 回 - Thread インスタンスの頻繁な start / join による fan-out / fan-in パターン実装は避けるべし、の巻
【 Effective Python, 2nd Edition + coding challenge 】プログラム開発のどの段階で並列処理 ( concurrency ) が必要になるのだろう? そのときどのようにリファクタリング ( refactoring ) していけばいいのだろう? を考えてみるシリーズ ( のはず ) 第1回
【 Effective Python, 2nd Edition 】Queue クラスを利用した producer-consumer パイプライン ( pipelines ) を構築して、マルチスレッドシーケンス処理をエレガントに管理しよう! 並行実行 ( parallelism ) と並列処理 ( concurrency ) もついでにちゃんとイメージしよう!
【 Effective Python, 2nd Edition 】スレッド ( thread ) とコルーチン ( coroutine ) を混在させながら、asyncio を利用した非同期プログラムへ段階的に移行させよう!