effective_python

【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 4 回 - 並列処理 ( concurrency ) 実現のために queue を利用するとリファクタリング ( refactoring ) 作業が大変です、の巻 投稿一覧へ戻る

Tags: refactoring , python , concurrency , queue , fan-out , fan-in , effective

Published 2020年9月6日9:46 by T.Tsuyoshi

シリーズ第 3 回の記事 の検証で、コンスタントに fan-out / fan-in を繰り返す場合にはスレッド ( thread ) だけでの構築は不向き、ということが分かりました。


それにも懲りず今回も Conway's Game of Life サンプルを利用して、ブロッキング I/O の並行実行 ( parallelism ) 実装に向けた最適ツールの探求を続けていきたいと思います。


今回取り上げるツールは、queue 組み込みモジュールの Queue クラスです。


Queue クラスは、producer-consumer pipeline を構築するための賢固な基盤を提供してくれる、という記事で以前取り上げました。


アプローチ方法ですが、Conway's Game of Life の各ステップにおける各セルの処理ごとにスレッドを作成する代わりに、必要と思われる数のスレッドを事前に作成しておき、それらのスレッドで I/O 操作を並行実行します。


キューからタスクを取り出し複数のワーカースレッドで処理を行う段階が fan-out で、処理結果をもう 1 つのキューに put し、そこからタスクを取り出しながら次ステップのグリッドを作成する段階が fan-in というものです。


これによってリソースの使用量を常に把握しておくことが可能になりますし、スレッドを新規作成するためのオーバーヘッドも無視することが出来ます。


そのための第一段階として、I/O 処理を組み込んだ game_logic() 関数を実行するワーカースレッドへのタスク供給と結果取得のための Queue インスタンスを用意しましょう。


LIVE = '■'
DEAD = '□'


from queue import Queue


class ClosableQueue(Queue):
SENTINEL = object() # 最後のタスクであることを把握するための「標識 ( sentinel value )」です

def add_last_one(self): # sentinel value をキューに投入する際に呼び出します
self.put(self.SENTINEL)

def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTINEL:
return # StopIteration 例外を発生させて取り出し側のループを終了させます。すなわちスレッドが終了します。
yield item
finally:
self.task_done()


in_queue = ClosableQueue()
out_queue = ClosableQueue()



このキューは、最後のタスクであることを示す sentinel value に遭遇するまでループで値を返し続けようとします。


ただし、Queue クラスの get() メソッドを利用することで、キューが空の場合に生じる busy waiting を回避することが出来ますから安心!


そして sentinel value に遭遇すると return を返すことにより、このキューから値を取り出している先のループを終了させるため、結果的にスレッドが終了することになります。


generator 関数が return を返す、ということは、イコール、StopIteration 例外を投げる、ということですからね。


続けて in_queue からタスクを取り出し、結果を out_queue に put するためのワーカースレッドを定義します。


from threading import Thread


class StoppableWorker(Thread):
def __init__(self, func, in_queue: ClosableQueue, out_queue: ClosableQueue, **kwargs):
super().__init__(**kwargs)
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue

def run(self):
for item in self.in_queue:
result = self.func(item)
self.out_queue.put(result)



先程の Queue サブクラスの定義でも書きましたが、この thread サブクラスの run メソッドでは、in_queue からタスクを取り出し、target 関数で処理をした結果を out_queue に put しています。


このループは、in_queue で sentinel value が取り出され return が返される ( == StopIteration 例外が投げられる ) ことで終了します。


次に、ワーカースレッドで動作する target 関数である geme_logic() 関数自体には変更はありませんが、そこで発生する可能性のある I/O 関連の例外をスレッド外へ伝播するために、この game_logic() 関数をラップする形で game_logic_thread() 関数を新たに定義します。


この 2 つの関数はブロッキング I/O ( blocking I/O ) 操作に携わっており、また、複数のワーカースレッドで動作させますから、すなわち並行実行 ( parallelism ) となり、I/O 操作時における待機時間 ( latency ) を削減できます!!


import select
import socket


def game_logic(state, neighbors):
# ブロッキング I/O 処理 ( ここでは模擬的な処理を実装 )
select.select([socket.socket()], [], [], 0.1)

if state == LIVE:
if neighbors < 2:
return DEAD
elif neighbors > 3:
return DEAD
else:
if neighbors == 3:
return LIVE
return state


