並行処理を実現する機能として最初に触れたのがOSのスレッド(あとメモリ保護の無い環境のリアルタイムOSのタスク)である。そのせいか、スレッドの生成・終了のコストが気になってしまい、ワンショットの処理をスレッドで行わせる(≒頻繁にスレッドを生成して終了させる)ことについて、未だに抵抗がある。
(軽量スレッドの類を使える環境にない、ということもあるが……)
そんな訳で、一度生成したスレッドを使い回すコードを書くことが多い。スレッドの中でループさせておいて、外部から「実行する処理」を依頼する構造だ。
クラスの中に閉じ込めるのならば、コンストラクタでスレッドを生成して、デストラクタで終了させる、といった感じになる。
ここで、デストラクタにおいて「どうやってスレッドを終了させるか?」という問題が生じる。頻繁にスレッド外からスレッド内に処理を依頼するケースでは、高確率でスレッド間通信の仕組みを用意することになるから、それに乗っかって「スレッドを終了しろ」というメッセージを送ればよい。でも、そのような仕組みが用意されていないのなら、どうだろうか?
伝統的なスレッド・プログラミングに不慣れな開発者が書いてしまいがちなコードは、こんな感じだろうか?
#include <chrono> #include <iostream> #include <thread> using namespace std::literals::chrono_literals; using std::this_thread::sleep_for; class Foo final { private: bool m_stop { false }; std::thread m_thread; public: Foo() { using std::cout, std::endl; m_thread = std::thread([&]{ cout << "start thread" << endl; while (!m_stop) { cout << "loop" << endl; sleep_for(1ms); } cout << "stop thread" << endl; }); } ~Foo() { m_stop = true; m_thread.join(); } };
スレッドの終了を通知するためにbool型の変数m_stopを用いている。スレッド内部で、周期的にこの変数の変化を監視する訳だ。
このコードには問題がある。C++の仕様に少しばかり詳しいならば、あるメモリ上のオブジェクトにたいして排他制御無しで「値を参照するスレッド」と「値を変更するスレッド」が存在するコードは未定義の動作(データ競合)を引き起こす、ということに気づくだろう。
安直に直すなら、例えば変数m_stopをbool型ではなくstd::atomic_bool型にすればよいだろう。
#include <atomic> #include <chrono> #include <iostream> #include <thread> using namespace std::literals::chrono_literals; using std::this_thread::sleep_for; class Foo final { private: std::atomic_bool m_stop { false }; std::thread m_thread; public: Foo() { using std::cout, std::endl; m_thread = std::thread([&]{ cout << "start thread" << endl; while (!m_stop.load()) { cout << "loop" << endl; sleep_for(1ms); } cout << "stop thread" << endl; }); } ~Foo() { m_stop.store(true); m_thread.join(); } };
もう1つ気になることがある。スレッドの中でwhile (!m_stop)といった感じでm_stopを監視している訳だが、スレッドの中にも、スレッドを生成しているコンストラクタの中にも、変数m_stopを書き換えるコードが1つも存在しない。
そのため、もしかしたらコンパイル時に次のようなコードに最適化されてしまうかもしれない、という疑念が生じる。
cout << "start thread" << endl; if (m_stop) { cout << "stop thread" << endl; return; } while (1) { cout << "loop" << endl; sleep_for(1ms); }
このような最適化を邪魔するにはvolatileを使えばよいだろう。
#include <chrono> #include <iostream> #include <thread> using namespace std::literals::chrono_literals; using std::this_thread::sleep_for; class Foo final { private: volatile bool m_stop { false }; std::thread m_thread; public: Foo() { using std::cout, std::endl; m_thread = std::thread([&]{ cout << "start thread" << endl; while (!m_stop) { cout << "loop" << endl; sleep_for(1ms); } cout << "stop thread" << endl; }); } ~Foo() { m_stop = true; m_thread.join(); } };
std::atomicとvolatileは別の機能で、かつ併用することが可能だ。今回のケースでは変数m_stopをvolatile std::atomic_bool型にすればよさそうだ。
#include <atomic> #include <chrono> #include <iostream> #include <thread> using namespace std::literals::chrono_literals; using std::this_thread::sleep_for; class Foo final { private: volatile std::atomic_bool m_stop { false }; std::thread m_thread; public: Foo() { using std::cout, std::endl; m_thread = std::thread([&]{ cout << "start thread" << endl; while (!m_stop.load()) { cout << "loop" << endl; sleep_for(1ms); } cout << "stop thread" << endl; }); } ~Foo() { m_stop.store(true); m_thread.join(); } };
これで終わりだろうか? 問題なく動作するように見えるコードだが、同時に、ちょっとばかり泥臭いコードにも見える。もう少しスマートな方法はないだろうか?
最近気づいたのは、1回限りのスレッド間通信にはstd::futureとstd::promiseを使えばよい、ということだ。
#include <chrono> #include <future> #include <iostream> #include <thread> using namespace std::literals::chrono_literals; using std::this_thread::sleep_for; class Foo final { private: std::promise<void> m_pr_stop; std::thread m_thread; public: Foo() { using std::cout, std::endl; m_thread = std::thread([&, fu_stop = std::move(m_pr_stop.get_future())]{ cout << "start thread" << endl; while (fu_stop.wait_for(0ms) == std::future_status::timeout) { cout << "loop" << endl; sleep_for(1ms); } cout << "stop thread" << endl; }); } ~Foo() { m_pr_stop.set_value(); m_thread.join(); } };
元のコードの構造を保持するために、while文の判定式ではwait_for(0ms)でスリープさせず、while文の最後でsleep_for(1ms)している。初回の処理のタイミングが多少遅れても構わないのならば、wait_for(1ms)にして末尾のsleep_for(1ms)を取り除いてしまえばよいだろう。