cover_effective_python

【 Effective Python, 2nd Edition 】threading モジュールの Lock クラスを利用してマルチスレッド実行時のデータ競合 ( data races ) を予防しよう! GIL はデータ構造 ( data structure ) の安全性まで面倒を見てくれません 投稿一覧へ戻る

Tags: Python , Effective , muttual-exclusion lock , Lock , multi threading

Published 2020年8月17日15:59 by T.Tsuyoshi

こちらの記事 で、Python におけるマルチスレッド実行は GIL ( Global Interpreter Lock ) の制約を受けてある瞬間に動作している CPU コアは 1 つだけであり、シリアル実行とトータルな実行時間は変わらない、それは、予期しない割り込みによってインタプリタが保持しておくべき状態が影響を受けないようにするためだ、という話をしました。


であれば、プログラム側で相互排他ロック ( mutual-exclusion lock; mutex ) を実装する必要などなさそうです。


だって、GIL がマルチコアによる並行実行を許可していない以上、プログラムのデータ構造も競合による影響は受けないはずですから。


確かに、list 型や dictionary 型などを用いたテストではそのようなのですが、実は「絶対に」守られているか、というと決してそんなことはないんです!!


ある瞬間では 1 つのスレッドしか動作していないとしても、そのスレッドが、複数の命令を 1 セットとしてデータ構造を操作している最中に「動作スレッドの切り替え」という割り込みが入ってしまう可能性は常に存在しています。


そしてこの可能性は、複数のスレッドで動作している複数のタスクが同じオブジェクトに対して処理を行っているシチュエーションでは非常に危険です。


例えば、複数のセンサーの計測カウントを集計するプログラムを考えます。


class Counter:
def __init__(self):
self.count = 0

def increment(self, adding):
self.count += adding



それぞれのセンサーはそれぞれ別のワーカースレッドで処理します。


だって、センサーとのやり取りはブロッキング I/O ( blocking I/O ) を伴いますから、Python においてもマルチスレッド実行の恩恵を受けられますからね!


それぞれのワーカースレッドではセンサーからの信号を受ける度にカウンターをインクリメントしていきます
( この例では、 最初に与えた数値までカウントをインクリメントし続けることで代替しています )。


def worker(sensor_index, how_many, counter):
for _ in range(how_many):
# センサーとのやり取りをしています、少々お待ちください
# ...
counter.increment(1)


from threading import Thread


how_many = 10 ** 5
counter = Counter()
threads = []


for i in range(5):
thread = Thread(target=worker,
args=(i, how_many, counter))
thread.start()
threads.append(thread)


for thread in threads:
thread.join()


expected = how_many * 5
found = counter.count


print(f"予想されるカウンター数値: {expected}, 実際の数値: {found}")

# 予想されるカウンター数値: 500000, 実際の数値: 356681



 ????


100000 回インクリメントを実行するワーカースレッドが 5 つ動いているわけですから、最終的な count の値は 100000 * 5 = 500000 のはずです。


さて、こんな単純なプログラムのどの部分で何が起こっているのでしょうか?


しかもある瞬間に動作しているスレッドは1つだけなんですよ! 同時に動いているスレッドはないんですよ!


マルチスレッド動作時には、Python インタプリタは、全てのスレッドの処理時間がほぼ均等になるように操作します。


ある意味「公平性の押し売り」をするワケです。


そのために、動作中のスレッドを停止し、待機中のスレッドを再実行する、というプロセスを繰り返します。


問題は、いつ Python が動作中のスレッドを停止するか分からない、ということです。


つまり、複数の操作が一塊りとなってある 1 つの処理を成し遂げるようなアトミック操作 ( 不可分操作; atomic operation ) の中途で、この「公平性の押し売り」操作が発生する可能性は常にある、ということなんです。


そして今回の結果もまさにそれが起こった証拠なんです。


Counter クラスの increment() メソッドでは count インスタンス変数をインクリメントしています。


class Counter:
def __init__(self):
self.count = 0

