検索ガイド -Search Guide-

単語と単語を空白で区切ることで AND 検索になります。
例: python デコレータ ('python' と 'デコレータ' 両方を含む記事を検索します)
単語の前に '-' を付けることで NOT 検索になります。
例: python -デコレータ ('python' は含むが 'デコレータ' は含まない記事を検索します)
" (ダブルクオート) で語句を囲むことで 完全一致検索になります。
例: "python data" 実装 ('python data' と '実装' 両方を含む記事を検索します。'python data 実装' の検索とは異なります。)
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
  • ただいまサイドメニューのテスト中/ただいまサイドメニューのテスト中
>>
effective_python

【 Effective Python, 2nd Edition 】threading モジュールの Lock クラスを利用してマルチスレッド実行時のデータ競合 ( data races ) を予防しよう! GIL はデータ構造 ( data structure ) の安全性まで面倒を見てくれません 投稿一覧へ戻る

Published 2020年8月17日15:59 by mootaro23

SUPPORT UKRAINE

- Your indifference to the act of cruelty can thrive rogue nations like Russia -

こちらの記事 で、Python におけるマルチスレッド実行は GIL ( Global Interpreter Lock ) の制約を受けてある瞬間に動作している CPU コアは 1 つだけであり、シリアル実行とトータルな実行時間は変わらない、それは、予期しない割り込みによってインタプリタが保持しておくべき状態が影響を受けないようにするためだ、という話をしました。


であれば、プログラム側で相互排他ロック ( mutual-exclusion lock; mutex ) を実装する必要などなさそうです。


だって、GIL がマルチコアによる並行実行を許可していない以上、プログラムのデータ構造も競合による影響は受けないはずですから。


確かに、list 型や dictionary 型などを用いたテストではそのようなのですが、実は「絶対に」守られているか、というと決してそんなことはないんです!!


ある瞬間では 1 つのスレッドしか動作していないとしても、そのスレッドが、複数の命令を 1 セットとしてデータ構造を操作している最中に「動作スレッドの切り替え」という割り込みが入ってしまう可能性は常に存在しています。


そしてこの可能性は、複数のスレッドで動作している複数のタスクが同じオブジェクトに対して処理を行っているシチュエーションでは非常に危険です。


例えば、複数のセンサーの計測カウントを集計するプログラムを考えます。


class Counter:
def __init__(self):
self.count = 0

def increment(self, adding):
self.count += adding



それぞれのセンサーはそれぞれ別のワーカースレッドで処理します。


だって、センサーとのやり取りはブロッキング I/O ( blocking I/O ) を伴いますから、Python においてもマルチスレッド実行の恩恵を受けられますからね!


それぞれのワーカースレッドではセンサーからの信号を受ける度にカウンターをインクリメントしていきます
( この例では、 最初に与えた数値までカウントをインクリメントし続けることで代替しています )。


def worker(sensor_index, how_many, counter):
for _ in range(how_many):
# センサーとのやり取りをしています、少々お待ちください
# ...
counter.increment(1)


from threading import Thread


how_many = 10 ** 5
counter = Counter()
threads = []


for i in range(5):
thread = Thread(target=worker,
args=(i, how_many, counter))
thread.start()
threads.append(thread)


for thread in threads:
thread.join()


expected = how_many * 5
found = counter.count


print(f"予想されるカウンター数値: {expected}, 実際の数値: {found}")

# 予想されるカウンター数値: 500000, 実際の数値: 356681



 ????


100000 回インクリメントを実行するワーカースレッドが 5 つ動いているわけですから、最終的な count の値は 100000 * 5 = 500000 のはずです。


さて、こんな単純なプログラムのどの部分で何が起こっているのでしょうか?


しかもある瞬間に動作しているスレッドは1つだけなんですよ! 同時に動いているスレッドはないんですよ!


マルチスレッド動作時には、Python インタプリタは、全てのスレッドの処理時間がほぼ均等になるように操作します。


ある意味「公平性の押し売り」をするワケです。


そのために、動作中のスレッドを停止し、待機中のスレッドを再実行する、というプロセスを繰り返します。


問題は、いつ Python が動作中のスレッドを停止するか分からない、ということです。


つまり、複数の操作が一塊りとなってある 1 つの処理を成し遂げるようなアトミック操作 ( 不可分操作; atomic operation ) の中途で、この「公平性の押し売り」操作が発生する可能性は常にある、ということなんです。


そして今回の結果もまさにそれが起こった証拠なんです。


Counter クラスの increment() メソッドでは count インスタンス変数をインクリメントしています。


class Counter:
def __init__(self):
self.count = 0

def increment(self, adding):
self.count += adding



各ワーカースレッドからはこの increment メソッドを呼び出してカウンタをインクリメントしているわけですが、


counter.increment(1)



という関数呼び出しは、ワーカースレッド側からすれば、


counter.count += 1



という式を実行しているのと等しいといえます。


そして、この += オペレータは、Python に対して 3 つの別々の操作を一塊りとして実行してください、とお願いしていることです。


value = getattr(counter, 'count')


result = value + 1


setattr(counter, 'count', result)



