以下の内容はhttps://makky12.hatenablog.com/より取得しました。


【Step Functions】ファイル処理Map系アクションにおける「同時実行数」の設定項目についての詳細と挙動

今回の内容

AWS Step Functions(以下「Step Functions」)のファイル処理系Mapアクション(CSV/JSONファイルなど)における、「同時実行数」関連の設定項目について調べてみました

概要

こんにちは。

Step Functionsで(S3バケットなどにある)、下図のようなCSVやJSONファイルを処理するワークフローを書くことがあるかもしれません(Step Functionsのテンプレートにもあるくらいですし)

その際、各アクションに「同時実行数」に関して、以下の設定項目が用意されています。

親アクション(File Analysys)

  • バッチあたりの最大項目数
  • 同時実行の制限

子アクション(Map)

  • 同時実行数の制限を設定

これらについて、詳細と実際の挙動が良くわからなかったので、調べてみました。

親アクション

バッチあたりの最大項目数

これは、「1回の処理(=子アクションにデータを渡す処理)につき、CSVやJSONファイルから何個のデータを取得するか」を意味します。

例えば、100個のデータがあるCSVファイルについて、「バッチあたりの最大項目数」を10に設定した場合、「子アクションにデータを渡す処理」は10回発生することになります。

なおこの「子アクションにデータを渡す」処理は、ワークフロー全体で1回に処理できるデータ個数(後述)を満たすまで、ずっと続けられます。

同時実行の制限

これは、子アクション(上図で言えば「Map」内の処理)を何個同時に実行するか、を指します。
言い換えれば「子アクションを何並列で実行するか」となります。

例えば「バッチあたりの最大項目数」を5に設定した場合、Map内の処理を5並列同時に実行します。

重要なのは、「ワークフロー全体で1回に処理できるデータ個数 = バッチあたりの最大項目数 * 同時実行の制限」 という事です。

なので先ほどの100個のデータがあるCSVファイルについて、

  • 「バッチあたりの最大項目数」を10に設定
  • 「同時実行の制限」を5に設定

した場合(※1)、ワークフロー全体で1回に処理できるデータ個数は10 * 5 = 50個となりますので、ワークフロー全体を2回実行することで全データ処理を完了できます。

子アクション

同時実行数の制限を設定

これは親アクションの「同時実行の制限」と同じで、孫アクション(=上図で言うLambda実行)を何個同時に実行するか(=何並列で実行するか)になります。

例えば親アクションで(※1)を設定した状態で「同時実行数の制限を設定」を2に設定した場合、10個のデータを2個並列実行で処理するので、各孫アクションにおいて、Lambda実行は5回実行されます。

なお注意点として、ここでの「同時実行数の制限を設定」の値は「ワークフロー全体で1回に処理できるデータ個数」には影響しません。

※親アクションの「バッチあたりの最大項目数」を超える値は設定できないので。例えば「バッチあたりの最大項目数」を10の状態で「同時実行数の制限を設定」を1にしようが10にしようが、ワークフロー全体で1度に処理できる個数は10個で変わりません。

図解

上記を図でまとめると、下図の通りになります。

まとめ

以上、Step Functionsのファイル処理系Mapアクションにおける「同時実行数」の設定項目についての詳細と挙動でした。

自分が業務で触っていてちょっとややこしかった部分だったので、自分の理解の復習もかねて、このブログを書きました。

もし他の方でもこの辺がややこしい、と思っている方がいらっしゃったら、その方にとって参考になれば幸いです。

では、今回はこの辺で

【AWS CDK】Lambda Durable Functionsをトリガ起動で呼び出す

はじめに

前回、AWS CDKでLambda Durable Functionsを作成する方法を紹介しました。

今回はその続きで、前回の最後に記載した「Lambda Durable Functionsをトリガ起動で呼び出す」方法を紹介します。

なお本記事では、Lambdaのトリガ元としてメジャーなAPI Gatewayを例に説明します。

参考サイト

従来の定義ではNG

通常、AWS CDKでAPI GatewayをトリガにLambdaを呼び出すには、下記のようなコードを記載します。(import文やpropsは省略)

// Lambda定義
const sampleFunc = new NodejsFunction(this, 'SampleFunction', {
   ...(略)
})  
  
// API Gateway定義
const restApi = new RestApi(this, `RestApi`, {
   ...(略)
});
  
// メソッド&それをトリガに起動するLambdaを指定  
const sampleResource = restApi.root.addResource('sample');
sampleResource.addMethod('POST', new LambdaIntegration(sampleFunc));  

しかし、Lambda Durable Functionsの場合、上記の定義ではエラーとなり、デプロイできません。

これは前回関係した「修飾ARN」「非修飾ARN」が関係しており、Lambda Durable Functionsは修飾ARN(末尾にバージョン番号やエイリアスなどが付いたARN)を指定しないといけないのですが、上記定義だと非修飾ARN(末尾にバージョン番号やエイリアスなどが付かないARN)で指定してしまうため、エラーとなってしまいます。

実際、エラーメッセージも「You cannot invoke a durable function using an unqualified ARN(=非修飾ARNではDurable Functionを実行できません)」となります。

Aliasを使う

ではどうやるかというと、CDKでエイリアスを作成すればOKです。
一例として、下記のコードでエイリアスを作成できます。

// Lambda Durable Functions定義(詳細は前回を参照)
const sampleFunc = new NodejsFunction(this, 'SampleFunction', {
   ...(略)
})  
  
// エイリアスを追加
const sampleAlias = sampleFunc .addAlias("latest");  

