以下の内容はhttps://kazuhira-r.hatenablog.com/entry/2024/11/16/212101より取得しました。


Pythonでスレッドに関する標準ライブラリー(スレッドローカルデータ、Lock、Condition、Semaphore、Event、Timer、Barrier)を扱う

これは、なにをしたくて書いたもの?

Pythonで、マルチスレッドに関する標準ライブラリーを知っておきたいなと思いまして。

ちなみにスレッド自体は過去にも扱っています。

Pythonのスレッドは、ネイティブスレッドなのか? - CLOVER🍀

PythonのTCPServer/HTTPServerをマルチスレッドで使う - CLOVER🍀

今回はロックやセマフォといったスレッド間に関する標準ライブラリーを見ていきます。

threadingライブラリー

threadingライブラリーのページはこちら。

threading --- スレッドベースの並列処理 — Python 3.10.15 ドキュメント

こちらには以下のAPIやクラスが含まれています。

  • スレッドローカルデータ … スレッドごとに固有の値を設定する
  • Lock … ロック、アンロックが可能で、特定のスレッドがロックを獲得している時に他のスレッドがロックを獲得しようとすると、先に獲得されたロックがアンロックされるまで待機する
  • RLock … 再入可能ロック(Reentrant Lock)。Lockと異なり同じスレッドが再帰的にロックを獲得可能
  • Condition … ロックに関連付けられたうえで、waitnotifynotify_all)でスレッドの待機/起動を操作できるオブジェクト
  • Semaphore … いわゆるセマフォで、ある範囲に対して同時に実行できるスレッド数を制限する仕組み
  • Event … あるスレッドがイベントを発信し、他のスレッドはイベントの発信を待つというスレッド間通信を行う仕組み
  • Timer … 一定時間後にスレッドを実行する仕組み
  • Barrier … 複数のスレッドの待ち合わせを行う仕組み

なお、LockRLockConditionSemaphoreはコンテキストマネージャーとして使えます。

with 文でのロック・条件変数・セマフォの使い方

具体的にはロックの獲得をacquireで行い、解放にreleaseを使うものはコンテキストマネージャーとして使えるようになっていて、
withブロックに入る時にacquireが呼び出されwithブロックを抜ける時にreleaseが呼び出されます。

ところで、PythonにおけるスレッドはGILがあるので1プロセス内で同時に実行できるスレッドはひとつだけです。Pythonでマルチスレッドが
有効なのはIOバウンドな処理を並列して実行したい時ですね。

CPython 実装の詳細: CPython は Global Interpreter Lock のため、ある時点で Python コードを実行できるスレッドは1つに限られます (ただし、いくつかのパフォーマンスが強く求められるライブラリはこの制限を克服しています)。アプリケーションにマルチコアマシンの計算能力をより良く利用させたい場合は、 multiprocessing モジュールや concurrent.futures.ProcessPoolExecutor の利用をお勧めします。 ただし、I/Oバウンドなタスクを並行して複数走らせたい場合においては、 マルチスレッドは正しい選択肢です。

ちなみにスレッド自体を直接扱うのではなく、concurrent.futuresThreadPoolExecutorを使うのがよいと思います。

concurrent.futures -- 並列タスク実行 — Python 3.10.15 ドキュメント

今回はこのあたりを試してみたいと思います。

環境

今回の環境はこちら。

$ python3 --version
Python 3.10.12


$ pip3 --version
pip 22.0.2 from /usr/lib/python3/dist-packages/pip (python 3.10)

準備

確認はpytestで行いたいと思います。型チェックにmypyも入れておきます。

$ pip3 install pytest mypy

インストールされたライブラリーの一覧。

$ pip3 list
Package           Version
----------------- -------
exceptiongroup    1.2.2
iniconfig         2.0.0
mypy              1.13.0
mypy-extensions   1.0.0
packaging         24.2
pip               22.0.2
pluggy            1.5.0
pytest            8.3.3
setuptools        59.6.0
tomli             2.1.0
typing_extensions 4.12.2

動作確認はpytestを使ったテストコードで行いますが、雛形はこちらです。

tests/test_threading.py

from concurrent.futures import ThreadPoolExecutor
import datetime
import threading
import time

