cover_using_asyncio_in_python

【 Using Asyncio in Python 】Python における asyncio を利用した非同期プログラム ( asynchronous programming ) の勉強を継続する前に、今一度スレッド ( thread ) についてのちょっとした復習ノート、まとめてみた 投稿一覧へ戻る

Tags: python , thread , asyncio , asynchronous programming

Published 2020年10月11日19:42 by T.Tsuyoshi

スレッドは OS によって提供されている機能で、プログラムの一部を並行実行するようにするものです。


OS は、マシン上で同時に動作している他のプログラムも含めて、複数のスレッドにどのように CPU リソースを割り当てるか、を決定します。


「スレッドを使うべきではない」と書いている asyncio 関連の本も沢山あるようですが、物事はそれほど単純ではありません。


長所、短所を見極める必要があるのは当然のことです。


Asyncio はスレッドの代替選択肢として採用されました。ですから、ある程度の比較無しには asyncio 本来の価値を見極めることは困難です。


また、asyncio を利用するときでさえ、threads や processes を考慮しなけばいけない状況は多々あります。


ですから、まず threading について少し考えておきましょう。


Warning


ここでの話は並列処理を行うネットワークプログラミングに限定されるものです。


プリエンプティブなマルチスレッド実行 ( preemptive multithreading ) はまた別の話ですし、長所短所の比較項目もまったく異なるものです。


=== Benefits of Threading ===


1: Ease of reading code


コードは並列処理されるようになりますが、従来の関数の記述と変わらず、非常に簡潔にトップダウンに一直線に記述することが可能です。


2: Parallelism with shared memory


複数のスレッド間でメモリを共有しながら複数の CPU を利用可能です。


異なるプロセスが管理する別々のメモリ間での巨大なデータの受け渡しは非常にコストが高いため、これは非常に大切なことです。


3: Know-how and existing code


スレッドを利用したアプリケーション作成に関するベストプラクティス等の情報が非常に多く出回っています。


並列処理のためのマルチスレッド実行を前提にしたいわゆる blocking コードの量も膨大です。



Python に関する限り、「並行実行 ( parallelism )」には常に ? が付きまといます。


それは、python がインタプリタ自身の内部状態の一貫性を保持するために GIL ( Global Interpreter Lock ) と呼ばれるグローバルロック機構を採用しているからです。


GIL によって、複数のスレッド間による破滅的なデータ競合 ( data race ) を未然に防止しているんですね。


ただ、この GIL の副作用として、プログラム内で動作する全てのスレッドが 1 つの CPU に紐付けされてしまっています。


つまり、並行実行によるパフォーマンス向上は期待できない、ということです ( Cython や Numba といったツールを利用してこの制約を回避することも出来ますが... )。


1: に関して言えば、Python におけるスレッド利用は本当に簡単です。


もし今までにデータ競合によるバグで頭を悩ませたことがないのであれば、並列処理実現のための非常に魅力的なツールに映ると思います。


また、たとえスレッド利用に伴うデータ競合やデータ破損の経験があったとしても、簡潔で安全なコードを記述する方法を苦労しながらも取得さえすれば、十分に魅力的なツールであることに変わりはないと思います。


一般的な話として、スレッドを利用する際の最善の方法は、concurrent.futures 組み込みモジュールの ThreadPoolExecutor クラスを使用し、submit() に必要な全てのデータを渡してしまうことでしょう。


from concurrent.futures import ThreadPoolExecutor


def worker(data):
...


with ThreadPoolExecutor(max_workers=10) as executor:
args = 0, 1, 2, 3
future = executor.submit(worker, *args)



ThreadPoolExecutor は関数をスレッド実行するための極度に単純化されたインターフェースを提供してくれています。


そして何よりもご機嫌なのは、スレッドプール ( pool of threads ) の代わりにプロセスプール ( pool of processes ) の利用を検討した場合、ただ単純に ThreadPoolExecutor を ProcessPoolExecutor に置き換えるだけで済んでしまう、ということでしょう。


この 2 つのクラスの API は共通しています。つまり、既存のコードはほぼ影響を受けない、ということです。


また、executor API は asyncio でも使用されていますからね。


通常、スレッドで実行するタスクはそれほど大きなものではないのではないでしょうか?


その場合、プログラムを終了する際には、ThreadPoolExecutor.shutdown(wait=True) を呼び出して、あとは executor がタスクを完了するのをちょっとの間待つだけです。


一番重要なことは、スレッドで実行するコードからできる限りグローバル変数にアクセスしたりデータを書き込んだりしないようにする ことです。


=== Drowbacks of Threading ===


1: Threading is difficult