そしてAPI Gateway側で LambdaIntegration に指定する引数に、Lambda関数ではなくエイリアスを指定すればOKです。

// API Gatewayのメソッド&それをトリガに起動するLambdaにエイリアスを指定
sampleResource.addMethod('POST', new LambdaIntegration(sampleAlias));

今度はちゃんとデプロイできますし、もちろんLambda Durable FunctionもRest API経由で正しく実行できます。

ちなみに LambdaIntegration の第一引数の型は Function ではなく IFunction なので、Lambda関数ではないAliasも問題なく指定可能です。

API Gateway以外のトリガは?

S3やDynamoDBなど、API Gateway以外のトリガ元ですが、これも同様にAliasを使用すればOKです。

これらをトリガ元にしてLambda Durable Functionsを起動する場合、最終的に下記コンストラクタを使用すると思います。

  • aws-s3-notifications.LambdaDestination()
  • aws-events-targets.LambdaFunction()

これらコンストラクタも、第一引数に指定するのは IFunction なので、Aliasも問題なく指定可能です。

またAliasは addFunctionUrl() メソッドを持っているので、関数URLも問題なく指定可能です。

参考:class Alias (construct)

まとめ

以上、Lambda Durable Functionsをトリガ起動で呼び出す方法でした。

通常のLambda同様、Lambda Durable Functionsをトリガ起動で呼び出すことも今回記載した方法で可能なので、もし「You cannot invoke a durable function using an unqualified ARN」エラーで困っていた方は、ぜひ参考にしていただければと思います。

宣伝

3/18(水)19時から、JAWS-UG 名古屋で「全員LT登壇会 No Audience, Only Builders」というイベントが開催されます。

まだまだ参加者募集中のようですので、よろしければぜひご参加ください(私もあるCDKネタで話します)

イベントページ

では、今回はこの辺で。

【AWS CDK】AWS CDKでLambda Durable Functionsを作成する

はじめに

皆さん、あけましておめでとうございます。(もう2月ですが...)
今年もよろしくお願い致します。

先日、今年初の活動として、1/24(土)に群馬県高崎市で開催された「JAWS-UG 北陸新幹線 #5 in 群馬」にて、Lambda Durable Functionsについて発表しました。

https://jawsug-gunma.connpass.com/event/373298/jawsug-gunma.connpass.com

※発表資料はこちら
speakerdeck.com

ただこの発表では実装、特にAWS CDKについて触れられなかったので、この記事で「AWS CDKでLambda Durable Functionsを作成する方法」を紹介したいと思います。

参考サイト

やること

  • AWS CDK(aws-cdk-lib)のバージョンアップ&@aws/durable-execution-sdkのインストール
  • CDKのコードを書く
  • 動作確認

AWS CDK(aws-cdk-lib)のバージョンアップ&@aws/durable-execution-sdkのインストール

Lambda Durable Functionsの作成にはAWS CDKのバージョン2.232以上が必要なので、必要に応じてバージョンアップをしてください。

またLambda Durable Functionのソース作成では「@aws/durable-execution-sdk」が必要なので、上記「参考サイト」のLambda durable functions(AWS公式サイト)を参考にインストールを行ってください。(TypeScriptとPythonがあります)

CDKのコードを書く

Lambda Durable Functionsについては、AWS CDKに専用のコンストラクタがあるわけではなく、Lambdaコンストラクタに専用のpropsを設定する形になります。

詳細は下のソースコードをベースに説明します。 (必要な定義のみ記載)

// import文は省略しています
// 1.ロールを作成する
const durableRole = new Role(this, 'DurableFunctionRole', {
  assumedBy: new ServicePrincipal('lambda.amazonaws.com'),
  managedPolicies: [
    ManagedPolicy.fromAwsManagedPolicyName(
      'service-role/AWSLambdaBasicDurableExecutionRolePolicy'
    ),
  ],
});
  
// 2.Lambda Durable Functionsを作成
// aws_lambda.FunctionコンストラクタでもOKです
const durableFunc = new NodejsFunction(this, 'DurableFunction', {
  entry: path.resolve(__dirname, '../lambda/durable.ts'),
  runtime: Runtime.NODEJS_24_X,
  timeout: cdk.Duration.seconds(30),  
  // これが重要!
  durableConfig: {
    executionTimeout: cdk.Duration.minutes(15),
    retentionPeriod: cdk.Duration.days(1),
  },
  role: durableRole,
});
  
// 3. エイリアスの作成
const durableFuncAlias = durableFunc.addAlias("latest");

1.ロールを作成する

Lambda Durable Functionsの実行には、下記アクションの実行権限が必要になります。

  • lambda:CheckpointDurableExecution
  • lambda:GetDurableExecutionState

これはAWS管理ポリシー「AWSLambdaBasicDurableExecutionRolePolicy」で設定されているので、このポリシーをアタッチしたロールを作成しておきます。

※なお、この「AWSLambdaBasicDurableExecutionRolePolicy」ポリシーには「AWSLambdaBasicExecutionRole」ポリシーの設定も含まれています

2. Lambda Durable Functionsを作成

先述の通り、CDKにはLambda Durable Functions専用のコンストラクタはありません。

ポイントは durableConfig propsで、この定義があるLambda関数はLambda Durable Functionsとして認識されます。

この durableConfig で設定する項目は以下2項目です。(最後の「Q&A」で補足説明をしています)

項目 説明 必須or任意 設定範囲
executionTimeout Lambda Durable Functionsで作成したワークフローを実行できる最大期間(=ワークフローの生存期間) 必須 1~31,622,400秒(=366日)
retentionPeriod チェックポイントなどの情報を保存可能な期間 任意 1~90日(デフォルト14日)