def log(message: str) -> None:
    print(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {threading.current_thread().name} - {message}")

またテスト中に標準出力への書き出しを行うので、pytestは--capture=noオプションを指定して実行します。

$ pytest --capture=no

では、試していってみましょう。

スレッドローカルデータ

最初はスレッドローカルデータから。

スレッドローカルデータ

スレッドローカルデータは、そのスレッド固有のデータを持たせる仕組みです。あたかも単一スレッド前提のような使い方をするコードで
複数スレッドで実行しても、それぞれのデータが独立して扱えるので便利です。

サンプルコード。

def test_thread_local_data() -> None:
    results = {}

    localdata = threading.local()

    def thread1() -> None:
        time.sleep(3)

        localdata.mydata = "Hello from thread1"

        time.sleep(2)

        log(f"thread1 data = {localdata.mydata}")

        assert localdata.mydata == "Hello from thread1"

        results[threading.current_thread().name] = "done"

    def thread2() -> None:
        time.sleep(2)

        localdata.mydata = "Hello from thread2"

        time.sleep(3)

        log(f"thread2 data = {localdata.mydata}")

        assert localdata.mydata == "Hello from thread2"

        results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(thread1))
        futures.append(executor.submit(thread2))

        [f.result() for f in futures]

        assert len(results) == 2

スレッドローカルデータは、threading.localで取得したオブジェクトで表現されます。

    localdata = threading.local()

スレッドローカルデータは辞書のように扱えます。

同じオブジェクトに各スレッドが同じ属性に書き込んでいますが、それぞれのスレッドが設定した値がしっかり残っています。

    def thread1() -> None:
        time.sleep(3)

        localdata.mydata = "Hello from thread1"

        time.sleep(2)

        log(f"thread1 data = {localdata.mydata}")

        assert localdata.mydata == "Hello from thread1"

        results[threading.current_thread().name] = "done"

    def thread2() -> None:
        time.sleep(2)

        localdata.mydata = "Hello from thread2"

        time.sleep(3)

        log(f"thread2 data = {localdata.mydata}")

        assert localdata.mydata == "Hello from thread2"

        results[threading.current_thread().name] = "done"

標準出力の結果。

[2024-11-16 20:32:29] ThreadPoolExecutor-0_0 - thread1 data = Hello from thread1
[2024-11-16 20:32:29] ThreadPoolExecutor-0_1 - thread2 data = Hello from thread2

このように、スレッドごとに固有の値を管理できる仕組みです。

ちなみにスレッドローカルデータはlocalを継承することで独自のスレッドローカルデータを作れたりするのですが、ドキュメントにほとんど
説明がありません。APIの説明もないですね。

詳しくはソースコードを見ること、だそうです。

詳細と例題については、 _threading_local モジュールのドキュメンテーション文字列を参照してください。

https://github.com/python/cpython/blob/v3.10.12/Lib/_threading_local.py

Lock、RLock

次はLockRLockです。

まずはLockから。

def test_lock() -> None:
    lock = threading.Lock()

    results = {}
    
    def with_lock() -> None:
        log("try lock")

        lock.acquire()

        try:
            log("start")

            time.sleep(2)

            log("end")

            results[threading.current_thread().name] = "done"
        finally:
            lock.release()

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(with_lock))
        futures.append(executor.submit(with_lock))

        [f.result() for f in futures]

        assert len(results) == 2

Lockを作成して

    lock = threading.Lock()

Lock#acquireでロックを獲得できます。ロックを獲得できるスレッドはひとつだけで、他のスレッドがLock#acquireを呼び出した場合は
Lock#releaseでロックが解放されるまで待たされることになります。

    def with_lock() -> None:
        log("try lock")

        lock.acquire()

        try:
            log("start")

            time.sleep(2)

            log("end")

            results[threading.current_thread().name] = "done"
        finally:
            lock.release()

なのでfinallyで確実にロックを解放する必要があります。

標準出力に書き出された結果を見ると、最初にロックを取得したスレッドがロックを開放するまで2つ目のスレッドが待たされているのが
確認できます。

[2024-11-16 20:41:09] ThreadPoolExecutor-0_0 - try lock
[2024-11-16 20:41:09] ThreadPoolExecutor-0_0 - start
[2024-11-16 20:41:09] ThreadPoolExecutor-0_1 - try lock
[2024-11-16 20:41:11] ThreadPoolExecutor-0_0 - end
[2024-11-16 20:41:11] ThreadPoolExecutor-0_1 - start
[2024-11-16 20:41:13] ThreadPoolExecutor-0_1 - end

