検索ガイド -Search Guide-

単語と単語を空白で区切ることで AND 検索になります。
例: python デコレータ ('python' と 'デコレータ' 両方を含む記事を検索します)
単語の前に '-' を付けることで NOT 検索になります。
例: python -デコレータ ('python' は含むが 'デコレータ' は含まない記事を検索します)
" (ダブルクオート) で語句を囲むことで 完全一致検索になります。
例: "python data" 実装 ('python data' と '実装' 両方を含む記事を検索します。'python data 実装' の検索とは異なります。)
img_for_tre_tron

Tré Thộn を食べたことがありますか?
ベトナム・ビンズオン滞在中の方は是非注文して食べてみて!
絶対に美味しいです!
ホーチミン市内へも配達可能です。お問い合わせください。

Have you ever had "Tré Thộn" before?
If you're living at Bình Dương in Vietnam, you "must" try to order and eat it.
I'm sure you're very surprised how delicious it is!!
If you're in Hồ Chí Minh, you have a chance to get it too. Please call!!
>>
effective_python

【 Effective Python, 2nd Edition 】スレッド ( thread ) とコルーチン ( coroutine ) を混在させながら、asyncio を利用した非同期プログラムへ段階的に移行させよう! 投稿一覧へ戻る

Published 2020年10月1日21:36 by mootaro23

SUPPORT UKRAINE

- Your indifference to the act of cruelty can thrive rogue nations like Russia -

前回の記事 では、ブロッキング I/O ( blocking I/O ) とスレッド ( thread ) を利用して作成した TCP サーバー / クライアントプログラムを、コルーチン ( coroutine ) と asyncio を利用した非同期プログラム ( asynchronous program ) へとリファクタリング ( refactoring ) しました。


しかし既存の巨大なプログラム全体を一気にリファクタリングする、というのは現実的な話ではありませんよね。


コールスタックの順番を追いながら一段階一段階移行を進めていく、ということになると思います。


ということは、その移行期間において、ワーカースレッドでブロッキング I/O を処理している関数と、移行中のコルーチンが 1 つのプログラムの中で共存する必要がある、ということです。


つまり、コルーチンの中で「既存」のブロッキング I/O 処理関数をワーカースレッドに分散実行させたり、非同期 I/O 処理を司るコルーチンを「従来の」ワーカースレッド内で実行し終了を待つ、ということが求められることになります。


しかし asyncio にはこういった「共存関係」をうまく処理するための組み込み機能が用意されています。


今回は、複数のファイルの内容を 1 つのファイルにまとめる ( 複数のログファイルを 1 つにまとめて処理したい、等の場面を想定しています ) 際に、スレッドを利用してブロッキング I/O 処理を fan-out、fan-in パターンで実装しているプログラムを、asyncio とコルーチンを利用するプログラムへと移行していく経過を追いながら、asyncio における「共存関係」機能をみていきたいと思います。


この「既存」プログラムでは、1 つの読み込み元ファイル処理につき 1 つのワーカースレッドを立ち上げて処理します。


ワーカースレッドで実行されるブロッキング I/O 処理関数では、ファイルから 1 行ずつ読み込むたびに「まとめ」ファイルへ書き込みます。


お察しのように「まとめ」ファイルへの書き込みは複数のスレッドが実行しますから、Lock を利用してデータ競合 ( data race ) やデータ構造破壊 ( data structure corruption ) を防止しなければいけません。


from pathlib import Path
from threading import Lock, Thread
from typing import List, Tuple


def tail_file(in_path: Path, write_func):
"""
読み込み書き込み関数: ワーカースレッドで動作します
"""

with in_path.open('rb') as f:
for line in f:
write_func(line)


def run_threads(in_paths: List[Path], out_path: Path):
"""
複数のファイルの内容を1つのファイルにまとめます
読み込み元のファイル1つごとにスレッドを立ち上げ fan-out し、書き込み終了を待って fan-in します
"""