durableConfig の注意点は、以下2つです。

  • 該当Lambda関数の新規作成時に設定する必要があります。
    • 一度作成した(デプロイした)Lambda関数に後から「durableConfig」を追加しても、Lambda Durable Functionsとはなりません。
  • 逆に、一度作成したLambda Durable Functionsの定義から「durableConfig」を削除しても、通常のLambda関数になるわけではありません。
    • その場合「Construct IDが同じ別のLambda関数が定義された」という扱いになり、デプロイエラーになります。
    • 上記対策として「Construct IDを変える」「一度Lambda Durable Functionsを削除する」などが必要です。(前者の場合、元のLambda Durable Functionsは手作業で削除が必要)

また実際のLambda Durable Functionsのソースコードですが、下記AWS公式サイトの「2. Paste the following code into the index.mjs tab, replacing the code that Lambda created」に記載のコードをそのままコピペします。

docs.aws.amazon.com

その後 const orderId = event.orderId;event.orderId の箇所に、何か適当なorderIdの値を決め打ちで設定しておきます。

3. エイリアスの作成

Lambda Durable Functionsの実行には「修飾ARN」(バージョン番号付きのARN)が必須となるため、エイリアス(=最新バージョンのエイリアス)を必ず作成しておきます。

なお「修飾ARN」については、最後の「Q&A」で説明します。

動作確認

ここまで来たら、cdk deploy などでデプロイを実施します。

正常にデプロイ出来たら、マネジメントコンソールで該当のLambda Durable Functionsを開き「イベント名」に適当な名前を入れてテストを実行します。

durableConfig.executionTimeout を15分より大きい値に設定した場合、非同期実行しかできません

何もなければ、正常終了するはずです。

これでAWS CDKでのLambda Durable Functionsの作成が完了しました。お疲れさまでした。

Q&A

timeoutとexecutionTimeoutの違いは?

それぞれ、下記のとおりです。

項目 説明 最大値
timeout (Lambda Durable Functionsの)Lambda関数自体の1回あたりの最大実行時間 15分
executionTimeout Lambda Durable Functionsで作成したワークフローを実行できる最大期間 366日(≒1年)

注意すべき点として、Lambda Durable Functionsの「最長1年」というのは「waitなどによる実行→中断→再開→中断→再開...を繰り返しながらワークフローを完了させられる期間」(=ワークフローが存在できる期間)が最長1年という事です。

よく「Lambda単体の実行時間が1年になった」と勘違いされますが「Lambda単体の実行時間」は今まで通り最長15分のままです

retentionPeriodの最大が3か月だけど、ワークフローの実行は最長1年じゃないの?

ワークフローの実行自体は最長1年ですが、各チェックポイントの情報を保存できる期間は(retentionPeriodの)「最長3か月」になります。

これについては「3か月以上も放置されるワークフローなら、むしろ一度リセットして最初からやり直した方が良い」という事かもしれません。(あるいはチェックポイントの情報の保存にかかるコストを加味しての挙動かも)

「修飾ARN」と「非修飾ARN」って何?

「3. エイリアスの作成」で触れた「修飾ARN」について、ARNには「修飾ARN」と「非修飾ARN」の2つがあり、それぞれ以下の通りです。

項目 説明
修飾ARN 末尾にバージョン番号やエイリアスなどが付いたARN arn:aws:lambda:us-east-1:123456789012:function:hogehogefunction:10
被修飾ARN 末尾にバージョン番号やエイリアスなどが付かないARN arn:aws:lambda:us-east-1:123456789012:function:hogehogefunction

一般的に「ARN」としてよく利用されるのは「非修飾ARN」ですが、Lambda Durable Functionsの実行には「修飾ARN」での指定が必須となります。

ただ、Lambda関数を変更するたびに変わるバージョン番号を毎回設定するのは面倒なので、エイリアスを作成して毎回最新バージョンが指定されるようにしています。

ちなみに、被修飾ARNでLambda Durable Functionsを実行しようとすると「You cannot invoke a durable function using an unqualified ARN」というエラーが発生します。

また通常のLambdaトリガ起動の設定(API Gateway, S3, Lambda関数URLなど)をCDKに設定した際も、デプロイ時にこのエラーが出ます。

この対策については、次回のブログで記載予定です。

まとめ

以上、AWS CDKでLambda Durable Functionsを作成する方法でした。

今回色々書きましたが、正直AWS CDKでLambda Durable Functionsを作成すること自体はそこまで難しくないと思います。(むしろ難しいのはソースコードの方)

なので、これを機会にまだLambda Durable Functionsに触れたことがない方は、ぜひLambda Durable Functionsに触れてみてはいかがでしょうか?

宣伝

今週2/16(月)~ 2/20(金)のお昼に、弊社(KDDIアジャイル開発センター)にて「KAG AI WEEK」というオンラインイベントを5日連続で開催します。

https://kddi-agile.connpass.com/event/384248/kddi-agile.connpass.com

私も4日目の2/19(木)に「ブログを手書きするのが大変だから、音声AIツール『Wispr Flow』を使って音声入力させてみた話」というタイトルでお話しさせていただきますので、よろしくお願いいたします。(なおタイトルからもわかる通り、AI技術にdeep diveするような内容ではありません)

では、今回はこの辺で。

【AWS】DynamoDBのグローバルセカンダリインデックス(GSI)に複数キーを指定する

はじめに