なお、Lockはコンテキストマネージャーに対応しているのでwithを使ってシンプルに書くことができます。

        with lock:
            log("start")

            time.sleep(2)

            log("end")

            results[threading.current_thread().name] = "done"

こちらの方がLock#releaseの呼び出し忘れなどがなくてよいでしょう。以降はwithでロックを扱います。

なお、Lockを使ったロックの場合、ロックを獲得したスレッドであってもロック解放前にLock#acquireを呼び出した場合はロックを取得できず
待たされることになります。

つまり、以下のようなコードを書いてしまうとロックを取得したスレッドが止まってしまいます。

def test_lock_reentrant() -> None:
    lock = threading.Lock()

    results = {}
    
    def with_lock() -> None:
        log("try lock")

        with lock:
            log("start")

            time.sleep(2)

            log("reentrant lock")

            with lock:  # ここで進まなくなる
                log("do something")

            log("release reentrant lock")

            log("end")

            results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(with_lock))
        futures.append(executor.submit(with_lock))

        [f.result() for f in futures]

        assert len(results) == 2

実行した場合は、標準出力がここで停止します。

[2024-11-16 20:44:08] ThreadPoolExecutor-0_0 - try lock
[2024-11-16 20:44:08] ThreadPoolExecutor-0_0 - start
[2024-11-16 20:44:08] ThreadPoolExecutor-0_1 - try lock
[2024-11-16 20:44:10] ThreadPoolExecutor-0_0 - reentrant lock

つまり、Lockは最入可能ではありません。

最入可能なロックが必要な場合はRLockを使います。

先程のコードをRLockを使って書き直したものがこちらです。

def test_rlock() -> None:
    lock = threading.RLock()

    results = {}
    
    def with_lock() -> None:
        with lock:
            log("start")

            time.sleep(2)

            log("reentrant lock")

            with lock:
                log("do something")

            log("release reentrant lock")

            log("end")

            results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(with_lock))
        futures.append(executor.submit(with_lock))

        [f.result() for f in futures]

        assert len(results) == 2

Lockインスタンスを作成していたところをRLockにするだけで、あとはLockと使い方は同じですね。

    lock = threading.RLock()

ただしRLockは最入可能なので、先ほどはLockで動作しなかったひとつのスレッドが同じロックインスタンスに対して2回acquire
呼び出すようなコードであっても

    def with_lock() -> None:
        with lock:
            log("start")

            time.sleep(2)

            log("reentrant lock")

            with lock:
                log("do something")

            log("release reentrant lock")

            log("end")

            results[threading.current_thread().name] = "done"

このように動くようになります。

[2024-11-16 20:50:49] ThreadPoolExecutor-0_0 - start
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - reentrant lock
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - do something
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - release reentrant lock
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - end
[2024-11-16 20:50:51] ThreadPoolExecutor-0_1 - start
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - reentrant lock
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - do something
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - release reentrant lock
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - end

Condition

Conditionはロックに関連付けられるオボジェクトで、スレッドを待機させたり起こしたりできます。

サンプルはこちら。

def test_condition() -> None:
    condition = threading.Condition()

    results = {}

    def wait_task() -> None:
        with condition:
            log("waiting...")

            condition.wait()

            log("wakeup")

            results[threading.current_thread().name] = "done"

    def notify_task() -> None:
        with condition:
            log("notify")

            condition.notify_all()

            log("done")

            results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(wait_task))
        futures.append(executor.submit(wait_task))

        time.sleep(3)

        futures.append(executor.submit(notify_task))

        [f.result() for f in futures]

        assert len(results) == 3

Conditionコンストラクターインスタンスを取得しますが、引数を指定しない場合は内部的にRLockインスタンスを作成します。

    condition = threading.Condition()

引数を指定する場合は、LockまたはRLockインスタンスを渡す必要があります。

Condition#waitでスレッドを待機させます。Conditionに対する操作は、ロックを獲得したうえで行う必要があります。

    def wait_task() -> None:
        with condition:
            log("waiting...")

            condition.wait()

            log("wakeup")

            results[threading.current_thread().name] = "done"