with out_path.open('wb') as output:
lock = Lock()

def write(data):
"""
書き込み関数
別スレッドで実行される tail_file() 内で呼び出されます
書き込み先ファイルは複数のスレッドからアクセスされるので Lock を利用してデータ競合 ( data race ) を防止します
"""

with lock:
output.write(data)

threads = []
for i_path in in_paths:
args = (i_path, write)
thread = Thread(target=tail_file, args=args) # fan-out
thread.start()
threads.append(thread)

for thread in threads:
thread.join() # fan-in



この先 3 つの関数は本題には直接関係ありません。


今回のプログラムを検証するためのデータである、読み込み先となる複数のファイルと、書き込み先ファイルを一時ディレクトリ内に用意するためのものです。


from collections import defaultdict
import random
import string
from tempfile import TemporaryDirectory


def write_random_data(out_path: Path, write_count: int):
with out_path.open('w', encoding='utf-8') as f:
for i in range(write_count):
letters = random.choices(string.ascii_lowercase, k=10)
data = f"{str(out_path)}-{i:02}-{''.join(letters)}\n"
f.write(data)
f.flush()


def start_write_threads(directory: str, file_count: int) -> List[Path]:
paths = []
threads = []
for i in range(file_count):
path = Path(directory) / f"{i}.txt"

paths.append(path)
args = (path, 10)
thread = Thread(target=write_random_data, args=args)
thread.start()
threads.append(thread)

for thread in threads:
thread.join()

return paths


def setup() -> Tuple[TemporaryDirectory, List[Path], Path]:
temp_dir = TemporaryDirectory()
in_paths = start_write_threads(temp_dir.name, 5)

out_path = Path(temp_dir.name) / 'merges.txt'
return temp_dir, in_paths, out_path



confirm_merge() も今回の目的とは直接関係ありません。


プログラムが期待通りに動作しているか否かをチェックしているだけです。


def confirm_merge(in_paths: List[Path], out_path: Path):
# out_path ファイルの内容を in_paths に含まれるファイルごとにグループ化する
found = defaultdict(list)
with out_path.open('rb') as f:
for line in f:
for path in in_paths:
if line.find(str(path).encode()) == 0:
found[str(path)].append(line)

# in_paths に含まれるファイルの内容を、ファイルごとにリストにして保存する
expected = defaultdict(list)
for path in in_paths:
with path.open('rb') as f:
expected[str(path)].extend(f.readlines())

# 元のファイルと、全てのファイルを統合したものを各ファイルごとにグループ化した内容を比較する
for key, expected_lines in expected.items():
found_lines = found[key]
if expected_lines == found_lines:
print(f"{key}: 'Matched!'")
else:
print(f"{key}: Not matched.")


temp_dir, in_paths, out_path = setup()

run_threads(in_paths, out_path)

confirm_merge(in_paths, out_path)

temp_dir.cleanup()

# C:\Users\...\Temp\tmpleiinx65\0.txt: 'Matched!'
# C:\Users\...\Temp\tmpleiinx65\1.txt: 'Matched!'
# C:\Users\...\Temp\tmpleiinx65\2.txt: 'Matched!'
# C:\Users\...\Temp\tmpleiinx65\3.txt: 'Matched!'
# C:\Users\...\Temp\tmpleiinx65\4.txt: 'Matched!'



さて、「移行」の対象となる run_threads() と tail_file() の 2 つの関数を、どのようにして「徐々」に asyncio とコルーチンを利用したコードに書き換えていけばよいのでしょうか?


アプローチ方法は 2 つあります、トップダウン ( top-down ) とボトムアップ ( bottom-up ) です。


a) トップダウンアプローチ


main() のようなプログラムの入り口 ( entry point ) から始めて、徐々に個別の関数やクラスを変更していく方法です。


プログラム内の複数の箇所で、もしくは、異なる多くのプログラムで、多くの共通部品 ( 関数やクラス ) を利用している状況で有効な方法です。