こんにちは。もうすぐ2025年も終わりますね。

そんな2025年最後のブログは、もう完全にre:InventのS3 オブジェクト50TB対応やLambda Durable Functionで影が薄くなってしまった感がある「DynamoDBのグローバルセカンダリインデックスにおける複合キーサポート」になります。

aws.amazon.com

概要

いままでグローバルセカンダリインデックス(以下「GSI」と記載)においてプライマリキー(値を一意に特定出来るキー*1)として設定できるのは、以下のパターンのみで、パーティションキーやソートキーは1つしか設定できませんでした。

それが今回の変更で、パーティションキーやソートキーをそれぞれ4つまで設定可能になりました。

確認

※GSIやプライマリーキーへキー設定はすでにあるので、今回は概要だけにとどめます

マネジメントコンソールからGSIを作成すると、確かにパーティションキーやソートキーをそれぞれ4つまで指定可能になっています。

またCloudFormationやAWS CDKもこれに対応済で、AWS CDKでGSIを作成する addGlobalSecondaryIndex メソッドの引数である GlobalSecondaryIndexPropsV2 interfaceでも、 partitionKeys および sortKeys という、 Attribute[] (=Attributeインターフェースの配列)を指定するプロパティが追加されており、ここにAttribureを複数指定することでそれらがパーティションキー&ソートキーとしてGSIに設定されます。*2

docs.aws.amazon.com

この機能が嬉しい点

複数キーを指定できるので、「#」などで値だけ区切る必要がなくなった(特にシングルテーブル設計の場合)

DynamoDBの設計パターン一つとして「シングルテーブル設計」という、「全エンティティのデータを1テーブルに集約すべき」という設計パターンがあり、以前がこれが強く推奨されていました。*3

参考:DynamoDB のデータモデリングの基盤

この「シングルテーブル設計」を適用した場合、パーティションキーをエンティティ種類(=RDBで言う「テーブル名」)に割くことになるため、その結果「(情報を一意にするための)キーが足りない」という問題が発生します。

現在は「ORDER#1A2B3C」のように「#で値を区切って1つのキーに複数の値を格納する」方法がよく用いられますが、こういったことをせず「1キーに1つの値」でも情報を一意に特定しやすくなる...というメリットがあります。

プライマリキー項目の指定だけでフィルタ可能になり、コストを抑えられる
(Queryなどで)フィルタを行う際、課金対象になるのは「プライマリキーの条件でフィルタした結果、抽出されたデータ」です。
仮に「プライマリキー以外の条件でフィルタした結果、除外されるデータ」があったとしても、それらは課金対象になります。

今まではプライマリキーのキー数が少なかったため、プライマリキー項目の指定だけでそれらを除外しにくかったのですが、今回の変更でそれがやりやすくなり(=プライマリキーのキー数が増えたため)、フィルタ実行時のコストを抑えることが可能になります。

まとめ

ちょっと駆け足になりましたが「DynamoDBのグローバルセカンダリインデックス(GSI)に複数キーを指定する」について概要を説明しました。

今回諸事情であまり細かい説明を入れられなかったのですが、もし「詳細を知りたい」「内容がいまいちわからない」などあれば質問いただければ回答させていただきますので、よろしくお願いいたします。

それでは、よいお年を。

*1:ただし、GSIやLSIでは結果的に一意でなくてもOKです

*2:今回は省略しますが、正常動作することは業務にて確認済です

*3:なお現在は「マルチテーブル設計」の記載もあることから、シングルテーブル設計だけが強く推奨されているわけではないようです

【AWS CDK】Workflow StudioのプロパティにCDKの値を動的に設定するには?

はじめに

※この記事はAWS CDK Advent Calendar 2025 10日目の記事です。

qiita.com

皆さん、Workflow Studio 使ってますか?

AWS Step Functions(以下「Step Functions」)のワークフローをGUIで編集できる、めちゃくちゃ便利なツールです。

VS Code「AWS Toolkit」拡張機能をインストールすれば、ローカルのVS Code上でも実行できるので、ますますStep Functionsのワークフロー作成が便利になりました。

ただ、Workflow StudioとCDKを一緒に使う場合、下記の点が問題になります。

  1. Workflow StudioからCDKの値(プロパティなど)を参照できない
    • Workflow StudioとCDKは連動しているわけではない
  2. Workflow StudioにCDKの動的な値(例えばCDKで作成したS3バケットのARNなど)は設定できない
    • あくまで動的な値が作成されるのはデプロイ後なので

しかしWorkflow Studioを使用していると「Workflow StudioのプロパティにCDKの動的な値を設定したい」というケースが出てきます。

今回はその「Workflow StudioのプロパティにCDKの動的な値を設定する方法」についてお話しします。

前提

Workflow Studioのワークフローファイルの拡張子は*.asl.json, *.asl.yaml, *.asl.yml の3つがありますが、この記事では *.asl.json で統一します。(他の拡張子でも記載内容はそのまま適用できます)

例えばどういうケース?

ワークフローからWeb APIリクエストを行う際に使用する「Call HTTPS APIs」アクションでは「接続ARN」として、Web APIリクエストで使用するEventBridge ConnectionのARNを指定する必要があります。*1

このEventBridge ConnectionをCDKで定義している場合、「接続ARN」にこのEventBridge ConnectionのARN(connectionArn)を設定したいのですが、先述の理由でそれができません。

ただ、これができないと

  1. 一度「接続ARN」にダミーの値を設定してデプロイする
  2. デプロイ後、マネジメントコンソールからEventBridge ConnectionのARNを確認する
  3. 2で確認したEventBridge ConnectionのARNを「接続ARN」に設定して再度デプロイする