そしてCondition#nofityまたはCondition#notify_allで待機しているスレッドを起こすことができます。

    def notify_task() -> None:
        with condition:
            log("notify")

            condition.notify_all()

            log("done")

            results[threading.current_thread().name] = "done"

Condition#nofityではひとつまたは指定した数のスレッドを、Condition#notify_allでは待機しているスレッドすべてを起こすことができます。

標準出力の結果はこちら。

[2024-11-16 20:59:00] ThreadPoolExecutor-0_0 - waiting...
[2024-11-16 20:59:00] ThreadPoolExecutor-0_1 - waiting...
[2024-11-16 20:59:03] ThreadPoolExecutor-0_2 - notify
[2024-11-16 20:59:03] ThreadPoolExecutor-0_2 - done
[2024-11-16 20:59:03] ThreadPoolExecutor-0_0 - wakeup
[2024-11-16 20:59:03] ThreadPoolExecutor-0_1 - wakeup

ちなみに、Condition#wait_forという引数に指定した関数の戻り値がTrueになるとスレッドが起きるようにするAPIもあるようです。

Semaphore

Semaphoreは、ある範囲を同時に実行できるスレッドの数を制限する仕組みです。

Semaphore

サンプルコードはこちら。

def test_semaphore() -> None:
    semaphore = threading.Semaphore(2)

    results = {}

    def with_semaphore() -> None:
        log("acquire semaphore")

        with semaphore:
            log("enter semaphore")

            time.sleep(2)

            log("leave semaphore")

            results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(with_semaphore))
        futures.append(executor.submit(with_semaphore))
        futures.append(executor.submit(with_semaphore))
        futures.append(executor.submit(with_semaphore))

        [f.result() for f in futures]

        assert len(results) == 4

Semaphoreは、コンストラクターに同時に実行できるスレッド数を指定してインスタンスを生成します。ここでは2を指定しています。

    semaphore = threading.Semaphore(2)

あとはLockRLockのようにロックしたい範囲を指定して使います。

    def with_semaphore() -> None:
        log("acquire semaphore")

        with semaphore:
            log("enter semaphore")

            time.sleep(2)

            log("leave semaphore")

            results[threading.current_thread().name] = "done"

今回は4つのスレッドを実行しているのですが、最初に入った2つのスレッドのどちらかが抜けるまでは3つ目、4つ目のスレッドはロックを
獲得できず待機しています。

[2024-11-16 21:02:53] ThreadPoolExecutor-0_0 - acquire semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_0 - enter semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_1 - acquire semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_1 - enter semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_2 - acquire semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_3 - acquire semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_0 - leave semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_2 - enter semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_1 - leave semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_3 - enter semaphore
[2024-11-16 21:02:57] ThreadPoolExecutor-0_3 - leave semaphore
[2024-11-16 21:02:57] ThreadPoolExecutor-0_2 - leave semaphore

Event

Eventは、Eventというオブジェクトを仲介してあるスレッドがイベントを発信した時に、Eventからの通知を待っているスレッドを起動する
しくみです。

Event

サンプルコードはこちら。

def test_event() -> None:
    event = threading.Event()

    results = {}

    def wait_event() -> None:
        log("wait...")

        event.wait()

        log("wake up")

        results[threading.current_thread().name] = "done"

    def set_event() -> None:
        log("before set event")

        time.sleep(2)
        
        event.set()

        log("after set event")

        results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(wait_event))
        futures.append(executor.submit(wait_event))

        time.sleep(2)

        futures.append(executor.submit(set_event))

        [f.result() for f in futures]

        assert len(results) == 3

Eventの作成。

    event = threading.Event()

待機するスレッドは、Event#waitで通知を待ちます。

    def wait_event() -> None:
        log("wait...")

        event.wait()

        log("wake up")

        results[threading.current_thread().name] = "done"

そしてイベントを送るスレッドは、Event#setで待機しているスレッドをEvent#waitから抜けさせることができます。

    def set_event() -> None:
        log("before set event")

        time.sleep(2)
        
        event.set()

        log("after set event")

        results[threading.current_thread().name] = "done"

実行結果。