def game_logic_thread(item):
row, column, state, neighbors = item
try:
next_state = game_logic(state, neighbors)
except Exception as e: # ワーカースレッド内で発生した例外をスレッドの作成元へ伝播するために、例外を返却値にセットし、呼び出し元でチェックします
next_state = e
return (row, column, next_state)



さて前回では fan-out、fan-in パターンを実装するために各ステップごとにワーカースレッドの作成、終了を繰り返していましたので、そのオーバーヘッドが大きな問題になっていたのでした。


そこで今回は、あらかじめ複数のスレッドを動作させておくことで、各ステップのグリッド作成時のスレッド作成、終了を回避します。


threads = []
for _ in range(5):
thread = StoppableWorker(game_logic_thread, in_queue, out_queue)
thread.start()
threads.append(thread)


class SimulationError(Exception):
"""
ワーカースレッド内で発生した例外 ( この例では OSError ) をより特定された例外として送出するための例外サブクラス
"""

pass


class Grid:
def __init__(self, rows, columns):
self.rows = rows
self.columns = columns
self.mat = [[DEAD] * self.columns for _ in range(self.rows)]

def get(self, row, column):
return self.mat[row % self.rows][column % self.columns]

def set(self, row, column, state):
self.mat[row % self.rows][column % self.columns] = state

def __str__(self):
result = ''
for row in range(self.rows):
result += ''.join([self.mat[row][column] for column in range(self.columns)]) + '\n'
return result


def count_neighbors(row, column, get_func):
u_ = get_func(row - 1, column + 0) # 上
ur = get_func(row - 1, column + 1) # 右上
r_ = get_func(row + 0, column + 1) # 右
dr = get_func(row + 1, column + 1) # 右下
d_ = get_func(row + 1, column + 0) # 下
dl = get_func(row + 1, column - 1) # 左下
l_ = get_func(row + 0, column - 1) # 左
ul = get_func(row - 1, column - 1) # 左上
neighbor_states = [u_, ur, r_, dr, d_, dl, l_, ul]
count = 0
for state in neighbor_states:
if state == LIVE:
count += 1
return count



今回実装したキュー ( in_queue ) にタスクを put し、次ステップにおける各セルの状態をもう 1 つのキュー ( out_queue ) から取得してグリッドを作成、返すために simulate() 関数も実装し直します。


ここで、in_queue にタスクを put することでブロッキング I/O 処理を複数のスレッドに分散させますから fan-out になります。


Queue サブクラスでは get() でタスクを取り出すたびに task_done() で終了タスクカウントを +1 しています。


このカウントがキューに投入したタスクの数と同じになると join() でのブロックが解除されますね。


つまり、この時点で in_queue に投入したタスクが全て終了し、次ステップにおける各セルの状態が out_queue に put された、ということですから、ここで out_queue に sentinel value を投入します。


続けて、シーケンシャルに out_queue から値を取り出しながら次ステップのグリッドを作成しますが、この動作が fan-in に相当します。


あと、ここで勘違いしないでいただきたいのは、この fan-in 動作はあくまでも「シーケンシャル」に処理をしているだけで、ワーカースレッドでの並列処理をしているわけではない、ということです。


ですから、out_queue で sentinel value が捕捉され StopIteration 例外が投げられた、といっても、スレッドが終了するわけでも何でもありません。


ただ単に、この sumulate_pipeline() が終了し次ステップのグリッドが返された、もしくは、エラーによりプログラム自体が終了した、ということです。


def simulate_pipeline(grid: Grid, in_queue: ClosableQueue, out_queue: ClosableQueue):
for row in range(grid.rows):
for column in range(grid.columns):
state = grid.get(row, column)
neighbors = count_neighbors(row, column, grid.get)
in_queue.put((row, column, state, neighbors)) # Fan out

in_queue.join()
out_queue.add_last_one()

next_grid = Grid(grid.rows, grid.columns)
for item in out_queue: # Fan in
row, column, next_state = item
if isinstance(next_state, Exception): # ワーカースレッド内で発生した例外をキューを通してキャッチします
raise SimulationError(row, column) from next_state # デバッグがやり易くなるように、メインスレッドで再度例外を投げます
next_grid.set(row, column, next_state)

return next_grid


import time


grid = Grid(5, 9)