という3ステップが必要になり面倒なので*2、これを解決する方法はないか...というのが今回の本題になります。

解決方法

この問題の解決方法は、以下の2つです。(他にもあるかもしれませんが)

  1. *.asl.json ファイルの内容をデプロイ時に置換する
  2. JSONataで実行時の入力文字列を取得するようにする

1. *.asl.jsonファイルの内容をデプロイ時に置換する

Workflow Studioのワークフローの実態は *.asl.json というjsonファイルです。
つまり、このjsonファイルの内容をデプロイ時に動的に書き換えてしまえばよいわけです。

具体的には、このようなソースを記載します。(なおこの時、「接続ARN」にダミーの決め打ち値を設定しておきます)

import fs from "node:fs";  
import path from "node:path";  
import { StateMachine, DefinitionBody } from "aws-cdk-lib/aws-stepfunctions";  
import { Connection, Authorization } from "aws-cdk-lib/aws-events";  
  
// EventBridge Connectionを作成    
const connection = new Connection(this, 'Connection', {
  authorization: Authorization.apiKey(...),
});
  
// *.asl.jsonファイルを読み込み、内容を置換。
// CONNECTION_DUMMY_ARNには先ほど「接続ARN」に設定したダミーの決め打ち値を定義しておく
const stateMachineDefinition = fs
      .readFileSync(path.resolve(__dirname, "./asl/example.asl.json"), "utf8")
      .replaceAll(CONNECTION_DUMMY_ARN, connection.connectionArn);
  
// ステートマシンを定義する。
// この時definitionBodyを「DefinitionBody.fromString」とし、引数に
// 置換後の*.asl.jsonファイルの文字列を設定する。
const stateMachine = new StateMachine(this, 'StateMachine', {
      definitionBody: DefinitionBody.fromString(stateMachineDefinition),
      ...(以下略)
});

ポイントは DefinitionBody.fromString() で、これを使用する事で definitionBody の値を文字列で指定することが可能です。(大抵の定義では DefinitionBody.fromFile()*.asl.json ファイルを読み込むことがほとんどですが)

2. JSONataで実行時の入力文字列を取得するようにする

これは使用ケースが限定されますが、「接続ARN」の値をJSONataで設定しておき、EventBridge ConnectionのARNをステートマシン実行時に受け取るように設定する...というものです。

例えばAPI Gateway経由でステートマシンを実行する場合は、以下になります。

1 「接続ARN」の値を {% $states.input.connectionArn %}と設定する

2 AwsIntegration などIntegration系クラスのリクエストテンプレート(requestTemplates)に、下記のように設定する

// import文は省略
const stateMachineIntegration = new AwsIntegration({
  service: "states",
  action: "StartExecution",
  integrationHttpMethod: "POST",
  options: {
    requestTemplates: {
      // API Gatewayにリクエストが送られたときにEventBridge ConnectionのARNをinput.connectionArnとしてStep Functionsに渡す
      "application/json": `{
        "input": `$util.escapeJavaScript({'connectionArn' : ${connection.connectionArn}})`,
        "stateMachineArn": "${stateMachineStandard.stateMachineArn}"
      }`,
    },
  }
});

ただし先述の通り、この方法は「使用ケースが限定的」「設定が少々複雑(リクエストテンプレートを設定する、など)」などの点があるため、個人的には「1. *.asl.jsonファイルの内容をデプロイ時に置換する」の方法が良いと思います。

まとめ

以上「Workflow StudioのプロパティにCDKの値を動的に設定する」方法でした。

正直、1の方法も最初は微妙かな...と思ったのですが*3、いろいろ調べた結果、現在はこの方法が一番のようです。(AWSのSAさんにも確認しました)

なおこの話に関しては、2025/11/21(金)開催の JAWS-UG CDK支部#23 札幌でもCDKしたいっしょ でも発表しておりますので、もしよろしければそちらの登壇資料も併せてご確認ください。

speakerdeck.com

では、今回はこの辺で。

*1:他には「CloudFormation置換」を入力する方法もあります

*2:しかも、開発と本番でアカウントが違う場合はさらに面倒になる

*3:CDKのコード内にPure TypeScriptな処理を挟むのはあまり推奨されないので...

【AWS Step Functions】「AWS Step Functions 入門ハンズオン」をやってみた感想&JSONata対応

はじめに

みなさんこんにちは。お久しぶりです。

最近、業務で久しぶりにAWS Step Functions(以下「Step Functions」)を使うことになったので、復習もかねてAWS公式の「AWS Step Functions 入門ハンズオン」をやってみることにしました。

今回はその「AWS Step Functions 入門ハンズオン」に関するお話しです。

AWS Step Functions 入門ハンズオン」とは

AWS公式が提供する「AWS Hands-on for Beginners」シリーズ*1のStep Function編で、Step Functionsについて下記の操作を行いながら、Step Functionsの基本的な機能・操作を理解しようというハンズオンです。

pages.awscloud.com

  • DynamoDBから文章データを取得する
  • 文章データを取得できた場合のみ、下記の処理を並列で実施する(できなかった場合はそこで終了)
    • 文章データの内容をAmazon Translateで英訳し、英訳した文章をDynamoDBに登録する
    • 文章データの内容をAmazon Pollyで音声データにし、音声データの保存先(S3バケット)の情報をDynamoDBに登録する

ハンズオンのアジェンダ

