以下の内容はhttps://htn20190109.hatenablog.com/entry/2025/02/01/140454より取得しました。


{GCP Dataflow}Python を使用して Dataflow パイプラインを作成する

 


https://cloud.google.com/dataflow/docs/quickstarts/create-pipeline-python?hl=ja

 

-- 1. 前作業

gcloud init
gcloud auth list

gcloud --version

gcloud projects create project01-9999999 \
--name="project01"

gcloud config list
gcloud config set project project01-9999999
gcloud config set compute/region us-central1 --quiet
gcloud config set compute/zone us-central1-a --quiet

gcloud beta billing accounts list
gcloud beta billing projects link project01-9999999 --billing-account=111111-111111-111111

gcloud services enable compute.googleapis.com --project project01-9999999

gcloud components update

gcloud auth application-default login


-- 2. バケット作成


gsutil mb -l US gs://bucket99999999/
gsutil ls

-- 3. テストデータ作成


bq extract bigquery-public-data:bbc_news.fulltext \
gs://bucket99999999/bbc_news_fulltext.csv


gsutil ls gs://bucket99999999/bbc_news_fulltext.csv


-- 4. APIの有効化

 

gcloud services enable dataflow \
compute_component \
logging \
storage_component \
storage_api \
bigquery \
pubsub \
datastore.googleapis.com \
cloudresourcemanager.googleapis.com


-- 5. Python仮想環境作成


virtualenv .env
source .env/bin/activate


pip install wheel
pip install apache-beam[gcp]
pip install future


-- 6. Compute Engine のデフォルト サービス アカウントにロールを付与


PROJECT_ID=$(gcloud config get-value project)
PROJECT_NUMBER=$(gcloud projects list --filter=$PROJECT_ID \
--format="value(PROJECT_NUMBER)")

GCE_SERVICE_ACCOUNT=$PROJECT_NUMBER-compute@developer.gserviceaccount.com


echo ${PROJECT_ID}
echo ${PROJECT_NUMBER}
echo ${GCE_SERVICE_ACCOUNT}

 

ROLES="storage.admin dataflow.admin dataflow.worker"
for argument in $ROLES; do
  gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member=serviceAccount:${GCE_SERVICE_ACCOUNT} \
    --role=roles/$argument
done

 

-- 7. パイプラインスクリプト作成

cat <<-'EOF' > wordcount.py


# -*- coding: utf-8 -*-

# 単語毎に数をカウントするジョブ

import argparse
import logging
import re

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

 

def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)

    beam_options = PipelineOptions(pipeline_args)

 

    # パイプラインの作成
    with beam.Pipeline(options=beam_options) as p:

        # テキストファイルを読み込んでPCollectionとしてlinesを生成
        lines = p | ReadFromText(known_args.input)

        # 文字を単語ごとにカウントするTransform処理
        counts = (
            lines
            # Split : 分を単語ごとに分割
            | 'Split' >> (
                beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)).
                with_output_types(unicode))
            # PairWithOne : (単語, 1)というマップを生成
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            # GroupAndSum : 単語をキーにして、件数を集計
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

        # PCollectionであるcountsをオブジェクトストレージに出力
        counts | WriteToText(known_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

EOF

cat wordcount.py

 

 

-- 8. パイプラインスクリプト実行

python wordcount.py \
--project ${PROJECT_ID} \
--region='us-central1' \
--runner DataflowRunner \
--input gs://bucket99999999/bbc_news_fulltext.csv \
--output gs://bucket99999999/wordcount_out \
--staging_location gs://bucket99999999/staging/ \
--temp_location gs://bucket99999999/temp/ 

 

gcloud dataflow jobs list \
--region us-central1


gsutil ls gs://bucket99999999/wordcount_out*

gsutil cat gs://bucket99999999/wordcount_out* | head

 


-- 9. クリーンアップ

 


gcloud storage rm gs://bucket99999999 --recursive
gcloud storage ls


gcloud projects list
gcloud projects delete project01-9999999 \
--quiet


gcloud beta billing projects unlink project01-9999999

 

 

 




以上の内容はhttps://htn20190109.hatenablog.com/entry/2025/02/01/140454より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14