検索ガイド -Search Guide-

単語と単語を空白で区切ることで AND 検索になります。
例: python デコレータ ('python' と 'デコレータ' 両方を含む記事を検索します)
単語の前に '-' を付けることで NOT 検索になります。
例: python -デコレータ ('python' は含むが 'デコレータ' は含まない記事を検索します)
" (ダブルクオート) で語句を囲むことで 完全一致検索になります。
例: "python data" 実装 ('python data' と '実装' 両方を含む記事を検索します。'python data 実装' の検索とは異なります。)
img_for_tre_tron

Tré Thộn を食べたことがありますか?
ベトナム・ビンズオン滞在中の方は是非注文して食べてみて!
絶対に美味しいです!
ホーチミン市内へも配達可能です。お問い合わせください。

Have you ever had "Tré Thộn" before?
If you're living at Bình Dương in Vietnam, you "must" try to order and eat it.
I'm sure you're very surprised how delicious it is!!
If you're in Hồ Chí Minh, you have a chance to get it too. Please call!!
>>
effective_python

【 Effective Python, 2nd Edition 】Queue クラスを利用した producer-consumer パイプライン ( pipelines ) を構築して、マルチスレッドシーケンス処理をエレガントに管理しよう! 並行実行 ( parallelism ) と並列処理 ( concurrency ) もついでにちゃんとイメージしよう! 投稿一覧へ戻る

SUPPORT UKRAINE

- Your indifference to the act of cruelty can thrive rogue nations like Russia -

唐突ですが、放牧飼いをしている乳牛を搾乳する手順を考えます。牛たちはもう搾乳室 ( パーラー ) の前に集まっています、良い子!!


a: 搾乳室の入口を開けて1頭1頭牛を呼び込みます。


1: 乳頭の汚れを落とし、かつ、刺激します ( 泌乳ホルモン分泌のために )。


2: ミルカーを装着して搾乳します。


3: 搾乳が終了したら乳頭にディッピング ( 消毒のようなものです ) をします。


b: 搾乳室の出口を開けて牛を外へ出します。




このパーラーはタンデム式という少し古めの型ですが、1 頭搾り終わったらその牛を出して次の牛を入れて、と 1 頭ずつ処理していくことが出来ます。


さて、パーラー内でのこれら 1 ~ 3 の仕事をこなすプログラムを考えましょう。


1 ~ 3 の作業は 1 頭に関しては順番に行われる必要がありますが、パーラー内の全頭が常に同じ作業の対象になっているわけではありません。


1 頭はミルカーをつけて搾乳中です。


その隣の 1 頭は搾乳が終了した ( ミルカーは自動離脱装置付きです ) のでディッピングをする必要があります。


もう 1 頭はパーラーに入ってきたばかりで乳頭の汚れを落としているところです。


このように プログラム全体を見れば多くの作業が同時に進行 しています。


ですが、今日は奥さんが風邪のため休養で、パーラー内で働いているのは旦那さん一人です。


つまり、複数の作業が進行中ではありますが、ある瞬間に処理している作業 ( 動いているスレッド == 旦那さん ) は 1 つだけ、ということです。


すなわち 並行実行 ( parallelism ) ではなくて並列処理 ( concurrency ) ですね、ややっこしいですけど。


さて、1, 2, 3 それぞれの作業はプログラム内におけるそれぞれ別の関数と考えることが出来ますね。


そしてある意味、流れ作業 ( パイプライン ) における各フェーズ ( phase ) であるともいえます。1 が終わったら 2、2 が終わったら 3、ですね。


また、パーラーに入ってくる牛はプログラムが処理すべき「仕事 ( work / task )」です。


牛は最後の 1 頭が終わるまで、パイプラインに追加され、各フェーズの処理を順番に受けながら搾乳されていきます。


実は、私はもう 1, 2, 3 の処理をするための関数を作ってあるんです、素敵でしょ!


def cleaning(cow):
# プレディッピングをして乳頭の汚れを落とし、かつ、刺激を与えることで泌乳ホルモン; オキシトシンの分泌を促します
return cow


def milking(cow):
# ミルカーを装着して搾乳します
return cow


def dipping(cow):
# ポストディッピングをして乳頭を保護します
return cow