1: def キーワードの代わりに async def キーワードを利用して entry point 関数をコルーチンに変更します。


2: イベントループをブロックする可能性のある処理 ( I/O 処理 ) を行っている呼び出しは、asyncio.run_in_executor() でラップし別スレッドで実行するように変更します。


3: asyncio.run_in_executor() でラップした呼び出し先で利用されるリソースやコールバックで衝突が生じないように、Lock や asyncio.run_coroutine_threadsafe() を適切に利用します。


4: 1 から 3 の変更を、エントリポイントから徐々に個別の関数、クラスへ向けて進めていきます。




ここではまず、今回のプログラムのエントリポイントである run_threads() に 1 - 3 の変更を適用しましょう。


import asyncio


async def run_tasks_mixed(in_paths: List[Path], out_path: Path):
"""
複数のファイルの内容を1つのファイルにまとめます
読み込み元のファイル1つずつの処理タスクを executor に分散させることで fan-out し、全てのタスク終了を待って fan-in します
"""

loop = asyncio.get_running_loop()

with out_path.open('wb') as output:

async def write_async(data):
"""
書き込みコルーチン
イベントループが動作しているスレッドとは異なるスレッドから asyncio.run_coroutine_threadsave() を利用してスケジューリングされます
この処理はスレッドセーフなため Lock を利用してデータ競合を防止する必要はありません
"""

output.write(data)

def write(data):
"""
書き込み関数
別スレッドで実行される tail_file() 内で呼び出されます
実体は、write_async() コルーチンをメインスレッドのイベントループに送り実行をスケジューリングします
"""

coro = write_async(data)
future = asyncio.run_coroutine_threadsafe(coro, loop)
future.result()

tasks = []
for i_path in in_paths:
task = loop.run_in_executor(None, tail_file, i_path, write) # fan-out
tasks.append(task)

await asyncio.gather(*tasks) # fan-in


temp_dir, in_paths, out_path = setup()

asyncio.run(run_tasks_mixed(in_paths, out_path))

confirm_merge(in_paths, out_path)

temp_dir.cleanup()

# C:\Users\...\Temp\tmpvabog4a7\0.txt: 'Matched!'
# C:\Users\...\Temp\tmpvabog4a7\1.txt: 'Matched!'
# C:\Users\...\Temp\tmpvabog4a7\2.txt: 'Matched!'
# C:\Users\...\Temp\tmpvabog4a7\3.txt: 'Matched!'
# C:\Users\...\Temp\tmpvabog4a7\4.txt: 'Matched!'



run_in_executor() は、渡された関数 ( ここでは tail_file() ) が指定された ThreadPoolExecutor で実行されるようにイベントループに通知します。


この例では、使用すべき ThreadPoolExecutor として None を指定していますから、デフォルトの executor インスタンスが使用されますね。


また、tail_file() がコルーチンであれば create_task() する場面ですが、まだそこまで「移行」は進んでいませんから、
run_in_executor() による fan-out、gather() による fan-in で構成される fan-out / fan-in パターンは健在です。


そして、この移行中間段階での注目は、ワーカースレッドで実行される関数 ( tail_file() ) でコルーチン ( write_async() ) を呼び出して、メインスレッドで動作しているイベントループで実行させていることです。


このことによって、複数のワーカースレッドで行われる 1 つのファイルに対する書き込みはイベントループだけによって処理されますから、Lock を利用したデータ競合防止策は必要なくなります。


ここまで述べた、ワーカースレッド内でコルーチンを呼び出し指定されたイベントループで実行されるようにスケジュールする、という移行時に必要な「共存関係」を実現するために Python で用意されているのが asyncio.run_coroutine_threadsafe() メソッド なんです。


最終的に asyncio.gather() の await が解除されれば、それはすなわち、複数のワーカースレッドに分散した 1 つのファイルへの書き込み処理が終了したことを意味しますから、with ブロック終了に伴って書き込みファイルがクローズされても何ら問題はない、ということになります。