ハンズオンの正式なアジェンダは以下の通りです。

  1. 今回のハンズオンのシナリオ/構成の紹介 + AWS Step Functions の基本
  2. ステートマシーンの作成 + 「アクション」を使ってみる
  3. Input の受け取り + Choice ステートを使ってみる
  4. Parallel ステートで処理を並列に実行する
  5. Output の調整 + DB(Amazon DynamoDB)の更新
  6. Parallel ステートのもう一方の実装 - テキストを音声化する
  7. ステートで「タスクが終わるまで待つ」を実装する
  8. 音声ファイルの出力先情報を DynamoDB テーブルに格納する
  9. リソースの削除 + まとめ + Next Action 案

上記アジェンダはすべて公式Youtube動画&公式資料を見ながら実施できるので、初めてでもわかりやすい内容です。

またどの章も各章の動画自体は短いので(10分弱)、学ぶべきポイントをコンパクトに学べるようになっています。

なお事前にAWSアカウント作成が必要なので、AWSアカウントを持ってない人は先に作成しておいてください。(検証用の一時的アカウントなどは払い出されない)*2

やってみた感想

良かった点

Step Functionの基本的な使い方を学べる
Step Functions でステートマシンを作成する際の基本である「アクション(各AWSサービスを利用する機能)」や「フロー(条件分岐・並列処理などのロジックを定義する機能)」の使い方を学ぶことができます。*3

またステートマシン編集環境であるWorkflow Studioを操作するためWorkflow Studio、Workflow Studioの操作に慣れることもできます。(ちなみにこのハンズオンではマネジメントコンソールを使用しますが、VS CodeでもWorkflow Studioでステートマシンを編集可能です)

Amazon TranslateやAmazon Pollyについても知ることができる
このハンズオンではAmazon TranslateやAmazon Pollyも取り扱うため、普段触れることが少ない(と思う)これらのサービスについて知ることもできます。

ハンズオン内で「Amazon TranslateやAmazon PollyのAPIドキュメントを調べる」という作業があるため、それを通してそれらの機能を知ることができますし、ハンズオンが終わった後で「Amazon TranslateやAmazon Pollyを自分でもいろいろ触ってみようかな」というきっかけになるかもしれません。

注意する点

ちょっと内容が古い
このハンズオンの作成が2021年12月という事もあり、動画の内容が古いものになっています。
そのため、マネジメントコンソールのUIや操作手順で、一部現在のものとは異なるものがあるため、注意が必要です。

「JSONata」や「変数」について学べない
先ほど説明した通り、内容が古いものになっていることもあり、最近サポートされた「JSONata」や「変数」についてはこのハンズオンでは学べません。この点は注意が必要です。(特にJSONataはとても便利なので、個人的にはそろそろハンズオンをアップデートして頂けると嬉しいです)

なおJSONataや変数については、こちらのサイトが参考になります。

参考情報:ハンズオンの内容をJSONataで記載する

「注意する点」で記載の通り、このハンズオンではJSONataは扱っていません。

しかし現在はJSONataで記載するのが基本なので(デフォルトの「ステートマシンクエリ言語」もJSONata)「このハンズオンの内容をJSONataではどう書くか」を記載したいと思います。(一部「変数」も扱います)

※記載がない章は、特にJSONataでの記載が必要ない章になります

3.Input の受け取り + Choice ステートを使ってみる

「DynamoDB GetItem」ブロックの「APIパラメータ」
以下のように記載

{
  "TableName": "Article",
  "Key": {
    "ArticleID": {
      "S": "{% $states.input.ArticleID %}"
    }
  }
}

ちなみにハンズオンではDynamoDB関連ブロックの「統合タイプ」に「AWS SDK」を設定してますが「最適化」でも問題ないです。(指定する引数は同じ)

「Choice」ブロックの「Rule #1」の「Condition」
{% $exists($states.input.Item) %} と記載

4.Parallel ステートで処理を並列に実行する

「TranslateText」ブロックの「引数と出力」の「Text」
"{% $states.input.Item.Detail.S %}" と記載

5.Output の調整 + DB(Amazon DynamoDB)の更新

「TranslateText」ブロックの「引数と出力」の「出力」
以下のように記載

{
  "Item": "{% $states.input.Item %}",
  "Result": "{% $states.result%}"
}

「DynamoDB UpdateItem」ブロックの「引数と出力」の「引数」
以下のように記載(「ArticleID」と「:EnglishVersionRef」がポイント)

{
  "TableName": "article",
  "Key": {
    "ArticleID": {
      "S": "{% $states.input.Item.ArticleID.S %}"
    }
  },
  "UpdateExpression": "SET EnglishVersion = :EnglishVersionRef",
  "ExpressionAttributeValues": {
    ":EnglishVersionRef": {
      "S": "{% $states.input.Result.TranslatedText %}"
    }
  }
}

6.Parallel ステートのもう一方の実装 - テキストを音声化する

「StartSpeechSynthesisTask」ブロックの「引数と出力」の「Text」
{% $states.input.Item.Detail.S %} と記載

7.ステートで「タスクが終わるまで待つ」を実装する

「Choice」ブロックの「Rule #1」の「Condition」
{% $contains($states.input.TaskStatus, 'completed') %} と記載({% $states.input.TaskStatus = 'completed' %} でもOKだと思う)

「変数」の利用
また「TaskId」と「ArticleID」はハンズオンでは数珠つなぎのように渡していますが、ここは「変数」機能を使うと楽です。
「StartSpeechSynthesisTask」ブロックの「変数」に以下のように設定します。(変数名はルールに沿っていれば何でもいいです。)