[2024-11-16 21:08:51] ThreadPoolExecutor-0_0 - wait...
[2024-11-16 21:08:51] ThreadPoolExecutor-0_1 - wait...
[2024-11-16 21:08:53] ThreadPoolExecutor-0_2 - before set event
[2024-11-16 21:08:55] ThreadPoolExecutor-0_2 - after set event
[2024-11-16 21:08:55] ThreadPoolExecutor-0_0 - wake up
[2024-11-16 21:08:55] ThreadPoolExecutor-0_1 - wake up

2つのスレッドがEvent#setを待っていることがわかります。

Barrier

Barrierを使うと、複数のスレッドの待ち合わせができるようになります。

Barrier

サンプルコードはこちら。

def test_barrier() -> None:
    barrier = threading.Barrier(3)

    results = {}

    def thread1() -> None:
        log("thread1 waiting 3sec...")
        time.sleep(3)

        barrier.wait()
        log("thread1 wakeup")

        results["thread1"] = "done"

    def thread2() -> None:
        log("thread2 waiting 2sec...")
        time.sleep(2)

        barrier.wait()
        log("thread2 wakeup")

        results["thread2"] = "done"

    def thread3() -> None:
        log("thread3 waiting 5sec...")
        time.sleep(5)

        barrier.wait()
        log("thread3 wakeup")

        results["thread3"] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(thread1))
        futures.append(executor.submit(thread2))
        futures.append(executor.submit(thread3))

        [f.result() for f in futures]

        assert results["thread1"] == "done"
        assert results["thread2"] == "done"
        assert results["thread3"] == "done"

Barrierは、コンストラクターに待ち合わせるスレッドの数を指定してインスタンスを生成します。

    barrier = threading.Barrier(3)

あとはBarrier#waitを呼び出すとそこで待機し、コンストラクターに指定した数のスレッドがBarrier#waitの呼び出しに到達すると
動き始めます。

    def thread1() -> None:
        log("thread1 waiting 3sec...")
        time.sleep(3)

        barrier.wait()
        log("thread1 wakeup")

        results["thread1"] = "done"

    def thread2() -> None:
        log("thread2 waiting 2sec...")
        time.sleep(2)

        barrier.wait()
        log("thread2 wakeup")

        results["thread2"] = "done"

    def thread3() -> None:
        log("thread3 waiting 5sec...")
        time.sleep(5)

        barrier.wait()
        log("thread3 wakeup")

        results["thread3"] = "done"

つまり、こういう動作結果になります。

[2024-11-16 21:13:53] ThreadPoolExecutor-0_0 - thread1 waiting 3sec...
[2024-11-16 21:13:53] ThreadPoolExecutor-0_1 - thread2 waiting 2sec...
[2024-11-16 21:13:53] ThreadPoolExecutor-0_2 - thread3 waiting 5sec...
[2024-11-16 21:13:58] ThreadPoolExecutor-0_2 - thread3 wakeup
[2024-11-16 21:13:58] ThreadPoolExecutor-0_1 - thread2 wakeup
[2024-11-16 21:13:58] ThreadPoolExecutor-0_0 - thread1 wakeup

最後のスレッドがBarrier#waitを呼び出すまで他のスレッドが待機し、最後のスレッドがBarrier#waitを呼び出したところで待機していた
スレッドすべてが一気に動き出します。

Timer

最後はTimerです。これは、指定した時間の後にタスクを実行する仕組みですね。

Timer

サンプルコードはこちら。スレッドの待ち合わせにはBarrierを使いました。

def test_timer() -> None:
    results = {}

    barrier = threading.Barrier(2)
    
    def task() -> None:
        log("execute task")
        
        results[threading.current_thread().name] = "done"

        barrier.wait()
        
    log("register task")

    timer = threading.Timer(3, task)
    timer.start()

    barrier.wait()

    assert len(results) == 1

Timerは、コンストラクターにタスクを起動するまでの秒数と起動するタスクを関数として指定します。

    timer = threading.Timer(3, task)

実行結果はこちら。3秒後にタスクが実行されています。

[2024-11-16 21:18:39] MainThread - register task
[2024-11-16 21:18:42] Thread-1 - execute task

こんなところでしょうか。

おわりに

Pythonでスレッドに関する標準ライブラリーをいろいろ試してみました。

使う頻度はそう多くないと思いますが、マルチスレッドを扱う時には押さえておいた方がよさそうなものばかりなので覚えておきましょう。




以上の内容はhttps://kazuhira-r.hatenablog.com/entry/2024/11/16/212101より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14