この移行作業によって run_tasks_mixed() はコルーチンに変更されましたから、実行するには asyncio.run() を利用する必要があります。


さて、ここまでがトップダウンアプローチの 1 - 3 段階です。


もし tail_file() も従来の run_threads() と同様何らかのブロッキング I/O 処理をワーカースレッドに分散させているのだとしたら、tail_file() にも 1 - 3 ステップを適用します。


この場合、run_tasks_mixed() 内では、コルーチンに書き換えた tail_file() を asyncio.create_task() を利用してスケジューリングするだけになりますから、get_runnnig_loop() も run_in_executor() も必要なくなり、コルーチンに変更した tail_file() 内に記述されることになって行きます。


このようにして次々とコールスタックに沿って「移行」を進めていくことになるんです。


まぁ、今回はたったの 2 段階だけですから tail_file() の移行作業が最終作業になりますけど。


ということで、ステップ 4 に進みましょう。


この「移行最終段階」では、現在ワーカースレッドで実行している tail_file() をコルーチンに変更します。


async def tail_async(in_path: Path, write_func):
with in_path.open('rb') as f:
for line in f:
await write_func(line)



この変更に伴って、「移行中間段階」で必要だった run_tasks_mixed() 内の get_running_loop() と run_in_executor() の呼び出しは不要になりますから、run_tasks() はとってもスッキリします。


中でも最大の変更点は、ワーカースレッドで動作していた tail_file() 内でブロッキング I/O 処理コルーチン ( write_async() ) をイベントループで実行するために run_coroutine_threadsafe() を実装していた「中継関数」である write() が一切必要なくなることです。


tail_file() がコルーチンであれば、その中で別のコルーチンを await できますからね。


async def run_tasks(in_paths: List[Path], out_path: Path):
with out_path.open('wb') as output:

async def write_async(data):
"""
書き込みコルーチン
"""

output.write(data)

tasks = []
for i_path in in_paths:
coro = tail_async(i_path, write_async)
task = asyncio.create_task(coro)
tasks.append(task)

await asyncio.gather(*tasks)


temp_dir, in_paths, out_path = setup()

asyncio.run(run_tasks(in_paths, out_path))

confirm_merge(in_paths, out_path)

temp_dir.cleanup()

# C:\Users\...\Temp\tmp78cy08z7\0.txt: 'Matched!'
# C:\Users\...\Temp\tmp78cy08z7\1.txt: 'Matched!'
# C:\Users\...\Temp\tmp78cy08z7\2.txt: 'Matched!'
# C:\Users\...\Temp\tmp78cy08z7\3.txt: 'Matched!'
# C:\Users\...\Temp\tmp78cy08z7\4.txt: 'Matched!'



ここまでが top-down approach によるリファクタリングの流れです。


ここからはもう1つの bottom-up approach を見ていきましょう。


b) ボトムアップアプローチ


ご想像通り、プログラムの「末端」の関数やクラスからエントリポイントへ向けて非同期バージョンへリファクタリングしていきます。


1: 「移行」させたい「末端」関数のコルーチンバージョンを作成します (この例では tail_async() )。


2: 「移行」前の関数 ( tail_file() ) でコルーチンバージョンをラップし、イベントループで実行させるように実装し直します。


3: コールスタックをエントリポイント方向へ 1 段階進めるたびに、その段階の関数のコルーチンバージョン ( run_tasks() ) を作成し、その中で前段階の同期バージョン ( 元の tail_file() ) 呼び出し箇所をコルーチンバージョン ( 1: で作成した tail_async() ) 呼び出しへ変更します。


4: 不要になった前段階のラッパーバージョン ( 2: で作成した tail_file() ) を削除します。




def tail_file(in_path: Path, write_func):
"""
読み込み書き込み関数: ワーカースレッドで動作します
コルーチンバージョン ( tail_async() ) をイベントループで実行させるための「ラップ関数」です
"""

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

async def write_async(data):
write_func(data)