grid.set(0, 3, LIVE)
grid.set(1, 4, LIVE)
grid.set(2, 2, LIVE)
grid.set(2, 3, LIVE)
grid.set(2, 4, LIVE)


class StagePrinter:
def __init__(self):
self.stages = []

def append(self, data: str):
self.stages.append(data)

def __str__(self):
row_ct = len(self.stages[0].splitlines())

rows = [''] * row_ct
for m in range(row_ct):
for index, stage in enumerate(self.stages):
line = stage.splitlines()[m]
rows[m] += line

if index < len(self.stages) - 1:
if not m == row_ct // 2:
rows[m] += ' ' * 3
else:
rows[m] += ' → '

return '\n'.join(rows)


start = time.perf_counter()



あとは、simulate_pipeline() 関数をループで回しながら呼び出すだけです。


for m in range(1):
stages = StagePrinter()
for i in range(5):
stages.append(str(grid))
grid = simulate_pipeline(grid, in_queue, out_queue)
print(stages)

# □□□■□□□□□   □□□□□□□□□   □□□□□□□□□   □□□□□□□□□   □□□□□□□□□
# □□□□■□□□□   □□■□■□□□□   □□□□■□□□□   □□□■□□□□□   □□□□■□□□□
# □□■■■□□□□ → □□□■■□□□□ → □□■□■□□□□ → □□□□■■□□□ → □□□□□■□□□
# □□□□□□□□□   □□□■□□□□□   □□□■■□□□□   □□□■■□□□□   □□□■■■□□□
# □□□□□□□□□   □□□□□□□□□   □□□□□□□□□   □□□□□□□□□   □□□□□□□□□



ちゃんと動作していますね。


キューとスレッドの状態も確認してみましょう。


print(f"in_queue キューは空ですか? {in_queue.empty()}")
print(f"out_queue キューは空ですか? {out_queue.empty()}")
for thread in threads:
print(f"{thread.name} スレッドは生存していますか? {thread.is_alive()}")

# in_queue キューは空ですか? True
# out_queue キューは空ですか? True
# Thread-1 スレッドは生存していますか? True
# Thread-2 スレッドは生存していますか? True
# Thread-3 スレッドは生存していますか? True
# Thread-4 スレッドは生存していますか? True
# Thread-5 スレッドは生存していますか? True



ここで in_queue に sentinel_value をワーカースレッド分投入することで、キューから StopIteration 例外が投げられ、その結果、ワーカースレッドも終了します。


for thread in threads:
in_queue.add_last_one()


print(f"in_queue キューは空ですか? {in_queue.empty()}")
print(f"out_queue キューは空ですか? {out_queue.empty()}")
for thread in threads:
print(f"{thread.name} スレッドは生存していますか? {thread.is_alive()}")

# in_queue キューは空ですか? False
# out_queue キューは空ですか? True
# Thread-1 スレッドは生存していますか? True
# Thread-2 スレッドは生存していますか? False
# Thread-3 スレッドは生存していますか? False
# Thread-4 スレッドは生存していますか? False
# Thread-5 スレッドは生存していますか? False



ワーカースレッドの終了を待って再度状態を確認してみましょう。


for thread in threads:
thread.join()


print(f"in_queue キューは空ですか? {in_queue.empty()}")
print(f"out_queue キューは空ですか? {out_queue.empty()}")
for thread in threads:
print(f"{thread.name} スレッドは生存していますか? {thread.is_alive()}")

# in_queue キューは空ですか? True
# out_queue キューは空ですか? True
# Thread-1 スレッドは生存していますか? False
# Thread-2 スレッドは生存していますか? False
# Thread-3 スレッドは生存していますか? False
# Thread-4 スレッドは生存していますか? False
# Thread-5 スレッドは生存していますか? False


end = time.perf_counter()


print(f"処理時間: {end - start} s")

# 処理時間: 4.913743389 s



全て正常に終了しました!!


また、この実装で大切なことは、I/O 処理をしているワーカースレッドで動作している game_logic() 関数で例外が発生した場合、その例外をスレッドがキャプチャして終了、ではなく、ちゃんとキューまで伝播させ、最終的にはメインスレッドで再度例外を投げることでデバッグをやり易くする、ということです。


試しに gema_logic() 内で故意にエラーを投げて、ちゃんとメインスレッド内で例外が再度投げられているかを確認しましょう。


def game_logic(state, neighbors):
...
raise OSError('Problem with I/O in game_logic')
...