この 3 段階のどこかのステージでスレッドの切り替えが発生し、しばらく後に処理途中だった元のスレッドが復帰して「古い」値を count 変数へセットしてしまったら ...


お、おっー、恐ろしい ... !!!


スレッド A が実行中


value_a = getattr(counter, 'count') # counter.count: 10 としましょう、もちろん、value_a: 10 です



スレッド B への切り替えが発生


value_b = getattr(counter, 'count') # value_b: 10 ですね


result_b = value_b + 1 # result_b: 11 です


setattr(counter, 'count', result_b) # count: 11 になりました



スレッド A への切り替えが発生


result_a = value_a + 1 # result_a: 11 です


setattr(counter, 'count', result_a) # count: 11 です。本当はスレッド B の変更を反映して 12 でなければダメですよね。



スレッド A の 3 段階アトミック操作が終了する途中でスレッド B に切り替わってしまいました。


このときスレッド B もちょうど count 変数を操作する処理をしていたとします。


スレッド B はめでたくこの処理を完遂し、スレッド A に操作が戻ります。スレッド A は中断された次の処理から再開します。


ここでスレッド A はスレッド B の結果を上書きする形になりますよね。スレッド B が実行したことは無に帰してしまったわけです。


そして、まさにこれが上の例で起こっていたことなんです。


このようなデータ操作競合 ( data race ) 、データ構造破損 ( data structure corruption ) を予防するために、Python の threading 組み込みモジュールにはいくつかのツールが用意されています。


その中でも最も利用しやすいものは相互排他ロックを実現するための Lock クラスです。


Lock クラスオブジェクトでアトミック処理を「囲む」ことで、この操作中に他のスレッドへ切り替えられる心配がなくなります。


つまり、複数のスレッドが同時に同じデータ構造へアクセスすることを防止することが出来るようになります。


from threading import Thread, Lock


class LockingCounter:
def __init__(self):
self.lock = Lock()
self.count = 0

def increment(self, adding):
with self.lock:
self.count += adding



Lock オブジェクトは with 文のコンテキストマネージャ ( context manager ) として利用できますから実装もとっても簡単!


あとは LockingCounter クラスを使ってオブジェクトを作れば作業は終了です。


def worker(sensor_index, how_many, counter):
for _ in range(how_many):
# センサーとのやり取りをしています、少々お待ちください ...
counter.increment(1)


how_many = 10 ** 5
counter = LockingCounter()
threads = []


for i in range(5):
thread = Thread(target=worker,
args=(i, how_many, counter))
thread.start()
threads.append(thread)


for thread in threads:
thread.join()


expected = how_many * 5
found = counter.count


print(f"予想されるカウンター数値: {expected}, 実際の数値: {found}")

# 予想されるカウンター数値: 500000, 実際の数値: 500000



まとめ:

1: Python におけるマルチスレッド実行では GIL の影響により複数スレッドの同時実行は禁止されていますが、スレッド間におけるデータ競合の面倒までは見てくれません。

2: 相互排他ロック ( mutexes ) を実装せずに複数スレッドに対して同一オブジェクトへの変更を許可していると、プログラムにおけるデータ構造の崩壊を招く恐れがあります。

3: threading 組み込みモジュールで提供されている Lock クラスを利用することで、簡単な実装でマルチスレッド間におけるデータ競合を防止することが出来ます。

この記事に興味のある方は次の記事にも関心を持っているようです...
- People who read this article may also be interested in following articles ... -
【 Python Data Structures and Algorithms 】Python とデータ構造 ( data structure ) - linked list を実装してみよう! Linked List 実装 Python Programming Interview 模擬問題
【 Effective Python, 2nd Edition 】Queue クラスを利用した producer-consumer パイプライン ( pipelines ) を構築して、マルチスレッドシーケンス処理をエレガントに管理しよう! 並行実行 ( parallelism ) と並列処理 ( concurrency ) もついでにちゃんとイメージしよう!
【Python 雑談・雑学 + coding challenge】collections モジュールの Counter クラスと most_common メソッドを利用してシーケンス内の最頻出要素を取得しよう!
【 Effective Python, 2nd Edition 】ブロッキング I/O ( blocking I/O ) とスレッド ( thread ) を利用しているプログラムを、asyncio 組み込みモジュールを利用してコルーチン ( coroutine ) と非同期 I/O ( asyncronous I/O ) を利用したプログラムにリファクタリング ( refactoring ) しよう!
【 Effective Python, 2nd Edition 】プログラムを並列処理 ( concurrency ) パターンへ移行するタイミングとツールを考えるシリーズ 第 5 回 - 並列処理 ( concurrency ) のためにスレッド ( thread ) を利用する場合は concurrent.futures モジュールの ThreadPoolExecutor の導入を検討しましょう、の巻
【 Effective Python, 2nd Edition 】入力元のデータサイズが大きい場合は、リスト内包表記 ( list comprehension ) ではなくジェネレータ式 ( generator expression ) の利用を検討しよう!
【 Effective Python, 2nd Edition 】デスクリプタ ( descriptor ) を利用して @property で行っていた属性値への操作を再利用できるようにしよう!