def increment(self, adding):
self.count += adding



各ワーカースレッドからはこの increment メソッドを呼び出してカウンタをインクリメントしているわけですが、


counter.increment(1)



という関数呼び出しは、ワーカースレッド側からすれば、


counter.count += 1



という式を実行しているのと等しいといえます。


そして、この += オペレータは、Python に対して 3 つの別々の操作を一塊りとして実行してください、とお願いしていることです。


value = getattr(counter, 'count')


result = value + 1


setattr(counter, 'count', result)



この 3 段階のどこかのステージでスレッドの切り替えが発生し、しばらく後に処理途中だった元のスレッドが復帰して「古い」値を count 変数へセットしてしまったら ...


お、おっー、恐ろしい ... !!!


スレッド A が実行中


value_a = getattr(counter, 'count') # counter.count: 10 としましょう、もちろん、value_a: 10 です



スレッド B への切り替えが発生


value_b = getattr(counter, 'count') # value_b: 10 ですね


result_b = value_b + 1 # result_b: 11 です


setattr(counter, 'count', result_b) # count: 11 になりました



スレッド A への切り替えが発生


result_a = value_a + 1 # result_a: 11 です


setattr(counter, 'count', result_a) # count: 11 です。本当はスレッド B の変更を反映して 12 でなければダメですよね。



スレッド A の 3 段階アトミック操作が終了する途中でスレッド B に切り替わってしまいました。


このときスレッド B もちょうど count 変数を操作する処理をしていたとします。


スレッド B はめでたくこの処理を完遂し、スレッド A に操作が戻ります。スレッド A は中断された次の処理から再開します。


ここでスレッド A はスレッド B の結果を上書きする形になりますよね。スレッド B が実行したことは無に帰してしまったわけです。


そして、まさにこれが上の例で起こっていたことなんです。


このようなデータ操作競合 ( data race ) 、データ構造破損 ( data structure corruption ) を予防するために、Python の threading 組み込みモジュールにはいくつかのツールが用意されています。


その中でも最も利用しやすいものは相互排他ロックを実現するための Lock クラスです。


Lock クラスオブジェクトでアトミック処理を「囲む」ことで、この操作中に他のスレッドへ切り替えられる心配がなくなります。


つまり、複数のスレッドが同時に同じデータ構造へアクセスすることを防止することが出来るようになります。


from threading import Thread, Lock


class LockingCounter:
def __init__(self):
self.lock = Lock()
self.count = 0

def increment(self, adding):
with self.lock:
self.count += adding



Lock オブジェクトは with 文のコンテキストマネージャ ( context manager ) として利用できますから実装もとっても簡単!


あとは LockingCounter クラスを使ってオブジェクトを作れば作業は終了です。


def worker(sensor_index, how_many, counter):
for _ in range(how_many):
# センサーとのやり取りをしています、少々お待ちください ...
counter.increment(1)


how_many = 10 ** 5
counter = LockingCounter()
threads = []


for i in range(5):
thread = Thread(target=worker,
args=(i, how_many, counter))
thread.start()
threads.append(thread)


for thread in threads:
thread.join()


expected = how_many * 5
found = counter.count


print(f"予想されるカウンター数値: {expected}, 実際の数値: {found}")

# 予想されるカウンター数値: 500000, 実際の数値: 500000



まとめ:

1: Python におけるマルチスレッド実行では GIL の影響により複数スレッドの同時実行は禁止されていますが、スレッド間におけるデータ競合の面倒までは見てくれません。

2: 相互排他ロック ( mutexes ) を実装せずに複数スレッドに対して同一オブジェクトへの変更を許可していると、プログラムにおけるデータ構造の崩壊を招く恐れがあります。

3: threading 組み込みモジュールで提供されている Lock クラスを利用することで、簡単な実装でマルチスレッド間におけるデータ競合を防止することが出来ます。

この投稿をメールでシェアする

0 comments

コメントはまだありません。

コメントを追加する(不適切と思われるコメントは削除する場合があります)