Pythonで並列処理・並行処理を提供する標準モジュールは数多くあり、初めてだと違いを理解するのは困難です。この記事では、それぞれの違いについて調べました。
threadモジュール(Python 2), _threadモジュール(Python 3)
かつてPython 2にはthreadモジュールという複数のスレッドを扱うためのモジュールが存在していましたが、Python 3でdeprecated扱いになりました。一応_threadモジュールという名前で残っています。公式でも述べられているように、一般には、thread/_threadモジュールではなく、より高レベルなthreadingモジュールの使用が推奨されるようです。
threadingモジュール
threadingモジュールは、先述の通り、複数のスレッドを扱うためのモジュールです。thread/_threadモジュールより高レベルとはいうものの、この後に紹介するモジュールに比べるとまだまだ低レベルで、C++11のthreadライブラリと同程度の印象を受けます。
コード例を以下に示します。threading.Threadクラスを継承したクラスを作るのが常套手段のようです。
#!/usr/bin/python3 import threading import time class MyThread(threading.Thread): def __init__(self, name, sleep_time): threading.Thread.__init__(self) self.name = name self.sleep_time = sleep_time def run(self): print ("Starting " + self.name) time.sleep(self.sleep_time) print ("Exiting " + self.name) thread_num = 3 threads = [] for i in range(thread_num): threads.append(MyThread("Thread-{}".format(i), 5 - i)) for th in threads: th.start() for th in threads: th.join() print("end")
この例では、3つのスレッドをstart()でほぼ同時に立ち上げます。3つのスレッドは、それぞれ5秒、4秒、3秒待機したのちに終了します。すべてのスレッドが終了するのをjoin()で待ってからプログラムを終了します。このプログラムを実行するとちょうど約5秒で終わることが確かめられます。
ここで、Pythonでのマルチスレッド処理は、C++とのマルチスレッド処理とは大きく異なることを知っておくのは重要です。Pythonの主要な実装系の一つであるCPythonにはGIL(Global Interpreter Lock)という機構があり、複数のスレッドが同時にPythonのバイトコードを実行することを許しません(参考:GlobalInterpreterLock - Python Wiki)。なので、例えばCPU速度がボトルネックになるような重い計算処理(いわゆるCPU boundな処理)を、このthreadingを使って複数のスレッドに割り振って動かしたとしても、実際にはGILの制約のために複数のスレッドが同時に実行されることはなく、処理時間は期待したように短くならないはずです。
一方、ディスクの読み出し・書き込みなどのI/O待ち時間が大量に発生するような処理(いわゆるI/O boundな処理)であれば、GILは問題にならないので、このthreadingを使ってマルチスレッド化することで処理時間が早くなる可能性があります。
threadingモジュールは後述する他のモジュールに比べて自由度が高い分、デッドロックやデータ競合が起こらないように十分考慮してプログラムを組む必要があります。以下は、リソースをロックする順番をめぐってデッドロックしてしまう例です。
# cf. http://dabeaz.blogspot.jp/2009/11/python-thread-deadlock-avoidance_20.html import threading a_lock = threading.Lock() b_lock = threading.Lock() def foo(): with a_lock: with b_lock: print ("a -> b") def bar(): with b_lock: with a_lock: print ("b -> a") class MyThread(threading.Thread): def __init__(self, func): threading.Thread.__init__(self) self.func = func def run(self): while True: self.func() th1 = MyThread(foo) th2 = MyThread(bar) th1.start() th2.start() th1.join() th2.join()
multiprocessingモジュール
multiprocessingモジュールは、複数のプロセスを扱うためのモジュールです。スレッドの代わりにサブプロセスを立ち上げてそちらで処理させることで、GILの問題を回避することができます。ただし、サブプロセスの立ち上げは、スレッドの立ち上げに比べると重い処理なので、本当にプロセス単位での並列化が必要なのか、言い換えると、スレッド単位の並列化で十分ということはないか、一考が必要です。
multiprocessing.Process
multiprocessing.Processを使った例を以下に示します。先に示した、threading.Threadクラスを用いた例と使い方は同じです。こちらもプログラマが使い方を誤るとデッドロックやデータ競合を引き起こすので、要注意です。
#!/usr/bin/python3 import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, name, sleep_time): multiprocessing.Process.__init__(self) self.name = name self.sleep_time = sleep_time def run(self): print ("Starting " + self.name) time.sleep(self.sleep_time) print ("Exiting " + self.name) process_num = 3 processs = [] for i in range(process_num): processs.append(MyProcess("Process-{}".format(i), 5 - i)) for th in processs: th.start() for th in processs: th.join() print("end")
multiprocessing.Pool
さらに、multiprocessingモジュールは、「データを複数プロセスにばらまいて、複数プロセスで計算させ、結果を集める」(fork-join)という、並列処理でよくあるユースケースを実現する専用のAPIを追加で提供しています。それがmultiprocessing.Poolです。
例えば、「実数からなるあるリストが与えられたとき、リストの各要素を2乗したリストを出力」する処理を、multiprocessing.Poolを用いて4プロセス並列で実行する例を以下に示します。
import multiprocessing def pow2(n): return n * n before = list(range(100000000)) with multiprocessing.Pool(4) as p: after = p.map(pow2, before) print(before[:5]) # [0, 1, 2, 3, 4] print(after[:5]) # [0, 1, 4, 9, 16]
このプログラムを実行中にpsコマンドなどでプロセスを見ると、メインプロセスに加えてサブプロセスが4つ立ち上がっているのを観察できると思います。
より複雑な機能に関しては公式ドキュメントをご覧ください。
multiprocessing.dummy.Pool
multiprocessingには、あまり知られていないmultiprocessing.dummy.Poolモジュールが存在しています。前の節で紹介したmultiprocessing.Poolはプロセス単位で処理を並列化したのに対し、このmultiprocessing.dummy.Poolはスレッド単位で処理を並列化します。ともにAPIは同じです。
前節のコードを、スレッド単位で並列化するように変更した例を以下に示します。
import multiprocessing.dummy def pow2(n): return n * n before = list(range(100000000)) with multiprocessing.dummy.Pool(4) as p: # この行が変わっただけです after = p.map(pow2, before) print(before[:5]) print(after[:5])
プロセス単位での並列化と、スレッド単位での並列化を、1行の書き換えのみで簡単に切り替えられるので、並列化の対象となる処理がCPU boundかI/O boundかを実際に確かめたいときに使えるテクニックだと思います。参考:multithreading - How to use threading in Python? - Stack Overflow
concurrent.futuresモジュール
Python3.2からconcurrent.futuresモジュールが提供されるようになりました。Python 2.x系でもPyPIから同名のパッケージを取得可能です。
これまで見てきたようなマルチスレッドやマルチプロセスの処理を隠蔽して、複数の処理を同時に行うための抽象度の高い機能を提供します。具体的には、Futureと呼ばれるクラスを提供することで、「非同期処理が完了した状態、または、未完了の状態」を表すことができるようになります。このモジュールの導入の経緯については、PEP 3148 -- futures - execute computations asynchronously | Python.org に詳しいです。
concurrent.futuresが提供するメインの機能は、futures.ThreadPoolExecutorとfutures.ProcessPoolExecutorです。それぞれ、マルチスレッド処理、マルチプロセス処理を扱いたい時に用います。まず、futures.ProcessPoolExecutorを使って、1000個のタスクをプロセス並列で同時に実行する例を以下に示します。(最初はmultiprocessing.Pool()の例と同じくサイズ1億で試したのですが、メモリ使用量が際限なく増えてしまいました)
from concurrent import futures def pow2(n): return n * n before = list(range(1000)) with futures.ProcessPoolExecutor(max_workers=4) as executor: after = executor.map(pow2, before) print(before[:5]) print(after[:5])
これは今まで見てきたmultiprocessing.Pool()とかなり似た書き方です。
上の例ではFutureという概念は隠蔽されていてますが、陽に扱うこともできます。以下に、map()を使わずFutureオブジェクトを用いて並列処理する例を示します。
#!/usr/bin/python3 from concurrent import futures def pow2(n): return n * n before = list(range(1000)) with futures.ThreadPoolExecutor(max_workers=4) as executor: print("submission starts") to_do = [] for num in before: future = executor.submit(pow2, num) to_do.append(future) print("submission ends") after = [] for future in futures.as_completed(to_do): res = future.result() after.append(res) print(before[:5]) print(after[:5])
実行例は以下です。処理順は不定です。
submission starts submission ends [0, 1, 2, 3, 4] [262144, 244036, 122500, 163216, 280900]
concurrent.futuresモジュール
concurrent.futuresモジュールはPython 3.4から導入された、イベントループに基づく非同期処理を行うためのモジュールです。
このモジュールは公式ドキュメントの量が半端ではなく、自分もあまり理解できていないため、紹介のみにとどめます。
まとめ
以下の方針でモジュールを選ぶのがよいと思います。
- 処理はI/O boundである
- 処理はfork-joinモデルで並列化できる
- →
multiprocessing.dummy.Poolを使う - → または、
futures.ThreadPoolExecutorのmap関数を使う
- →
- 処理はもっと複雑
- →
futures.ThreadPoolExecutorのsubmit関数を使って、タスク単位に処理を行う - → または、
threadingを使って、さらに柔軟にモデルを組み立てる
- →
- 処理はfork-joinモデルで並列化できる
- 処理はCPU boundである
- 処理はfork-joinモデルで並列化できる
- →
multiprocessing.Poolを使う - → または、
futures.ProcessPoolExecutorのmap関数を使う
- →
- 処理はもっと複雑
- →
futures.ProcessPoolExecutorのsubmit関数を使って、タスク単位に処理を行う - → または、
multiprocessingを使って、さらに柔軟にモデルを組み立てる
- →
- 処理はfork-joinモデルで並列化できる