以下の内容はhttps://suzuki-navi.hatenablog.com/entry/2020/10/06/225359より取得しました。


Google Cloud DataflowをJavaで動かしてみる

Google Cloud Dataflowを試してみるべく、GCP公式サイトにあるチュートリアルをやりました。

Java と Apache Maven を使用したクイックスタート  |  Cloud Dataflow  |  Google Cloud

このチュートリアルでダウンロードできるJavaソースコードはファイル数が多く、コードも長いです。そこで、処理内容はできるだけそのままで、必要な要素を理解しやすいようにコードを短くしたサンプルを作成しました。

前提

ソースコード

pom.xmlJavaソースの2ファイルです。

├── pom.xml
└── src
    └── main
        └── java
            └── org
                └── example
                    └── MySample.java

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>mysample</artifactId>
  <version>0.1</version>
  <packaging>jar</packaging>
  <properties>
    <beam.version>2.20.0</beam.version>
    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
    <slf4j.version>1.7.25</slf4j.version>
  </properties>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <profiles>
    <profile>
      <id>dataflow-runner</id>
      <activation><activeByDefault>true</activeByDefault></activation>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>

  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>
</project>

MySample.java

package org.example;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class MySample {

  static class ExtractWordsFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
      String[] words = element.split(" ", -1);
      for (String word : words) {
        if (!word.isEmpty()) {
          receiver.output(word);
        }
      }
    }
  }

  static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
      return wordCounts;
    }
  }

  static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Required
    String getInputPath();
    void setInputPath(String value);

    @Description("Path of the file to write to")
    @Required
    String getOutputPath();
    void setOutputPath(String value);
  }

  static void runWordCount(WordCountOptions options) {
    Pipeline p = Pipeline.create(options);

    String inputPath = options.getInputPath();
    String outputPath = options.getOutputPath();

    p.apply("ReadLines", TextIO.read().from(inputPath)).
      apply(new CountWords()).
      apply(MapElements.via(new FormatAsTextFn())).
      apply("WriteCounts", TextIO.write().to(outputPath));

    p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    WordCountOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);

    runWordCount(options);
  }
}

実行方法

$ mvn compile exec:java -Dexec.mainClass=org.example.MySample -Dexec.args="--project=PROJECT_ID --region=us-central1 --stagingLocation=STAGING_GS_PATH --inputPath=gs://apache-beam-samples/shakespeare/kinglear.txt --outputPath=OUTPUT_GS_PATH --runner=DataflowRunner"

上記コマンドの中、以下の3箇所は自分の環境に合わせて書き換えます。

  • PROJECT_ID: GCPのプロジェクトID、
  • STAGING_GS_PATH: Dataflowの実行に必要なjarファイルなどを置くGCSのパス。このパスの中に自動で必要なファイルがアップロードされる。例: gs://sample-bucket/dataflow/staging/
  • OUTPUT_GS_PATH: 出力先GCSパス。 例: gs://sample-bucket/dataflow/output と指定すると gs://sample-bucket/dataflow/output-00000-of-00003 のような名前のファイルが作成される

コマンドの中で入力として指定している gs://apache-beam-samples/shakespeare/kinglear.txt にはApacheが用意しているサンプル入力ファイルがあり、誰でも読み込み可能です。

このコマンドを実行すると、3分弱で完了します。GCPコンソールでDataflowのジョブを見ると次のように表示されています。

f:id:suzuki-navi:20201006224734p:plain

以上。




以上の内容はhttps://suzuki-navi.hatenablog.com/entry/2020/10/06/225359より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14