■ はじめに
Flink の Accumulator (アキュムレータ) に関して 徐々にメモっていく
目次
【1】Accumulator
1)補足事項
【2】Accumulatorの使い方
【3】REST API
1)/jobs/{jobid}/accumulators
【1】Accumulator
* 最終的に集約された結果を保持するオブジェクト cf. Accumulator = 蓄圧器
1)補足事項
* ジョブが終了した後で利用可能 * 全てのアキュムレータはジョブごとに1つの名前空間を共有
【2】Accumulatorの使い方
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/user_defined_functions/#accumulators--counters
https://mogile.web.fc2.com/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
1)アキュムレータオブジェクトを作成
// ここではカウンタ private val lineCounter = new IntCounter()
2)addAccumulator で追加
getRuntimeContext().addAccumulator("num-lines", this.numLine)
【3】REST API
* REST API を通じて、Accumulator に保持している値を取得可能
https://mogile.web.fc2.com/flink/flink-docs-release-1.3/monitoring/rest_api.html
https://mogile.web.fc2.com/flink/flink-docs-release-1.1/internals/monitoring_rest_api.html
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/rest_api/
1)/jobs/{jobid}/accumulators
{ "job-accumulators":[], "user-task-accumulators": [ { "name": "avglen", "type": "DoubleCounter", "value": "61.5162972" }, { "name": "rows_count", "type": "IntCounter", "value": "1" } ], "serialized-user-task-accumulators":[] }
関連記事
Apache Flink ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/03/05/235755
Apache Flink ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/03/01/235100
Apache Flink ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/29/000000
Apache Flink ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2023/07/23/161621