スレッドで発生したバグや、スレッドを利用しているプログラムで発生したデータ競合 ( data race ) は修正が最も難しいバグの 1 種です。経験によって、これらの問題が起こりづらいソフトウェアをデザインすることは可能ですが、nontrivial で naively designed software で発生したバグは、専門家であっても修正が不可能に近いものです。


2: Threads are resource intensive

スレッドは作成時点で、例えば事前に割り当てられるべきスタックスペースとしてかなりのプロセス仮想メモリ領域を消費します。これは 1 つ 1 つのスレッドごとに必要なものです。ただしこの問題は、64-bit OS の普及と、最近の OS が実際に必要となるまでスタックスペース分も含めたリソースを割り当てなくなってきていることから、以前ほどの甚大な影響は及ぼさなくなってきています。


ただし、そうは言っても 1 つのスレッドに付き必要となるスタックスペースは 8 MB ですからかなりの大きさであることは間違いありません。


何もしない 10,000 のスレッドを作成するスニペット (注: 深く考えずに実行しないでください):


import os
from time import sleep
from threading import Thread


threads = [Thread(target=lambda: sleep(5)) for i in range(10000)]

[t.start() for t in threads]

print(f"PID = {os.getpid()}")

[t.join() for t in threads]



もし上の例のように 10,000 のスレッドを作成、実行するには 80 GB もの仮想メモリが必要となります。


32-bit OS 上でこのプログラムを実行するためには、threading.stack_size([size]) メソッドでスタックスペースを制限する必要がありますが、スタックスペースを制限する、ということは、イコール、関数の呼び出しの深さに制限が加わる、ということですから、スレッドで動作させる target 関数が再帰的な動作をしている場合などにはプログラムがクラッシュする恐れが生じてしまいます。


一方シングルスレッドで動作するコルーチン ( coroutines ) にはこのような問題は一切関係なく、I/O 並列処理時のはるかに有望な代替手段です。


3: Threading can affect throughput


スレッドを 5,000 以上使用するような高レベルな並列処理 (それだけ多くのスレッドを作成できるように OS をちゃんと設定することができるのであれば...) においては、コンテキストスイッチ ( context switch ) のコスト増大によりスループット ( throughput ) に悪影響を与える恐れがあります。


4: Threading is inflexible


OS は、どのスレッドが仕事をしていてどのスレッドが待機しているのかに関わらず、全てのスレッドに CPU の実行時間を均等に割り当てようとします。


例えば、あるスレッドがソケットからのデータを待っていても、OS スケジューラはそのスレッドに CPU 時間を割り当て、そのスレッドがデータを受け取り処理を開始してもう少しで処理を終了する、というところで実行権を取り上げて他のスレッドに CPU を割り当てたりしちゃうんですね。


一方 async 世界では、ソケット待ちをしているコルーチンが CPU 割り当てが必要かどうかを select() システムコールを利用してチェックしています。


もしまだ待機中であれば CPU を割り当てるようなことはしません。


これによって無駄なスイッチングコストを省いているんです。



ここまで列挙したスレッドの短所は最近になってワイワイ言われ始めたものでも何でもありませんし、プログラミングモデルとしてのスレッド利用に伴う問題はあるプラットフォームに限られたものでもありません。


Microsoft Visual C++ のドキュメントにおけるスレッドに関する記述はこんな感じです:


「 Windows API において並列処理メカニズムの中核を担っているのはスレッドですし、CreateThread() 関数を利用すれば、スレッドを簡単に作成、利用することが出来ます。ただし OS の観点からすると、スレッドの使用には多くの時間とリソースが必要となります。加えて、同じ優先度を有するスレッド同士には同一の実行時間が割り当てられることが保証されていますが、その切り替えに要するオーバーヘッドを相殺するにはかなり大き目のタスクを実行する必要があります。もし小さめのタスクだけであれば、それらのタスクを並行実行する利点は、それらを並列処理するために伴うオーバーヘッドにかき消されてしまうでしょう。」


それでも、いやいや、これは Windows だからでしょう、Unix システムだったら話は変わるでしょう、とおっしゃるあなた、Mac の開発者ライブラリの「スレッドを利用した開発ガイド ( Threading Programming Guide )」の内容は次の通りです:


「スレッド実行は、メモリ消費とパフォーマンスの面であなたのプログラムだけでなくシステム全体に対してもかなりの負担となります。スレッドは 1 つ 1 つにつきカーネルメモリー ( kernel memory ) とプログラムのメモリー双方の領域を要求してきます。スレッド管理とスレッド間実行のスケジューリングというスレッド実行における胆の部分はカーネル領域に、スレッドのスタックスペースとデータはプログラム領域にそれぞれ保存されます。こういった構成のほとんどは、スレッドを初めて作成する時点で作成、初期化されますが、カーネルとのやり取りが生じるためこのプロセスのコストはかなり高いと言わざる負えません。」


そして、「並列処理プログラミングガイド ( Concurrency Programming Guide )」には次のように述べられています