さて、これらの関数でパイプラインを構成し、作業を並列に処理していく ( do the work concurrently ) ためにはどうしたらいいんでしょう?


まず考えるべきことは、フェーズ間でタスクを受け渡していく必要がある、ということです。


これはスレッドセーフな producer-consumer キュー ( thread-safe producer-consumer queue ) で実現できそうです。


ただ、直前のキュー内のタスクが全部終了したことをワーカースレッド ( consumer ) に知らせるために、指標となる値 ( sentinel value ) をキューの一番最後に投入することにします。


from collections import deque


class MyQueue:
LAST_COW = 'あたしが最後でやんす!' # ワーカースレッドに最後の牛であることを知らせるための「標識 ( sentinel value )」です

def __init__(self):
self.cows = deque()

def add_last_one(self): # sentinel value をキューに投入する際に呼び出してください
self.put(MyQueue.LAST_COW)

def put(self, cow):
self.cows.append(cow)

def get(self):
return self.cows.popleft()



搾乳を待っている牛たち ( task ) は deque の最後にドンドン追加されていきます ( MyQueue.put() メソッド )。


パイプラインを構成するフェーズ、例えば第 1 フェースの cleaning() では、deque の最初から牛を呼び込み ( MyQueue.get() メソッド )「乳頭の汚れ落とし and 刺激」処理をします。


続いて、旦那さんの各仕事に当たるパイプラインの各フェーズは、Python のスレッド ( thread ) として実装しましょう。


各スレッドは、直前のフェーズとつながるキューから仕事を get し、自分の仕事が終了したら直後のフェーズへつながるキューにその仕事を put します。


旦那さん、実は奥さんに見張られています。


大好きな牛とデレデレして仕事が遅くならないように、どれだけ直前のキューをチェックしたのか、どれだけの牛を絞り終えたのか、をカウントされているんです (泣)!


from threading import Thread


class Worker(Thread):
def __init__(self, func, in_queue: MyQueue, out_queue: MyQueue):
super().__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
self.try_to_get_count = 0 # 直前のキューに get しに行った回数
self.work_done_count = 0 # 直後のキューに put した回数 ( このフェーズの処理を終えた回数 )

def run(self):
while True:
self.try_to_get_count += 1
try:
cow = self.in_queue.get()
except IndexError:
time.sleep(0.01) # 順番待ちの牛はいません
else:
cow = self.func(cow)
self.out_queue.put(cow)
self.work_done_count += 1
if cow == MyQueue.LAST_COW: # 最後の牛が終わればこのスレッドは終了です
break



このワーカースレッド ( worker thread ) で適切に対処しなければいけない状況は、直前のフェーズからのキューが空っぽで待っている牛がいない場合です。


この場合、self.in_queue.get() を実行すると IndexError が投げられます。旦那さん、ちょっと一服、パイプラインは停止中です。


これが噂の、consumer における busy waiting、CPU サイクルの無駄遣いってやつです。


さて、準備が整いました。


キューとワーカースレッドを繋ぎ合わせてパイプラインを構築します。


cleaning_queue = MyQueue() # 乳頭の汚れ落とし待ち牛群

milking_queue = MyQueue() # 搾乳待ち牛群

dipping_queue = MyQueue() # ディッピング待ち牛群

done_queue = MyQueue() # 搾乳終了牛群

threads = [
Worker(cleaning, cleaning_queue, milking_queue),
Worker(milking, milking_queue, dipping_queue),
Worker(dipping, dipping_queue, done_queue)
]



さあ、仕事開始です。


搾乳待ちの牛たちが次から次へとパイプライン先頭の cleaning_queue に入ってきます。


ここでは、実際の牛をプログラムに突っ込むわけにはいきませんから、object クラスインスタンスを牛に見立てることにしましょう。


for thread in threads:
thread.start()


for _ in range(1000): # おー、牛が 1000 頭も! 旦那さん、大変です
cleaning_queue.put(object())


cleaning_queue.add_last_one() # おー、最後にもう 1 頭! 全部で 1001 頭です



全ての牛が搾乳を終わって done_queue に収まるのを奥さんはモニタカメラで監視しています。


for thread in threads:
# 全作業終了まで少々お待ちください
thread.join()


