Tridentの並列数の指定方法についてまとめました.
Tridentでは,ストリームを定義する時にparallelismHint()メソッドを書くことで,直前のOperationの並列数の指定が可能.
ただし,単に記述しただけではダメみたいです.
例えば,以下の様なストリームを定義します.
Stream inputStream = topology
.newStream("sentence", spout)
.each(new Fields("field"), new Function1())
.each(new Fields("field"), new Function2(), new Fields())
.parallelismHint(10);
これを実行すると,Function2()が10並列になるかと思いきや,されません.
しかし,以下のようにFunction1()とFunction2()の間にsuffle()を挟むと,Function2()が10並列で動くようになります.
Stream inputStream = topology
.newStream("sentence", spout)
.each(new Fields("field"), new Function1())
.shuffle()
.each(new Fields("field"), new Function2(), new Fields())
.parallelismHint(10);
以前書いた記事「Apache Storm Trident Tutorial まとめ - しろかい!」に書いてあるように,Tridentでは実行時に自動的にSpout/Boltに各Operationが変換されます.その際,.shuffle()やgroupBy()と言った,ネットワーク通信が必須となる(Tupleを別のノードに送る)操作で区切って,Boltを生成します.
前者のソースコードでは,Function1()とFunction2()は1つのBoltにまとめられてしまい,またSpoutと同一ノードで処理可能であるため,並列化されません(Spoutは1個なので,その数に合わせられてしまう).
一方,後者のソースコードでは.shuffle()はネットワーク通信が必須であるため,Function1()とFunction2()は別々のBoltとして生成されます.その際, parallelismHint(10)を指定しているので,10個に並列されます.
ちなみに,以下のように書くと,Function1()とFunction2()がまとめられたBoltが10個生成されます.
Stream inputStream = topology
.newStream("sentence", spout)
.shuffle()
.each(new Fields("field"), new Function1())
.each(new Fields("field"), new Function2(), new Fields())
.parallelismHint(10);
さらにもう1つ.
以下のcodeを実行すると,Function1()のBoltは10個,Function2()のBoltは1個生成されます.
Stream inputStream = topology
.newStream("sentence", spout)
.shuffle()
.each(new Fields("field"), new Function1())
.parallelismHint(10);
.shuffle()
.each(new Fields("field"), new Function2(), new Fields())
.parallelismHint(1);
以上,Tridentでの並列数の指定方法でした.