# Traceback (most recent call last):
# ...
# next_state = game_logic(state, neighbors)
# ...
# raise OSError('Problem with I/O in game_logic')
# OSError: Problem with I/O in game_logic
#
# The above exception was the direct cause of the following exception:
#
# Traceback (most recent call last):
# ...
# grid = simulate_pipeline(grid, in_queue, out_queue)
# ...
# raise SimulationError(row, column) from next_state
# __main__.SimulationError: (0, 0)



発生場所のワーカースレッド内関数の情報だけではなく、呼び出した大元の情報までちゃんと表示されるようになっていますね。


今回のキュー ( Queue ) を利用した実装により、前回のスレッド ( Thread ) だけを利用した実装時のメモリ消費、スレッドの頻繁な開始、終了にともなうオーバーヘッド、デバッグ情報の欠落、といった問題は解決できました。


が、がっ、今回は今回で実に多くの問題を抱えてしまっています、困りました...


1: 前回の simulate_thread() 関数に比べ今回の simulate_pipeline() 関数は複雑さが増してしまい、同じく fan-out、fan-in パターンを実装しているにもかかわらず読解性が落ちてしまっている。

2: キューを実装するため必然であるとはいえ、ClosableQueue クラスや StoppableWorker クラスといったサポートクラスを追加したことでプログラム全体が大きく複雑になってしまった。

3: fan-out、fan-in パターンを実装するためのワーカースレッドの必要数を、前回はグリッドを構成するセルの数だけ作成するようにシステムに任せていたが、あらかじめ作成し使い回しするようにしたことで「予想」して準備しなければいけなくなってしまった。

4: デバッグ情報を伝播するために、ワーカースレッド内で動作している関数 ( game_logic_thread() 関数 ) 内で手動で例外をキャッチし、それをキューに伝播し、メインスレッドで再度例外を投げる処理が必要になってしまった。




これだけでも結構大変ですが、実は更に頭の痛い問題を抱えてしまっているんです。


もし将来的に仕様変更があり、他の関数内 ( 例えば count_neighbors() 等 ) にも I/O 処理を組み込むことが必要になった場合、どうなってしまうでしょうか?


count_neighbors() を並行実行するために、今回 game_logic() を並行実行させるために行った変更とまるっきり同じ変更を加える必要が出てくる、ということです。


count_neighbors() を並行実行するためのスレッドクラスを用意し、パイプラインを構築するために別の ClosableQueue クラスインスタンスを作成し、ワーカースレッドからメインスレッドにちゃんと例外が伝播するようにしなければいけません。


複数のスレッドがグリッドの情報にアクセスしますから、データ競合 ( data race ) を防止するために Lock を使用する必要もあるでしょう。


そして、最終的には simulate_pipeline() 関数を更新し、追加するパイプラインのための機能を追加し、そのパイプラインにおいても fan-out、fan-in パターンが正常に機能するようにする必要があります。


不可能ではありません。やる価値もあるかもしれません。


しかし、あまりに多くの変更を伴い、そのほとんどがボイラープレート ( boilerplate ) 的な実装になるでしょう。


Queue を利用することで fan-out、fan-in パターンを実装することが可能であり、それは Thread クラスを利用するよりも良い選択である可能性はありますが、依然としてオーバーヘッドは非常に大きく、将来的な拡張に向けて不安があることも確かです。


また、現在稼働中のプログラムを移行させるには手を入れなければならない箇所が多い、というのもマイナスポイントでしょうか。


まとめ:

1: あらかじめ作成しておいた固定数のスレッドと Queue インスタンスを組み合わせてパイプラインを構築することで、Thread クラスだけを利用した fan-out、fan-in パターンの実装の多くの短所を補うことが可能です。

2: 既存のプログラムを Queue を利用した fan-out、fan-in パターンに移行するためには多大なリファクタリング ( refactoring ) 作業が必要となります。複数のパイプラインが必要になる状況では事態はさらに悪化します。

3: Queue を利用する場合、プログラムで利用できる並行実行ラインの本数は基本的にあらかじめ「予想」して用意しておいた数に限られてしまいます。Python 組み込みで提供されている他の機能やモジュールを利用する場合と比較するとパフォーマンス的に見劣りしてしまいます。

この投稿をメールでシェアする

0 comments

コメントはまだありません。

コメントを追加する(不適切と思われるコメントは削除する場合があります)