「Groovy で Apache Spark を使用」と同様の処理を Apache Flink で試してみました。
今回のソースは http://github.com/fits/try_samples/tree/master/blog/20170311/
サンプルスクリプト
今回はローカルで実行するだけなので ExecutionEnvironment.createLocalEnvironment() で取得した LocalEnvironment を使用します。
map メソッドの引数へ Groovy のクロージャを使ったところ、org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction is not serializable. ・・・ となってしまい駄目でしたので、MapFunction の実装クラスを定義しました。
その場合、MapFunction の型引数をきちんと指定する必要があります。(そうしないと InvalidTypesException が発生)
なお、flink-clients_2.10 を使用する場合、scala-library の @Grab 定義は不要でした。(flink-clients_2.11 の場合のみ scala-library が必要)
money_count.groovy
@Grapes([
@Grab('org.apache.flink:flink-java:1.2.0'),
@GrabExclude('io.netty#netty;3.7.0.Final')
])
@Grab('org.apache.flink:flink-clients_2.11:1.2.0')
@Grab('org.scala-lang:scala-library:2.11.8')
@Grab('org.jboss.netty:netty:3.2.10.Final')
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.tuple.Tuple2
import groovy.transform.CompileStatic
// @CompileStatic は必須ではない(無くても動作する)
@CompileStatic
class ToTuple implements MapFunction<String, Tuple2<String, Integer>> {
Tuple2 map(String v) {
new Tuple2(v, 1)
}
}
def env = ExecutionEnvironment.createLocalEnvironment()
env.readTextFile(args[0]).map(new ToTuple()).groupBy(0).sum(1).print()
groupBy メソッドではグルーピング対象とする項目を、sum メソッドでは合計する項目を数値で指定します。
実行
Groovy のデフォルト設定では java.lang.IllegalArgumentException: Size of total memory must be positive. が発生しましたので、JAVA_OPTS 環境変数で最大メモリサイズ (-Xmx) を変更して実行します。
実行結果
> set JAVA_OPTS=-Xmx512m > groovy money_count.groovy input_sample.txt Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1033779003] 03/08/2017 00:56:11 Job execution switched to status RUNNING. ・・・ (10000,2) (10,2) (100,2) (50,1) (500,1) (1,2) (1000,3) (2000,1) (5,3)
input_sample.txt の内容は以下の通りです。
input_sample.txt
100 1 5 50 500 1000 10000 1000 1 10 5 5 10 100 1000 10000 2000