milked = len(done_queue.cows)
try_to_get = sum(t.try_to_get_count for t in threads)


print(f"搾乳が終了した牛の数: {milked}, キューに牛を取りに行った回数: {try_to_get}")

# 搾乳が終了した牛の数: 1001, キューに牛を取りに行った回数: 3007



牛 1001 頭の搾乳作業は滞りなく終了したようです。


ただ、ご覧のように、直前のキューに get しに行った回数は本来 [ スレッドの数 ( 3 つ ) * 1001 頭 ] で 3003 回のはずですがちょっとだけ多くなっています。


これは直前のキューに待機している牛がいなくて IndexError が発生した証拠です。


今回は各フェーズの処理時間はほぼ均等ですが ( 3 つの関数とも何もしていませんからね )、本来であれば 2 番目の作業である「ミルカーを装着して搾乳する」が一番時間がかかります。


そうなると 3 番目の作業では常に「牛待ち」の状態、つまり、直前のキューが空の状態が頻発し、get しに行っては IndexError を発生させて待機し、また get しに行って、を繰り返すことになります。


結果的にこのワーカースレッドは有益な処理は何も行わないまま CPU サイクルを無駄にしているだけ、ということです。先程もちょっと言いましたが、busy waiting の状態です。


ただし、この実装における短所はこれだけではありません。


すでに話題にしていますが、各フェーズによる処理時間が異なったらどうなるでしょう?


1 番目の処理は速く、2 番目の処理は時間がかかるとしたら、その間の milking_queue のサイズはひたすら大きくなり続けます。もし牛の数が 1001 頭ではなくもの凄く多かったら、おおー、大変です、最終的にはメモリ不足によりプログラムはクラッシュしてしまうでしょう!


だからといって、パイプラインが悪い、といっているわけでは決してありません。


ちゃんと機能する producer-consumer queue を実装する、ということがいかに難しいか、ということです。


ところが Python、抜かりはありません。


これらの問題を解決してくれる機能を備えた queue 組み込みモジュールの Queue クラスを提供してくれているんです。


Queue クラスを利用すると、フェーズの直前のキューが空の場合は新たなタスクが追加されるまで get メソッドの実行をブロックしてくれるので、いちいちループを組んでチェックを繰り返す必要がなくなります。


つまり先程の例で in_queue.get() で IndexError が投げられたらちょっと間をおいて再び覗きに行く、という実装をする必要がない、ということですね。


in_queue.get() を呼び出した時点でキューが空の場合、タスクがキューに put され get がそれを受け取って値を返さない限りこのスレッドは get の場所で待機するようになります。


また、処理の遅いフェーズがパイプライン全体を滞らせキューにタスクがたまりすぎてメモリを圧迫、最悪の場合はプログラムがクラッシュ、という大問題に対しては、キューに溜めて置くことができるタスクの最大数を指定可能、という機能で対処しています。


キュー内の待機タスクの数がこの規定数に達すると put の実行を禁止しちゃうんです。


今回は各フェーズの処理時間に差がないため実装しませんが、興味のある方は試してみてください。


また、Queue クラスでは仕事の進捗度合いを追跡するための task_done() メソッドが提供されています。


consumer において get したタスクを処理し終えた時点で task_done を呼び出すことで、put されたタスク数と完了したタスク数をキューが管理してくれるようになります。


たとえ in_queue が途中で空になったとしても、そこでキューが終了してしまうことはありません。


キューが終了するのは、あくまでも、キューに投入したタスクの数と task_done() によって通知された数が一致した時点です。


ですから task_done() は join() と抱き合わせで使用する、ということですね。


さて、ここまでの勉強を踏まえて、Queue クラスを利用して実装し直してみましょう!


from queue import Queue
from threading import Thread
import time


class MyQueue(Queue):
LAST_COW = 'あたしが最後でやんす!'

def add_last_one(self):
self.put(MyQueue.LAST_COW)


class MyWorker(Thread):
def __init__(self, name, func, in_q: MyQueue, out_q: MyQueue):
super().__init__(name=name)
self.func = func
self.in_queue = in_q
self.out_queue = out_q

def run(self):
while True:
cow = self.in_queue.get()