{
  "varArticleId": "{% $states.input.Item.ArticleID.S %}",
  "varTaskId": "{% $states.result.SynthesisTask.TaskId %}"
}

そして「GetSpeechSynthesisTask」ブロックの「引数と出力」の「Text」に {% $varTaskId %} と設定すればOKです。

8.音声ファイルの出力先情報を DynamoDB テーブルに格納する

「DynamoDB UpdateItem」ブロックの「引数と出力」の「引数」
以下のように記載(「ArticleID」と「:S3URLRef」がポイント)

{
  "TableName": "article",
  "Key": {
    "ArticleID": {
      "S": "{% $varArticleId %}"
    }
  },
  "UpdateExpression": "SET S3URL = :S3URLRef",
  "ExpressionAttributeValues": {
    ":S3URLRef": {
      "S": "{% $SynthesisTask.OutputUri %}"
    }
  }
}

「ArticleID」は先ほど「StartSpeechSynthesisTask」ブロックの「変数」に設定した「varArticleId」を使用しています。
このように「変数」を使用する事で、わざわざ数珠つなぎをしなくても、他のブロックの値を参照できるようになります。

宣伝

今週10/11(土)に金沢・近江町市場で「JAWS Festa 2025」が開催されます!

jawsfesta2025.jaws-ug.jp

そしてその中で業務効率化をさらに加速させる、ノーコードツールとStep Functionsのハイブリッド化というタイトルで、ノーコードツールおよびStep Functionを使用した業務効率化やStep Functionsの効果的な使用方法などについてお話しさせていただきますので、よろしくお願いいたします。

それでは、今回はこの辺で。

*1:公式ページ「AWS Hands-on for Beginnersシリーズ一覧」というリンクがあるのですが、現在はAWS Hands-on for Beginners専用ページ自体がなくなっているようです

*2:ハンズオンで行うステートマシン実行も一応課金対象になるので、気になる人は注意しておきましょう(間違いなく無料枠で収まる範囲ですが...)

*3:ただし、Lambdaの実行とMapフロー(配列などのデータセット内データの並列処理ワークフロー)はこのハンズオンでは触りません

【AWS CDK】AWS CDK Toolkit LibraryがGAされました(その3)

今回のお話

8月にとある大型本番リリース作業があったためとても多忙で、ずいぶん間が空いてしまいました。すいません。

前々回および前回の記事で、AWS CDK Toolkit Libraryの基本、および「CDK CLIで行っていたあの機能、AWS CDK Toolkit Libraryではどうやるの?」という事を書きました。

最終回となる今回は、エラーハンドリング、そしてちょっと高度な「IIoHost」について紹介しようと思います。

AWS CDK Toolkit LibraryのAWS公式サイトはこちら。 aws.amazon.com

エラーハンドリング

CDK Toolkit Libraryには、ToolkitError クラスにあらかじめエラーハンドリング用のヘルパー関数が用意されており、 容易にエラーハンドリング処理を実装できます。

具体的には以下の通りです。(すべてstatic関数です)*1

ヘルパー関数 説明 備考
isToolkitError(error) CDK Toolkitライブラリのエラーかどうかを判定する 以下エラーの親クラス
isAuthenticationError(error) クレデンシャル違いなど、認証に関するエラーかどうかを判定する
isAssemblyError(error) CDKの定義間違いなので発生したエラーかどうかを判定する。
isContextProviderError(error) ContextProvider(≒cintext)に起因するエラーかどうか判定する ContextProviderは、synth時のcdk.context.json出力などに関係するクラス。
withCause(message, error) errorをcause(エラーの根本原因)としてオリジナルエラーをToolkitErrorとして返す これはエラー種類の判定ではない

下記がサンプルコードになります(一部AWS公式サイトより)
下記サンプルコードの通り、ToolkitErrorは、コンストラクタなどから自分で生成することもできます。

ちなみにCDK Toolkit Libraryのエラーは、エラータイプのinstanceofチェックに頼らず、ヘルパー関数を使用する事AWS公式より案内されています。

import { ToolkitError } from '@aws-cdk/toolkit-lib';
  
try {
  // なんか処理
  await toolkit.deploy(cloudAssemblySource, {
    stacks: { strategy: StackSelectionStrategy.ALL_STACKS }
  });
  
} catch (error) {
  if (ToolkitError.isAuthenticationError(error)) {
    // 認証エラー(クレデンシャル不一致など)
    console.error('Authentication failed. Check your AWS credentials.');
  
  } else if (ToolkitError.isAssemblyError(error)) {
    // CDKの定義が違う
    console.error('CDK app error:', error.message);
  
  } else if (ToolkitError.isToolkitError(error)) {
    // context周りでエラー
    console.error('CDK Toolkit error:', error.message);
  
  } else if (ToolkitError.isToolkitError(error)) {
    // 上記に該当しないToolkitErrorのエラー
    console.error('CDK Toolkit error:', error.message);
    
    // withCauseで、cause付きのToolkitErrorを取得する
    throw ToolkitError.withCause(error.message, error);
    
  } else {
    // ToolkitErrorではない、予期しないエラー
    console.error('Unexpected error:', error);
    
    // こんな感じで、ToolkitErrorエラーの作成もできる
    throw new ToolkitError('予期しないエラー', 'unknown', error)
  }
}

IIoHostによるメッセージとインタラクション

CDK Toolkit Libraryではオペレーションを実行時、下記「メッセージ」および「リクエスト」という2つの主要なメカニズムを介して通信します。

  • メッセージ:オペレーションの実行状況などを通知する。(例:「デプロイの進行中」など)
  • リクエスト:ユーザーからの入力または確認が必要な決定ポイントで、これらを確認する機会を提供する。(例:「このスタックをデプロイしていいですか?」など)