「今まで紹介されてきた並列処理メカニズムには、多かれ少なかれ追加のスレッド作成が含まれています。残念ながら、スレッドを利用した実装は大変な労力を必要とします。スレッドは低級ツールであり手作業による管理が必須ですから。また、あるアプリケーションで必要とされるスレッドの最適数は、実行されるマシン環境やシステムによって動的に変化するものであることを考えると、スレッドを利用して問題解決のための最適な実装を実現することは、不可能ではないにしても、非常に困難であることに間違いはないでしょう。加えて、スレッドと同時に利用される(主に)同期メカニズムは複雑になりがちで、パフォーマンス的な向上が得られる保証は何らない反面、実装デザイン面でのリスクは高くなりがちです。」


つまりこれらの記事が繰り返し述べているのは:


・スレッドを利用するべき正当な理由付けをするのは難しい

・スレッドを利用して大規模な並列処理モデルを構築するのは効率が悪過ぎる



ということになると思います。


=== Case Study: Robots and Cutlery ===


ここからは、スレッドの利用が安全ではない、と何故考えられているのかを、よく取り上げられるコード例を追いながら見ていきましょう。


以下の例は、ロボットが給仕をするレストランにおけるカトラリー管理プログラムです。


import threading
from queue import Queue


class Cutlery:
"""
kitchen にある全てのカトラリー、もしくは、各ロボットが受け持ちのテーブルで管理しているカトラリーを象徴するクラス
ここではナイフとフォークだけです
"""

def __init__(self, knives=0, forks=0):
self.knives = knives
self.forks = forks

def __str__(self):
return f"Knives: {self.knives}, Forks: {self.forks}"

def give(self, to: 'Cutlery', knives=0, forks=0):
"""
カトラリーの受け渡しをする
主にロボットが kitchen から受け取り、もしくは、kitchen へ返す際に呼び出す

:param to: カトラリーを渡す先の Cutlery クラス。自らの在庫は減少し、渡し先の在庫は増加する
:param knives: 渡すナイフの本数
:param forks: 渡すフォークの本数
"""

self.change(-knives, -forks)
to.change(knives, forks)

def change(self, knives, forks):
"""
在庫の増減を操作する
"""

self.knives += knives
self.forks += forks



このレストランの配膳ロボットは、キッチンからカトラリーを受け取り各テーブルへ配膳し、各テーブルを片付けて回収したカトラリーをキッチンへ返す、という仕事を行います。


プログラムでは配膳ロボット 1 台につきスレッドを 1 つ割り当て並列処理を行います。


class ThreadBot(threading.Thread):
"""
給仕 (wait tables) 係りロボットを象徴するクラス
カトラリー (cutlery) を配膳する (キッチンから、または、キッチンへ)
"""

def __init__(self):
super().__init__(target=self.manage_table) # このスレッド ( ThreadBot ) のタスクはテーブル管理 ( self.manage_table() )
self.cutlery = Cutlery(knives=0, forks=0)
self.tasks = Queue() # このロボットに割り当てるタスク

def manage_table(self):
while True:
task = self.tasks.get()
if task == 'prepare table': # キッチンからカトラリーを受け取り自分が管理します (配膳。1 つのテーブルは 4 席です)
kitchen.give(to=self.cutlery, knives=4, forks=4)
elif task == 'clear table': # 自分が管理するカトラリーをキッチンへ返します (片付け)
self.cutlery.give(to=kitchen, knives=4, forks=4)
elif task == 'shutdown':
return


kitchen = Cutlery(knives=100, forks=100) # キッチンにある全てのカトラリー。グローバル変数として宣言
# 全てのロボットはこの Cutlery インスタンスとカトラリーのやり取りを行います


bots = [ThreadBot() for i in range(10)] # 給仕ロボットは 10 台でテストします


tables_in_restaurant = 100 # このレストランのテーブル数
# それぞれのロボットは全てのテーブルに対して給仕、後片付けをそれぞれ実施します


# 全ての給仕ロボットそれぞれに対して、レストラン内全てのテーブルの準備、片付けタスクを命じる
for bot in bots:
for i in range(tables_in_restaurant):
bot.tasks.put('prepare table')
bot.tasks.put('clear table')
bot.tasks.put('shutdown') # このタスクを受け取ってロボットは停止 ( return ) します -> スレッドの join() が return します



さてテストしてみましょう。


ロボットは 10 台、テーブル数は 100。最初のカトラリー在庫はナイフ、フォークが 100 セットです。


もちろん全てのロボットのタスク終了後には、キッチンに元通り 100 セットのカトラリーがなければいけませんね。


print(f"給仕前の在庫: {kitchen}")


for bot in bots: # ロボット、仕事スタート
bot.start()