self.in_queue.task_done()

cow = self.func(cow)

self.out_queue.put(cow)

if cow == MyQueue.LAST_COW:
break


cleaning_queue = MyQueue()
milking_queue = MyQueue()
dipping_queue = MyQueue()
done_queue = MyQueue()


threads = [
MyWorker('Cleaning', cleaning, cleaning_queue, milking_queue),
MyWorker('Milking', milking, milking_queue, dipping_queue),
MyWorker('Dipping', dipping, dipping_queue, done_queue)
]


for thread in threads:
thread.start()


for i in range(1000):
if i % 100 == 0:
time.sleep(0.2) # ちょっと意地悪をして投入途中でお休みをしてみましょう
cleaning_queue.put(object())

cleaning_queue.add_last_one()

cleaning_queue.join()

milking_queue.join()

dipping_queue.join()


print(f"搾乳が終了した牛の数: {done_queue.qsize()}")

# 搾乳が終了した牛の数: 1001


print(f"Cleaning キューは空ですか? {cleaning_queue.empty()}")
print(f"Milking キューは空ですか? {milking_queue.empty()}")
print(f"Dipping キューは空ですか? {dipping_queue.empty()}")
print(f"Done キューは空ですか? {done_queue.empty()}")

# Cleaning キューは空ですか? True
# Milking キューは空ですか? True
# Dipping キューは空ですか? True
# Done キューは空ですか? False


for thread in threads:
print(f"{thread.name} スレッドは生存していますか? {thread.is_alive()}")

# Cleaning スレッドは生存していますか? False
# Milking スレッドは生存していますか? False
# Dipping スレッドは生存していますか? False



もしデータをファイル、データベース、インターネット経由で取得しながら一連の処理をこなす必要がある場合、ブロッキング I/O ( blocking I/O ) 対策としてマルチスレッドを利用して時間短縮をはかり、内部処理では producer-consumer pipeline を組んで整然とデータ処理をしていく、というシステムを組むために必要な知識であることは間違いないと思います。


concurrency ( 並列処理 )、parallelism ( 並行実行 ) と聞くとどうしても腰が及びがちですが、お互いがんばって勉強しましょう!


まとめ:

1: パイプライン ( pipelines ) はシーケンス処理を管理するための非常に有用な実装方法です。特に I/O 処理が絡んでいるときは尚更です。

2: 並列処理を行うパイプラインを実装する際には様々な問題を注意深く考慮する必要があります。busy waiting、ワーカースレッドを終了させるタイミングと方法、各フェースの処理時間が大きく異なる場合のメモリークラッシュの回避方法、などです。

3: Python の Queue クラスは、賢固なパイプラインを構築するために必要な機能を全て提供してくれています。busy waiting を回避し、バッファーサイズの指定によりメモリー不足を予防し、task_done() と join() の連携により進捗度合いを管理してくれます。

この記事に興味のある方は次の記事にも関心を持っているようです...
- People who read this article may also be interested in following articles ... -
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 5 回 - 並列処理 ( concurrency ) のためにスレッド ( thread ) を利用する場合は concurrent.futures モジュールの ThreadPoolExecutor の導入を検討しましょう、の巻
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 4 回 - 並列処理 ( concurrency ) 実現のために queue を利用するとリファクタリング ( refactoring ) 作業が大変です、の巻
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 3 回 - Thread インスタンスの頻繁な start / join による fan-out / fan-in パターン実装は避けるべし、の巻
【 Effective Python, 2nd Edition 】threading モジュールの Lock クラスを利用してマルチスレッド実行時のデータ競合 ( data races ) を予防しよう! GIL はデータ構造 ( data structure ) の安全性まで面倒を見てくれません
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 6 回 - コルーチン ( coroutines ) を利用して数多くのブロッキング I/O を並列処理する fan-out、fan-in パターンを実現しよう、の巻
【 Effective Python, 2nd Edition 】Python のスレッド ( thread ) はブロッキング I/O ( blocking I/O ) 対策で存在しています。決して並行処理 ( parallelism ) を実現するためではありません!
【 Effective Python, 2nd Edition + coding challenge 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第2回 - Conway's Game of Life coding challenge の実装例と課題、の巻