インクリメント・デクリメントの非アトミック性
最初に前置きとして、インクリメントとデクリメントは分割不可能の最小単位の操作(=アトミックな操作)じゃないですよと言う話をしたいと思います。
C#にインクリメント演算子(p++)とデクリメント演算子(p--)という操作がありますが、実装上の見た目は1行で記述できるのでこの操作って分割不可能の最小の処理単位で一発で値が変更されていると思うかもしれません。
が、実際は「MSDNのページ」によると以下の3ステップで計算が実行されます。
// このインクリメント処理は、、、 i++; // ★実際は以下の3ステップで実行される! // 1) インスタンス変数からレジスタに値を読み込む // ↓ // 2) 値をインクリメントする // ↓ // 3) 値をインスタンス変数に格納する
なので、複数スレッドで変数をインクリメントしまくると (1) ~ (3) の間にそれぞれの処理が入り込んで全部足しても合計値が足りない、、、という状態になります。
この現象は以下のような実装で簡単に確認できます。
internal class Program { // 確認用の処理 static void BadTest() { // インクリメントの合計 int total = 0; // 大量に(1万回)並列実行を行ってインクリメントを実行する Parallel.For(0, 10000, i => { total++; }); // 結果をコンソールに表示 Console.WriteLine($"total={total}"); } static void Main(string[] args) { for (int i = 0; i < 10; i++) { BadTest(); } // 【★実行結果】 //> total=2669 //> total=5180 //> total=5942 //> total=4120 //> total=4420 //> total=6078 //> total=5785 //> total=3578 //> total=5913 //> total=6990 // → ★不安定に10000回未満の結果になる } }
毎回足りない上に、全く違う値が表示されます。にひどいと2000台になったりしています。(これはPCの構成や状況にもよりますが、、、)
こういう処理は実は隙間がある
例えば同時実行数1で、処理中に追加できた要求は捨てたいのような処理があった時に以下のように実装しますがこれは実際は、隙間がある処理(正確には競合状態が発生する処理)で短時間に同時に実行される普通に同時実行されてしまいます。
if (_flag) { return; } _flag = true; // フラグ立てたし同時に1つしか実行できないでしょ風の実装 // この後に1つしか実行されない予定の処理が続く
これも以下のように簡単に確認することができます。
internal class Program { static bool _flag; static int _count; static void BadTest2() { _count = 0; Parallel.For(0, 100000, i => { if (_flag) { return; // 1つしか実行してほしくない風 } _flag = true; try { _count++; } finally { _count--; _flag = false; } }); Console.WriteLine($"_count={_count}"); } static void Main(string[] args) { for (int i = 0; i < 10; i++) { BadTest2(); } // 【★実行結果】 //> _count=107 //> _count=201 //> _count=75 //> _count=375 //> _count=362 //> _count=196 //> _count=354 //> _count=113 //> _count=335 //> _count=249 // → ★0が期待値だが不安定に0より大きい結果になる } }
相当なすり抜けが発生しているようでカウンターがおかしな値になっていることが分かると思います。
隙間を避けるためのInterlocked
最初の話に戻りますが、インクリメントとデクリメントを完全に1つのアトミック操作として実行するための「Interlocked」クラスがあります。
概ね以下のような操作があってそれぞれを割り込まれないアトミック操作として処理を実行できます。
| メソッド名 | 概要 |
|---|---|
| Increment | 1足す |
| Decrement | 1引く |
| Add | 指定した数を足す(マイナスもあり) |
| CompareExchange | 指定した値と比較して一致したら値を更新する |
このクラスでインクリメントすると最初とは違って、何度実行しても期待値が得られます。
internal class Program { static void GoodTest() { // インクリメントの合計 int total = 0; // 大量に並列実行を行ってインクリメントを実行する Parallel.For(0, 10000, i => { Interlocked.Increment(ref total); }); // 結果の出力 Console.WriteLine($"total={total}"); } static void Main(string[] args) { for (int i = 0; i < 10; i++) { GoodTest(); } // 出力: //> total=10000 //> total=10000 //> total=10000 //> total=10000 //> total=10000 //> total=10000 //> total=10000 //> total=10000 //> total=10000 //> total=10000 // → ★必ず1万回になる } }
で、隙間からのすり抜けを防止して同時実行数を1で後から来た処理は無視する実装は以下の通りです。
internal class Program { static void GoodGoodTest() { // 排他制御されてるかどうか確認する用の変数 int currentRunning = 0; int violationCount = 0; int temp = 0; // 排他制御に使うカウンタ int _flagIndex = 0; // 大量に並列実行を行ってインクリメントを実行する Parallel.For(0, 10000, i => { // ★「今が0だったら1を_flagIndexに代入する」というアトミックな処理 if (Interlocked.CompareExchange(ref _flagIndex, 1, 0) != 0) { return; // 0じゃない(つまり何か実行中)ので処理をスキップ } try { // レポート用のチェック処理 int running = Interlocked.Increment(ref currentRunning); if (running > 1) { Interlocked.Increment(ref violationCount); } temp++; // 適当な処理を実行する } finally { // レポート用のチェック処理を戻す Interlocked.Decrement(ref currentRunning); Interlocked.Exchange(ref _flagIndex, 0); // 最後に変数を元に戻す } }); // 結果の出力(同時に侵入してきた違反件数の出力) Console.WriteLine($"Count={violationCount}"); } static void Main(string[] args) { for (int i = 0; i < 10; i++) { GoodGoodTest(); } // 【★実行結果】 //> count=0 //> count=0 //> count=0 //> count=0 //> count=0 //> count=0 //> count=0 //> count=0 //> count=0 //> count=0 // → 違反件数は0なので同時実行されていない } }
と言う風に安全に同時実行数を制限することができました。
正直このままだと適用できる局面が限られるというか、実際にシステム内で使用するときは、処理が捨てられたことを検出して何かする、だとか、Dipose をどうするのかとか考慮事項がかなりあるのでそれらの要件は追加と言う形で別途検討ということになりますが、基本はこんな形です。