検索ガイド -Search Guide-

単語と単語を空白で区切ることで AND 検索になります。
例: python デコレータ ('python' と 'デコレータ' 両方を含む記事を検索します)
単語の前に '-' を付けることで NOT 検索になります。
例: python -デコレータ ('python' は含むが 'デコレータ' は含まない記事を検索します)
" (ダブルクオート) で語句を囲むことで 完全一致検索になります。
例: "python data" 実装 ('python data' と '実装' 両方を含む記事を検索します。'python data 実装' の検索とは異なります。)
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
>>
effective_python

【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 3 回 - Thread インスタンスの頻繁な start / join による fan-out / fan-in パターン実装は避けるべし、の巻 投稿一覧へ戻る

Published 2020年8月31日6:35 by mootaro23

SUPPORT UKRAINE

- Your indifference to the act of cruelty can thrive rogue nations like Russia -

Python において I/O の並行実行を考えたときに真っ先に思い浮かぶのはスレッド ( thread ) だと思います。


しかしながら、fan-out を実現しようとして多数の並行実行ラインを作成した場合、スレッドの大きな弱点が顕わになってしまいます。


これを、前回までの coding challenge で実装した Conway's Game of Life プログラムを利用して考えていきます。


( Conway's Game of Life の coding challenge について不明な方は 前々回前回 の記事を参考にしてください )


さて、1 マシン 1 スレッドで動作していたプログラムを大規模なマルチプレーヤーオンラインゲームへ変貌させようと game_logic() に I/O を処理するためのコードを追加したために、解決すべき待機時間問題 ( latency problem ) が発生してしまったのでした。


そこでブロッキング I/O ( blocking I/O ) を並行実行してその問題を回避しよう、というのが前回までの流れでした。


複数のスレッドが同じデータにアクセスする必要があるわけですから、データ操作競合 ( data race ) を予防しデータ構造の正確性を保証するために、あるスレッドがグリッドデータにアクセスしている際には Lock() を利用して他のスレッドのアクセスは受け付けないようにする必要があります。


( マルチスレッド下におけるデータ操作競合やデータ構造破損 [data structure corruption] については こちらの記事 を参照してください)


そこでまず、Grid クラスのサブクラスを作成します。


このサブクラスにはロック機能を追加し、インスタンスが複数のスレッドから同時にアクセスされても問題が生じないようにしましょう。


from threading import Lock


LIVE = '■'
DEAD = '□'


class Grid:
def __init__(self, rows, columns):
self.rows = rows
self.columns = columns
self.mat = [[DEAD] * self.columns for _ in range(self.rows)]

def get(self, row, column):
return self.mat[row % self.rows][column % self.columns]

def set(self, row, column, state):
self.mat[row % self.rows][column % self.columns] = state

def __str__(self):
result = ''
for row in range(self.rows):
result += ''.join([self.mat[row][column] for column in range(self.columns)]) + '\n'
return result


class LockingGrid(Grid):
def __init__(self, rows, columns):
super().__init__(rows, columns)
self.lock = Lock()

def get(self, row, column):
with self.lock:
return super().get(row, column)

def set(self, row, column, state):
with self.lock:
super().set(row, column, state)

def __str__(self):
with self.lock:
return super().__str__()



count_neighbors()、step_cell() 関数には変更はありません。


game_logic() 関数には他のプレーヤーと情報をやり取りするためのブロッキング I/O 処理が組み込まれます。


その情報を元に次ステップの状態を決定しますが、そのロジックは本稿の目的ではないので割愛します。


またこの例題では、select モジュールの select() 関数を利用して模擬的な I/O 操作を組み入れてみました。1 回のやり取りに要する待機時間 ( latency ) は 0.1 秒とします。


import select
import socket
import time


def count_neighbors(row, column, get_func):
u_ = get_func(row - 1, column + 0) # 上
ur = get_func(row - 1, column + 1) # 右上
r_ = get_func(row + 0, column + 1) # 右
dr = get_func(row + 1, column + 1) # 右下
d_ = get_func(row + 1, column + 0) # 下
dl = get_func(row + 1, column - 1) # 左下
l_ = get_func(row + 0, column - 1) # 左
ul = get_func(row - 1, column - 1) # 左上
neighbor_states = [u_, ur, r_, dr, d_, dl, l_, ul]
count = 0
for state in neighbor_states:
if state == LIVE:
count += 1
return count


def game_logic(state, live_ct):
...
# ブロッキング入出力処理
data = my_socket.recv(128)
select.select([socket.socket()], [], [], 0.1) # 模擬的ブロッキング I/O 処理
...

if state == LIVE:
if live_ct < 2 or live_ct > 3:
return DEAD
else:
if live_ct == 3:
return LIVE
return state


def step_cell(row, column, cur_get, next_set):
state = cur_get(row, column)
live_ct = count_neighbors(row, column, cur_get)
next_state = game_logic(state, live_ct)
next_set(row, column, next_state)



そして、simulate() 関数に fan-out、fan-in 機能を組み込みます。


fan-out 部分では、step_cell() を呼び出すごとに新たなスレッドを作成します。


これによってブロッキング I/O 処理をシーケンシャル実行から並行実行 ( parallelism ) へ移行させます。


また、fan-in 部分では全てのスレッドの終了を待って、最終的な次のステップのグリッドを返すようにします。


from threading import Thread


def simulate_threaded(source_grid: Grid):
next_grid = LockingGrid(source_grid.rows, source_grid.columns)

threads = []
for row in range(source_grid.rows):
for column in range(source_grid.columns):
args = (row, column, source_grid.get, next_grid.set)
thread = Thread(target=step_cell, args=args)
thread.start() # Fan out: ブロッキング I/O 処理をワーカースレッドに分散させます
threads.append(thread)

for thread in threads:
thread.join() # Fan in: 分散させたワーカースレッドの終了を待って結果を返します

return next_grid



ここまで作成したロジックを利用してグリッド遷移を表示するために必要な変更は、Grid クラスの代わりに LockingGrid クラスを使うことと、simulate() を simulate_threaded() に置き換えることの 2 つです。


grid = LockingGrid(5, 9) # Grid オブジェクトから LockingGrid オブジェクトへ変更

grid.set(0, 3, LIVE)
grid.set(1, 4, LIVE)
grid.set(2, 2, LIVE)
grid.set(2, 3, LIVE)
grid.set(2, 4, LIVE)


class StagePrinter:
def __init__(self):
self.stages = []

def append(self, data: str):
self.stages.append(data)

def __str__(self):
row_ct = len(self.stages[0].splitlines())

rows = [''] * row_ct
for m in range(row_ct):
for index, stage in enumerate(self.stages):
line = stage.splitlines()[m]
rows[m] += line

if index < len(self.stages) - 1:
if not m == row_ct // 2:
rows[m] += ' ' * 3
else:
rows[m] += ' → '

return '\n'.join(rows)


start = time.perf_counter()


for m in range(1):
stages = StagePrinter()
for i in range(1):
stages.append(str(grid))
grid = simulate_threaded(grid) # simulate() から simulate_threaded() へ変更
print(stages)
print()


end = time.perf_counter()


print(f"処理時間: {end - start} s")


# □□□■□□□□□
# □□□□■□□□□
# □□■■■□□□□
# □□□□□□□□□
# □□□□□□□□□
#
# 処理時間: 0.112934111 s



このグリッドは 5 x 9 の 45 セルありますから、従来のシーケンシャル処理では 4.5 秒から 5.0 秒前後かかっていました。


それがほぼ期待通りの 0.1 秒で収まっています。これこそ並行実行 ( parallelism ) の本領発揮です!


じゃ、万々歳でいいじゃない、と思われるかもしれませんが、実はこの実装には重大な問題があるんです。


1: Thread を利用する場合、データ競合を防止するために Lock という特別なツールを使用する必要があります。これによるコードの複雑化によって、将来的な拡張やメンテナンスを困難にしてしまいます。


2: 1 つのスレッドを実行するごとに約 8 MB もの大量のメモリを必要とします。この例のように、たった 45 セルのグリッドを扱うのであればほとんどのコンピュータでは問題ないレベルだと思いますが、扱うセルの大きさが 10,000 セル、100,000 セルになったとしたら、私のオンボロコンピュータでは太刀打ちできず、スレッド間の並行実行は夢物語になってしまいます。


3: スレッドを開始する、という処理はパフォーマンス的に非常にコストが高いんです。しかも複数のスレッドを実行する、ということは、それらの間での切り替えを行うオーバーヘッドが生じるということでもあります。しかも今回の実装では、グリッドの各ステップごとに fan-out と fan-in を繰り返しているわけですからオーバーヘッドが非常に大きく、結果的に、期待値である 0.1 秒という I/O 処理待ち時間を実現できない可能性があります。




また、デバッグがし辛い、というのも大きな欠点です。I/O 処理は元来不安定要因が多いため game_logic() 関数内で例外が発生する可能性は常に存在します。


ここでは故意に例外を投げてみます。


def game_logic(state, live_ct):
...
raise OSError('I/O に問題が発生しました')
...



この関数をスレッドで動作させ、発生したエラーを StringIO を利用したインメモリーバッファー ( in-memory buffer ) にリダイレクトさせて書き込んでみましょう。


import contextlib
import io


fake_stderr = io.StringIO()

with contextlib.redirect_stderr(fake_stderr):
thread = Thread(target=game_logic, args=(LIVE, 3))
thread.start()
thread.join()


print(fake_stderr.getvalue())

# Exception in thread Thread-1:
# Traceback (most recent call last):
# File "threading.py", line 932, in _bootstrap_inner
# self.run()
# File "threading.py", line 870, in run
# self._target(*self._args, **self._kwargs)
# File "app.py", line 253, in game_logic
# raise OSError('I/O に問題が発生しました')
# OSError: I/O に問題が発生しました



OSError は期待通りに発生していますが、スレッドを作成し join() を呼んで終了を待っているコードの情報はエラーに一切含まれていません。


これは Thread クラスの性格によるもので、target パラメータで指定された関数から発生した例外については独自にキャッチして sys.stderr に書き込み、スレッド自体を作成、開始した呼び出し元には一切例外を通知しないんです。


ここまでのことを勘案すると、並行実行ラインを作成し、終了し、作成し、 ...、をコンスタントに繰り返す必要がある場合、すなわち fan-out、fan-in が繰り返される場面では、スレッドの利用は解決策にはならない、ということになるようです。


まとめ:

1: スレッドは多くの難点を抱えています。開始する際のオーバーヘッドが大きく、実行に際して多くのメモリを要求します。また、Lock クラスインスタンス等の特別なツールを使用してデータの整合性を保証する必要も生じます。

2: スレッド使用時に発生した例外について、スレッドの作成元、もしくは、スレッド終了を待機している場所への例外発生を知らせる Python 組み込みの方法が用意されておらず、デバッグが困難です。

この記事に興味のある方は次の記事にも関心を持っているようです...
- People who read this article may also be interested in following articles ... -
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 4 回 - 並列処理 ( concurrency ) 実現のために queue を利用するとリファクタリング ( refactoring ) 作業が大変です、の巻
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 6 回 - コルーチン ( coroutines ) を利用して数多くのブロッキング I/O を並列処理する fan-out、fan-in パターンを実現しよう、の巻
【 Effective Python, 2nd Edition + coding challenge 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第2回 - Conway's Game of Life coding challenge の実装例と課題、の巻
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 5 回 - 並列処理 ( concurrency ) のためにスレッド ( thread ) を利用する場合は concurrent.futures モジュールの ThreadPoolExecutor の導入を検討しましょう、の巻
【 Effective Python, 2nd Edition + coding challenge 】プログラム開発のどの段階で並列処理 ( concurrency ) が必要になるのだろう? そのときどのようにリファクタリング ( refactoring ) していけばいいのだろう? を考えてみるシリーズ ( のはず ) 第1回
【 Effective Python, 2nd Edition 】Queue クラスを利用した producer-consumer パイプライン ( pipelines ) を構築して、マルチスレッドシーケンス処理をエレガントに管理しよう! 並行実行 ( parallelism ) と並列処理 ( concurrency ) もついでにちゃんとイメージしよう!
【 Effective Python, 2nd Edition 】スレッド ( thread ) とコルーチン ( coroutine ) を混在させながら、asyncio を利用した非同期プログラムへ段階的に移行させよう!