for bot in bots: # 全部のロボットの仕事終了待ち、ガンバレ!
bot.join()


print(f"給仕後の在庫: {kitchen}")

# 給仕前の在庫: Knives: 100, Forks: 100
# 給仕後の在庫: Knives: 100, Forks: 100



おおっ、全てのナイフ、フォークはキッチンに戻っています。


ところが、テーブル数を 10000 にしてテストしてみると ...


tables_in_restaurant = 10000

# 給仕前の在庫: Knives: 100, Forks: 100
# 給仕後の在庫: Knives: 96, Forks: 108



複数回実行してみても、エラーはコンスタントに発生します。


レストラン内のテーブル数が 100 の時は何の問題も発生しませんでした。


しかし、テーブル数を 10,000 に増やしたところ、ナイフとフォークの数が合わなくなり始めました


しかも、その 増減の度合いは一定せず、実行するたびに増えたり減ったりしています


ThreadBot の プログラムは単純で、難しく複雑な作業をしているわけではありません


さぁ、データ競合 がこのバグに絡んでいるような臭いがプンプンしています。


そうです、実は Cutlery クラスのちょっとしたコードにこの原因が潜んでいるんです。


def change(self, knives, forks):
self.knives += knives
self.forks += forks



change() 内で記述されている += オペレーター こそ今回のデータ競合の大元です。


この複合オペレーターは内部的に次の 3 つのステップを踏んで処理されます。


1: self.knives の現在の値を読み取りメモリ内の別領域に一時的に保存します。


2: パラメータ値 ( knives ) を一時保存領域内の値に加えます。


3: 合計値を一時的保存領域から元の領域へコピーします。



プリエンプティブなマルチタスク ( preemptive multitasking ) では、あるスレッドがこの複数ステップを一塊とみる処理 ( アトミック処理; atomic operation ) を実行している最中にスレッド切り替えが発生し、切り替わったスレッドも同じ処理にとりかかる可能性が常に存在します。


ですから、ThreadBot A が 1: を実行したときに OS スケジューラによるスレッド切り替えが発生し、ThreadBot B が 1:2: を実行したところで再び ThreadBot A に制御が戻り、2:3: を実行後再び ThreadBot B が動作し 3: を処理したとすると、ThreadBot A が加えた変更は、アレッ!?、どこかへ消えてしまっていますね。


このデータ競合例は非常に単純なケースです。


ですが実際の大きなプロジェクトにおいて、全てのコードをチェックし直し、実環境で稼動させてエラーを発生させて原因を突き止め対処する、ということは非常に困難です。


また、この問題に対しては threading モジュールの Lock クラスを利用して、複数のスレッドが変更を加えるデータに対する操作を排他的にすることで対処することは可能です。


from threading import Lock


class Cutlery:
def __init__(self, knives=0, forks=0):
...
self.lock = Lock()
...

def change(self, knives, forks):
with self.lock:
self.knives += knives
self.forks += forks



ですが、データが複数のスレッドで操作される全ての場所にこの処置を施す必要があります。


そのためには、プログラム内の全てのコードをあなた自身が把握しコントロールできていなければなりません。


しかしサードパーティーから提供されているライブラリを利用することで容易に機能を拡張できる Python の長所がここでは障害になってしまいます。


このようなライブラリを使っていればいるほど、Lock だけに頼ったデータ競合予防策は困難を極めてしまうでしょう。


また、ソースコードを見ただけではデータ競合を予防できないのが現実です。


それは、プログラム実行中の何処でスレッド間の実行権切り替えが行われるのか、ということがソースコードの何処にも記述されていないからです。


問題の根源である OS におけるタスクスイッチはいつ何処でも起こり得るんです。


先程の例でも見たように、複数のスレッドの切り替えは OS のさじ加減 1 つで決まってしまいます。


さらに OS 自体はスレッドがどんな作業を実行しているか、には一切関知していないんです、プログラム側の対処には限界がありますね。


となると、よりよい解決方法は、これが async プログラミングにもつながってくるのですが、1 台の ThreadBot だけですべての給仕作業をこなさせるように設定してしまうことです。


そうすれば、ナイフとフォークをキッチンと受け渡しするスレッドは 1 つだけ、ということになりますからね。


さらに asyn パージョンのプログラムでは、並列処理を実行している複数のコルーチン間でいつコンテキストスイッチ ( context switch ) が実行されるかを明確に把握することが出来ます


なぜなら、await キーワード が記述されている場所こそが、コンテキストスイッチが行われる場所であることを明示していますから。


今後少し asyncio についての勉強が進んだところで、今回のカトラリー管理プログラムの async バージョンを取り上げてみます、お楽しみに!

この投稿をメールでシェアする

0 comments

コメントはまだありません。

コメントを追加する(不適切と思われるコメントは削除する場合があります)