https://cloud.google.com/dataflow/docs/quickstarts/create-pipeline-python?hl=ja
-- 1. Preliminary setup
gcloud init
gcloud auth list
gcloud --version
gcloud projects create project01-9999999 \
--name="project01"
gcloud config list
gcloud config set project project01-9999999
gcloud config set compute/region us-central1 --quiet
gcloud config set compute/zone us-central1-a --quiet
gcloud beta billing accounts list
gcloud beta billing projects link project01-9999999 --billing-account=111111-111111-111111
gcloud services enable compute.googleapis.com --project project01-9999999
gcloud components update
gcloud auth application-default login
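If the Application Default Credentials were stored correctly, the following should print OK (a quick sanity check, not part of the original quickstart):
gcloud auth application-default print-access-token > /dev/null && echo OK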
-- 2. Create a bucket
gsutil mb -l US gs://bucket99999999/
gsutil ls
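To confirm the bucket landed in the US multi-region requested by -l US, print its metadata:
gsutil ls -L -b gs://bucket99999999/ | grep -i location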
-- 3. Create test data
bq extract bigquery-public-data:bbc_news.fulltext \
gs://bucket99999999/bbc_news_fulltext.csv
gsutil ls gs://bucket99999999/bbc_news_fulltext.csv
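bq extract writes CSV by default; to eyeball the test data before running the pipeline:
gsutil cat gs://bucket99999999/bbc_news_fulltext.csv | head -n 3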
-- 4. Enable the APIs
gcloud services enable dataflow \
compute_component \
logging \
storage_component \
storage_api \
bigquery \
pubsub \
datastore.googleapis.com \
cloudresourcemanager.googleapis.com
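To verify the services were actually enabled, e.g. the Dataflow API itself:
gcloud services list --enabled | grep dataflow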
-- 5. Create a Python virtual environment
virtualenv .env
source .env/bin/activate
pip install wheel
pip install 'apache-beam[gcp]'
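To confirm the SDK is importable inside the virtualenv:
python -c 'import apache_beam; print(apache_beam.__version__)'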
-- 6. Grant roles to the Compute Engine default service account
PROJECT_ID=$(gcloud config get-value project)
PROJECT_NUMBER=$(gcloud projects list --filter="projectId:${PROJECT_ID}" \
--format="value(PROJECT_NUMBER)")
GCE_SERVICE_ACCOUNT=$PROJECT_NUMBER-compute@developer.gserviceaccount.com
echo ${PROJECT_ID}
echo ${PROJECT_NUMBER}
echo ${GCE_SERVICE_ACCOUNT}
ROLES="storage.admin dataflow.admin dataflow.worker"
for argument in $ROLES; do
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member=serviceAccount:${GCE_SERVICE_ACCOUNT} \
--role=roles/$argument
done
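To double-check the bindings took effect, list every role held by the default service account (a sketch using gcloud's flatten/filter options):
gcloud projects get-iam-policy ${PROJECT_ID} \
--flatten="bindings[].members" \
--filter="bindings.members:serviceAccount:${GCE_SERVICE_ACCOUNT}" \
--format="value(bindings.role)"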
-- 7. Create the pipeline script
cat <<'EOF' > wordcount.py
# -*- coding: utf-8 -*-
# Job that counts the number of occurrences of each word
import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)
    beam_options = PipelineOptions(pipeline_args)
    # Pickle the main session so module-level imports (e.g. re)
    # are available inside the lambdas on Dataflow workers.
    beam_options.view_as(SetupOptions).save_main_session = save_main_session

    # Build the pipeline
    with beam.Pipeline(options=beam_options) as p:
        # Read the text file into the PCollection "lines"
        lines = p | ReadFromText(known_args.input)

        # Transform steps that count occurrences per word
        counts = (
            lines
            # Split: break each line into words
            | 'Split' >> (
                beam.FlatMap(lambda x: re.findall(r"[A-Za-z']+", x))
                .with_output_types(str))
            # PairWithOne: emit a (word, 1) pair for each word
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            # GroupAndSum: group by word and sum the counts
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

        # Write the PCollection "counts" to object storage
        counts | WriteToText(known_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
EOF
cat wordcount.py
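Before launching on Dataflow, the same script can run locally with the DirectRunner as a smoke test (assuming the Application Default Credentials from step 1 can read the bucket):
python wordcount.py \
--runner DirectRunner \
--input gs://bucket99999999/bbc_news_fulltext.csv \
--output /tmp/wordcount_out
head /tmp/wordcount_out*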
-- 8. Run the pipeline script
python wordcount.py \
--project ${PROJECT_ID} \
--region us-central1 \
--runner DataflowRunner \
--input gs://bucket99999999/bbc_news_fulltext.csv \
--output gs://bucket99999999/wordcount_out \
--staging_location gs://bucket99999999/staging/ \
--temp_location gs://bucket99999999/temp/
gcloud dataflow jobs list \
--region us-central1
gsutil ls gs://bucket99999999/wordcount_out*
gsutil cat gs://bucket99999999/wordcount_out* | head
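Each output line is the str() of a (word, count) tuple, e.g. ('the', 12345), so a rough top 10 can be pulled out by sorting on the second field:
gsutil cat gs://bucket99999999/wordcount_out* | sort -t, -k2 -rn | head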
-- 9. Clean up
gcloud storage rm gs://bucket99999999 --recursive
gcloud storage ls
gcloud projects list
gcloud beta billing projects unlink project01-9999999
gcloud projects delete project01-9999999 \
--quiet
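The project stays recoverable for roughly 30 days; describing it should now report DELETE_REQUESTED:
gcloud projects describe project01-9999999 --format="value(lifecycleState)"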