これらを構成するのに必要なのが IIoHost インターフェースです

IIoHostインターフェースについて

IIoHost インターフェースは、以下2つの構成で成り立っています。

  • notify:上記「メッセージ」を処理します。
  • requestResponse:上記「リクエスト」を処理します。

ソースコードでは、下記の内容になっています。

import type { IoMessage, IoRequest } from './io-message';
export interface IIoHost {
    notify(msg: IoMessage<unknown>): Promise<void>;
    requestResponse<T>(msg: IoRequest<unknown, T>): Promise<T>;
}

またCDK Toolkit Libraryのコンストラクタでは、以下のようにIIoHostを定義します。

const toolkit = new Toolkit({
  ioHost: {
    notify: async function (msg) {...(なんか処理)},
    requestResponse: async function (msg) {...(なんか処理)},
  }
});

notifyについて

notify には、IoMessage 型の引数msg の内容に応じたメッセージ処理を記載します。

AWS公式のサンプルでは下記のように msg.level に応じたログを出力する処理が記載されており、アプリのログ出力とも似通った分かりやすい内容となっています。

notify: async function (msg) {
  // notify関数の例
    switch (msg.level) {
      case 'error':
        console.error(`[${msg.time}] ERROR: ${msg.message}`);
        break;
      case 'warning':
        console.warn(`[${msg.time}] WARNING: ${msg.message}`);
        break;
      case 'info':
        console.info(`[${msg.time}] INFO: ${msg.message}`);
        break;
      case 'debug':
        console.debug(`[${msg.time}] DEBUG: ${msg.message}`);
        break;
      case 'trace':
        console.debug(`[${msg.time}] TRACE: ${msg.message}`);
        break;
      default:
        console.log(`[${msg.time}] ${msg.level}: ${msg.message}`);
      }
    },

ちなみにmsgにはlevelの他にも、以下のようなプロパティがあります。

プロパティ 説明 備考
time 日時情報(Date型)
action 実行した処理(deploy, synth, listなど)
code メッセージコード
message メッセージ本文
span メッセージスパン 同じ操作に意味的に関連した複数メッセージのグループ。それ以外では無意味
data メッセージに添付されたデータ

requestResponseについて

requestResponse は、ユーザーからの入力または確認が必要な場合に送信されるリクエストに対する挙動(=レスポンス処理)を記載します。

なおこのrequestResponse は、CDK Toolkit Library において非常に重要です

なぜならCDK Toolkit Libraryでは、このような処理に対する初期値が「何も確認しない」になっているからです。(CDK CLIで言えば、--require-approval の 初期値が never になっているのと同じ)

そのため、例えば toolkit.destroy(=スタックの削除)のような危険な処理もユーザーに確認することなく、ダイレクトで実行されてしまいます

そこで、これを防ぐために requestResponse でちゃんと確認するように設定する必要があります。

サンプルとしては以下です。(AWS公式より)

requestResponse: async (msg) => {

  // (例)本番だけは何も確認しない場合        
  if (environment === 'production') {
    console.log(`Auto-approving for production: ${msg.message}`);
      // msg.defaultResponseは「デフォルトの挙動を行う(=何も確認しない)」ための専用のプロパティ
      return msg.defaultResponse;
  }
  
    // 何かしら確認のためのプロンプトなどを出す処理を行う
    // ちなみに引数のmsgは、defaultResponseプロパティを持つ事以外notifyのmsgと同じです。 
    return showPromptForApproval(msg.message);
}

なおこのIIoHost ですが、下記のいずれかの方法で定義できます。

  • Toolkit コンストラクタに notifyrequestResponse を直で定義する
  • NonInteractiveIoHost クラスを継承したカスタムクラスを自分で定義し、それをToolkit コンストラクタのIIoHostに設定する

ちなみに後者を使用すると「変更する必要がある動作のみをカスタマイズしながら、既存の実装を活用できる」というメリットがあります。

これはAWS公式にサンプルコードがありますので、そちらを参照ください。

まとめ

以上、AWS CDK Toolkit LibraryのエラーハンドリングとIIoHostについての設定についてでした。

今日の内容、特にIIoHostRequestResponse はちょっと難しい内容かもしれませんが、RequestResponse は非常に重要な設定なので、必ず最初に設定しておいた方が良いと思います。

ちなみに今回RequestResponseの部分について説明が駆け足になってしまいましたが、詳しく知りたい方は、山梨 蓮さん(X:@ren_yamanashi)がブログで非常に詳しく説明してくれていますので、ぜひそちらもご参照ください。

zenn.dev

最後に

9/12(金)に大阪で開催される「JAWS-UG CDK支部#22 大阪でもCDKしたいねん」にて、LTをさせていただきます。

jawsug-cdk.connpass.com

内容としては、前回&今回ブログで書いたような「CDK CLIでできてたあの機能、CDK Toolkit Libraryではどうやるの?」という内容になると思います。

また10/11(土)に金沢で開催される「JAWS Festa 2025」において、ありがたいことにCFPが採用され、登壇させていただくことになりました。

jawsfesta2025.jaws-ug.jp

正式なタイトルは、公式ページをお楽しみください。

それでは、今回はこの辺で。

*1:公式ページに記載のToolkitError.isDeploymentErrorは、実際のソースコードでは定義されていませんでした。




以上の内容はhttps://makky12.hatenablog.com/より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14