coro = tail_async(in_path, write_async)
loop.run_until_complete(coro)


temp_dir, in_paths, out_path = setup()

run_threads(in_paths, out_path)

confirm_merge(in_paths, out_path)

temp_dir.cleanup()

# C:\Users\...\Temp\tmpgc66u82d\0.txt: 'Matched!'
# C:\Users\...\Temp\tmpgc66u82d\1.txt: 'Matched!'
# C:\Users\...\Temp\tmpgc66u82d\2.txt: 'Matched!'
# C:\Users\...\Temp\tmpgc66u82d\3.txt: 'Matched!'
# C:\Users\...\Temp\tmpgc66u82d\4.txt: 'Matched!'



続けてコールスタックの次の段階である run_threads() のコルーチンバージョン ( run_tasks() ) を作成し、これを run_threads() でラップして動作させ、前の段階のラップ関数 (中継関数) である tail_file() は不要になりますから削除して ...


という経過を経て、トップダウンアプローチ、ボトムアップアプローチいずれの移行形態をとっても最終的には同じ実装に落ち着く、メデタシ、ということに相成ります。


まとめ:


1: トップダウンアプローチは、プログラムのエントリポイントからコルーチンに書き換えていく中で、いかに「既存」の同期コードを実行するか、に焦点が当たります。

具体的には、ブロッキング I/O 処理に関わっている「既存」関数を、run_in_executor() を利用して awaitable なタスクとして実行するようにします。

そして、別スレッドで実行されるその「既存」関数内の実際のブロッキング I/O 処理は、既存のまま Lock を利用してデータ競合を防止するか、イベントループが一手に引き受けて処理するように asyncio.run_coroutine_threadsafe() を利用してスケジューリングします。


2: ボトムアップアプローチは、プログラムの「末端」関数をコルーチンに書き換え、それをいかに「既存」の同期コードの中で実行するか、に焦点が当たっています。

具体的には、ブロッキング I/O 処理に関わり別スレッドで実行されている「既存」関数のコルーチンバージョンを作成し、それを「既存」の関数でラップして実行するようにします。

その際、「既存」関数が実行されているワーカースレッドに紐付いたイベントループを asyncio.new_event_loop() で取得、asyncio.set_event_loop() で設定すると共に、非同期実行バージョンに書き換えたコルーチンを run_until_complete() でスケジューリング、完了を待つようにします。

この記事に興味のある方は次の記事にも関心を持っているようです...
- People who read this article may also be interested in following articles ... -
【 Effective Python, 2nd Edition 】ブロッキング I/O ( blocking I/O ) とスレッド ( thread ) を利用しているプログラムを、asyncio 組み込みモジュールを利用してコルーチン ( coroutine ) と非同期 I/O ( asyncronous I/O ) を利用したプログラムにリファクタリング ( refactoring ) しよう!
【 Using Asyncio in Python 】Python における asyncio を利用した非同期プログラム ( asynchronous programming ) の勉強を継続する前に、今一度スレッド ( thread ) についてのちょっとした復習ノート、まとめてみた
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 6 回 - コルーチン ( coroutines ) を利用して数多くのブロッキング I/O を並列処理する fan-out、fan-in パターンを実現しよう、の巻
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 5 回 - 並列処理 ( concurrency ) のためにスレッド ( thread ) を利用する場合は concurrent.futures モジュールの ThreadPoolExecutor の導入を検討しましょう、の巻
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 4 回 - 並列処理 ( concurrency ) 実現のために queue を利用するとリファクタリング ( refactoring ) 作業が大変です、の巻
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 3 回 - Thread インスタンスの頻繁な start / join による fan-out / fan-in パターン実装は避けるべし、の巻
【 Effective Python, 2nd Edition + coding challenge 】プログラム開発のどの段階で並列処理 ( concurrency ) が必要になるのだろう? そのときどのようにリファクタリング ( refactoring ) していけばいいのだろう? を考えてみるシリーズ ( のはず ) 第1回