1.はじめに
こんにちはデータ分析エンジニアの木介です。
今回は2025年12月にAWSより発表された Lambda Durable Functions を使って Human-in-the-Loop なAgentの構成を紹介します。
Lambda Durable Functions を使うと、複数ステップの処理を 最大1年間 継続できます。
また、承認待ちのように処理を進められない間はLambdaの実行を一度止めた状態で待機でき、その間は実行時間として扱われません。
そのため、Human-in-the-Loop のように「数時間〜数日待つ」ワークフローでも、サーバレスのまま実装しやすくなります。
以下のような人間の承認が必要となるようなAgentを、 Lambda Durable Functions によりサーバレス構成で実現しています。
2.概要
1.Lambda Durable Functions とは
まず今回の肝となる Lambda Durable Functions について紹介していきます。
Lambda Durable Functions とは最大1年間実行可能なマルチステップワークフローを構築できる機能です。
通常の Lambda に checkpoint/replay 機能を追加することで、以下の図のように待機や中断された場合から処理を再開する時に、checkpoint 部分の実行をスキップして保存済みの結果の再利用が可能となります。
また待機や中断状態の間は課金がされないため、API 呼び出しなどによる長時間の待機が必要となる処理ではコストの削減が期待できます。
2.今回実現すること
Lambda Durable Functions により以下のような Human-in-the-Loop なワークフローを実施するような Agent を作成していきます。
ワークフローの流れ
- Agent がユーザからの質問に応じて調査をし、レポートを作成する。
- 調査レポートの S3 へのアップロードの是非についてユーザに承認を求める。
- 承認されれば Agent がレポートを S3 にアップロードする。
Lambda Durable Functions を使うことで、ユーザ承認のような長時間待機が発生する処理を、サーバレスなワークフローとして実現できます。
また、今回の構成を Stepfunctions 、 Lambda で定義した場合との比較を以下に示します。
| Lambda Durable Functions | Step Functions | Lambda | |
|---|---|---|---|
| 特徴 | Lambdaコード内で複数ステップ・承認待ち・再開を扱える | ステートマシン(ASL)としてワークフローを定義・管理 | 1回の実行で完結する関数実行 |
| メリット | 1. 承認待ち処理をコードで自然に表現できる 2. 承認後に同じコードの続きを再開できる 3. AgentロジックをPythonコードに集約できる |
ビジュアルエディタによりワークフロー全体を可視化・運用しやすい | 1. 実装が最もシンプル 2. 単発のAgent実行に向いている |
| デメリット | リプレイ前提のためコードは決定的に書く必要がある | ASLと Lambda に処理が分かれAgent実装と乖離しやすい | 15分以上の待機を扱えず、再実行時には最初からの処理となる |
3.実装
今回大きく以下の処理をそれぞれのステップとして、Lambda Durable Functions によるワークフローの実装を行いました。
- Agentによる調査レポートの作成
- 調査レポートアップロードの承認を依頼
- 承認を得られるまで待機
- 承認結果に基づいたAgentの再実行
以下の形で、aws-durable-execution-sdk-python を利用して、上記のステップをそれぞれ定義しています。
実装 以下にワークフローの実装部分を示します。 Lambda の Context として渡される DurableContext を使って、それぞれのステップを定義しています。
@durable_step def create_report(step_context: StepContext, prompt: str, session_id: str) -> Any: """1. レポートを作成""" step_context.logger.info( "【CREATE_REPORT】調査レポートを作成します。", ) session = create_session(session_id=session_id) with mcp_client: aws_search_tools = mcp_client.list_tools_sync() agent: Agent = define_agent(aws_search_tools, memory_session=session) result: AgentOutput = agent(prompt).structured_output step_context.logger.info( "MESSAGE: %s", result.model_dump_json(indent=2, ensure_ascii=False) ) return result.model_dump() @durable_step def send_for_approval( step_context: StepContext, callback_id: str, file_name: str ) -> dict: """2. 承認リクエストをユーザに送信""" step_context.logger.info( "【SEND_APPROVAL】%sのS3へのアップロードの承認をお願いします。 %s", file_name, callback_id, ) # 実際に利用する場合には、ここでメール送信やSNS通知などを実装する # send_email(to=approver_email, subject="レポート保存承認リクエスト", body=message) return { "file_name": file_name, "callback_id": callback_id, "status": "sent_for_approval", } @durable_step def save_report( step_context: StepContext, session_id, ) -> Any: """3. レポートをS3に保存する""" step_context.logger.info( "【SAVE_REPORT】S3にレポートを保存します。", ) session = create_session(session_id=session_id) agent: Agent = define_agent([upload_report], memory_session=session) result: AgentOutput = agent("作成したレポートを保存してください").structured_output step_context.logger.info( "MESSAGE: %s", result.model_dump_json(indent=2, ensure_ascii=False) ) return result.model_dump() @durable_execution def lambda_handler(event, context: DurableContext): ... # Step1: レポートを作成 created_report = context.step( create_report(prompt, session_id), name="create-report" ) # Step2: コールバックを作成する context.logger.info("コールバックを作成開始") callback = context.create_callback( name="user_approval", config=CallbackConfig( timeout=Duration.from_hours(24), heartbeat_timeout=Duration.from_hours(1) ), ) context.logger.info("コールバックを作成完了: %s", callback.callback_id) context.logger.info("承認リクエストを送信開始") approval_request = context.step( send_for_approval(callback.callback_id, created_report["file_name"]), ) context.logger.info("承認リクエストを送信完了: %s", approval_request) # Step3: コールバックの結果を取得 context.logger.info("承認結果を取得開始") approval_result = callback.result() context.logger.info("承認結果を取得完了: %s", approval_result) # Step4: 承認結果に基づいて条件分岐 - Remove lambda wrappers if approval_result: context.logger.info("承認されました。レポートを保存します。") saved_report = context.step(save_report(session_id), name="save-report") return saved_report else: context.logger.info("承認されませんでした。レポートは保存しません。") saved_report = context.step(denied_report(session_id), name="denied-report") return saved_report
send_for_approval でユーザへの承認リクエストを行っています。
今回は省略していますが、メール通知などでユーザに対して承認リクエストを送信することが出来ます。
ユーザは受け取った承認リクエストに応じて、以下の API から承認・否認を選択することが出来ます。
承認時
lambda_client = boto3.client("lambda") lambda_client.send_durable_execution_callback_success( CallbackId=callback_id, Result=json.dumps({"approved": True}).encode("utf-8"), )
否認時
lambda_client = boto3.client("lambda") lambda_client.send_durable_execution_callback_failure( CallbackId=callback_id, Error={ "ErrorMessage": "否認します", "ErrorType": "Rejected", "ErrorData": json.dumps({"approved": False}), }, )
また、今回の実装ではただ承認・否認をしているだけですが、上記 API のリクエスト部分を Lambda で受け取り、 Agent の動作を制御することも出来ます。
これにより例えば否認時の Agent への次の処理の依頼などを実現できます。
boto3.amazonaws.com boto3.amazonaws.com
全文は以下です。
実装全文を表示
import logging import re from textwrap import dedent from typing import Any, Optional, Tuple from uuid import uuid4 import boto3 from aws_durable_execution_sdk_python import ( DurableContext, StepContext, durable_execution, durable_step, ) from aws_durable_execution_sdk_python.config import CallbackConfig, Duration from bedrock_agentcore.memory import MemoryClient from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig from bedrock_agentcore.memory.integrations.strands.session_manager import ( AgentCoreMemorySessionManager, ) from mcp.client.streamable_http import streamable_http_client from pydantic import BaseModel, Field from strands import Agent, tool from strands.models import BedrockModel from strands.tools.mcp import MCPClient logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) BUCKET = "your-bucket" PREFIX = "agent-reports/" MODEL_ID = "global.anthropic.claude-haiku-4-5-20251001-v1:0" MEM_ID = "your-memory-id" ACTOR_ID = "sample-htl-agent" s3_client = boto3.client("s3") memory_client = MemoryClient() mcp_client = MCPClient( lambda: streamable_http_client("https://knowledge-mcp.global.api.aws") ) ACTOR_ID_REGEX = re.compile( r"^[A-Za-z0-9][A-Za-z0-9\-_\/]*(?::[A-Za-z0-9\-_\/]+)*[A-Za-z0-9\-_\/]*$" ) SESSION_ID_REGEX = re.compile(r"^[A-Za-z0-9][A-Za-z0-9\-_]*$") def sanitize_identifier( raw_value: str, *, allowed_specials: str, pattern: re.Pattern[str], fallback_prefix: str, ) -> str: """IDのサニタイズ.""" allow_colon = ":" in allowed_specials allowed_chars = set(allowed_specials) if allow_colon: allowed_chars.add(":") cleaned = "".join(ch for ch in raw_value if ch.isalnum() or ch in allowed_chars) if allow_colon and ":" in cleaned: cleaned = ":".join(segment for segment in cleaned.split(":") if segment) elif not allow_colon and ":" in cleaned: cleaned = cleaned.replace(":", "") cleaned = cleaned.lstrip(allowed_specials.replace(":", "")) if not cleaned or not cleaned[0].isalnum(): cleaned = "" if cleaned and pattern.fullmatch(cleaned): return cleaned return f"{fallback_prefix}-{uuid4().hex[:10]}" def ensure_safe_ids(actor_id: str, session_id: str) -> Tuple[str, str]: """actor_idとsession_idのサニタイズ.""" safe_actor_id = sanitize_identifier( actor_id, allowed_specials="-_/:", pattern=ACTOR_ID_REGEX, fallback_prefix="actor", ) safe_session_id = sanitize_identifier( session_id, allowed_specials="-_", pattern=SESSION_ID_REGEX, fallback_prefix="session", ) return safe_actor_id, safe_session_id class AgentOutput(BaseModel): """レポート作成結果のスキーマ定義""" report_content: str = Field( description="レポートの内容。マークダウン形式で作成すること" ) file_name: str = Field(description="ファイル名。拡張子.mdをつけること") denied_message: Optional[str] = Field( default=None, description="S3への保存が承認されなかった場合のメッセージ。保存しようとしていた内容の概要と、承認されなかったことを簡潔に説明すること。", ) @tool(context=True) def upload_report(doc: str, file_name) -> bool: """S3に調査レポートをアップロードします Args: doc (str): レポート内容 file_name (_type_): アップロードするファイル名 Returns: bool: アップロード成功有無 """ logger.info("Uploading report to %s/%s", BUCKET, PREFIX + file_name) try: s3_client.put_object( Bucket=BUCKET, Key=f"{PREFIX}{file_name}", Body=doc.encode("utf-8"), ContentType="text/markdown", ) except Exception: logger.exception("Failed to upload report to %s/%s", BUCKET, PREFIX + file_name) return False return True def create_session(session_id: str) -> AgentCoreMemorySessionManager: # AgentCore Memoryの設定値 safe_actor_id, safe_session_id = ensure_safe_ids(ACTOR_ID, session_id) agentcore_memory_config = AgentCoreMemoryConfig( memory_id=MEM_ID, session_id=safe_session_id, actor_id=safe_actor_id ) # セッションマネージャーを作成 session_manager = AgentCoreMemorySessionManager( agentcore_memory_config=agentcore_memory_config, region_name="us-east-1" ) return session_manager def define_agent(tools: Any, memory_session: AgentCoreMemorySessionManager) -> Agent: agent = Agent( model=BedrockModel( model_id=MODEL_ID, streaming=False, ), system_prompt=dedent(""" AWSについて調査レポートを作成してください。 調査時には必ずAWS Knowledge MCPを使用してください。 マークダウン形式で出力し、レポート結果をS3にアップロードしてください。 """), tools=tools, session_manager=memory_session, callback_handler=None, structured_output_model=AgentOutput, ) return agent @durable_step def create_report(step_context: StepContext, prompt: str, session_id: str) -> Any: step_context.logger.info( "【CREATE_REPORT】調査レポートを作成します。", ) session = create_session(session_id=session_id) with mcp_client: aws_search_tools = mcp_client.list_tools_sync() agent: Agent = define_agent(aws_search_tools, memory_session=session) result: AgentOutput = agent(prompt).structured_output step_context.logger.info( "MESSAGE: %s", result.model_dump_json(indent=2, ensure_ascii=False) ) return result.model_dump() @durable_step def send_for_approval( step_context: StepContext, callback_id: str, file_name: str ) -> dict: """承認リクエストをユーザに送信""" step_context.logger.info( "【SEND_APPROVAL】%sのS3へのアップロードの承認をお願いします。 %s", file_name, callback_id, ) # 実際に利用する場合には、ここでメール送信やSNS通知などを実装する # send_email(to=approver_email, subject="レポート保存承認リクエスト", body=message) return { "file_name": file_name, "callback_id": callback_id, "status": "sent_for_approval", } @durable_step def save_report( step_context: StepContext, session_id, ) -> Any: """レポートをS3に保存する""" step_context.logger.info( "【SAVE_REPORT】S3にレポートを保存します。", ) session = create_session(session_id=session_id) agent: Agent = define_agent([upload_report], memory_session=session) result: AgentOutput = agent("作成したレポートを保存してください").structured_output step_context.logger.info( "MESSAGE: %s", result.model_dump_json(indent=2, ensure_ascii=False) ) return result.model_dump() @durable_step def denied_report( step_context: StepContext, session_id: str, ) -> Any: """レポートをS3に保存する""" step_context.logger.info( "【DENIED_REPORT】S3への保存が承認されませんでした。", ) session = create_session(session_id=session_id) agent: Agent = define_agent([], memory_session=session) result: AgentOutput = agent( "レポートの保存が承認されませんでした。" ).structured_output step_context.logger.info( "MESSAGE: %s", result.model_dump_json(indent=2, ensure_ascii=False) ) return result.model_dump() @durable_execution def lambda_handler(event, context: DurableContext): prompt = event["prompt"] session_id = context.state.durable_execution_arn.split(":")[-1] # Step1: レポートを作成 created_report = context.step( create_report(prompt, session_id), name="create-report" ) # Step2: コールバックを作成する context.logger.info("コールバックを作成完了") callback = context.create_callback( name="user_approval", config=CallbackConfig( timeout=Duration.from_hours(24), heartbeat_timeout=Duration.from_hours(1) ), ) context.logger.info("コールバックを作成完了: %s", callback.callback_id) context.logger.info("承認リクエストを送信開始") approval_request = context.step( send_for_approval(callback.callback_id, created_report["file_name"]), ) context.logger.info("承認リクエストを送信完了: %s", approval_request) # Step3: コールバックの結果を取得 context.logger.info("コールバック結果を取得: %s", callback.callback_id) approval_result = callback.result() context.logger.info("承認結果を取得: %s", approval_result) # Step4: 承認結果に基づいて条件分岐 - Remove lambda wrappers if approval_result: context.logger.info("承認されました。レポートを保存します。") saved_report = context.step(save_report(session_id), name="save-report") return saved_report else: context.logger.info("承認されませんでした。レポートは保存しません。") saved_report = context.step(denied_report(session_id), name="denied-report") return saved_report
また、デプロイはAWS SAMで実施しました。
通常のLambdaの定義に以下の「DurableConfig」を追加することで設定可能です。
DurableFunctionsHtlAgentFunction: Type: AWS::Serverless::Function Properties: FunctionName: durable-functions-htl-agent CodeUri: src/ Handler: main.lambda_handler DurableConfig: ExecutionTimeout: 7776000 # 90 days Role: Fn::GetAtt: - DurableFunctionsHtlAgentRole - Arn
4.実行結果
4.1 Lambda Durable Functions を使った Human-in-the-Loop な Agent の実行
ではデプロイされた Lambda を実行してみます。
AWS コンソールの Lambda の画面から以下の入力で Lambda を実行しました。
一点注意点としては、 Lambda Durable Functions でも同期実行では Lambda の最大実行時間は15分となります。
それ以上時間がかかる場合は非同期での呼び出しが推奨されています。
実行結果は以下の形で表示されます。
上記実装で定義したレポート作成の実施後にユーザへの承認を送信して、待機していることが分かります。
待機状態の間は課金がされないため、長時間の待機時間があるワークフロー処理を安価に実現できます。
4.2 Agent のアクションを承認する
承認結果を送信するにはコンソール上から以下の「送信成功」/「送信失敗」のどれかを選択することで実施できます。
この操作を行わない限り Lambda は待機状態を維持します。
では「送信成功」を選択した結果です。
以下の形で調査レポートのアップロード処理が行われているとがわかります。
また、S3 上へもレポートが想定通り出力されていました。
4.2 Agent のアクションを否認する
また「送信失敗」を選択した結果は以下となります。
承認が拒否されたため、否認時の処理を行い S3 へのアップロードは行われていませんでした。
5. まとめ
Lambda Durable Functions を使った Human-in-the-Loop な Agent を作成しました。
特にAgent周りでは今回実施したユーザの承認を求める場合以外にも、長時間実行されるツールを利用する際などでも利用できる機能だと感じました。
マルチステップな Agent を実装する際には利用する機会が増えていきそうです。
Acroquest Technologyでは、キャリア採用を行っています。
- Azure OpenAI/Amazon Bedrock等を使った生成AIソリューションの開発
- ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
- マイクロサービス、DevOps、最新のOSSやクラウドサービスを利用する開発プロジェクト
- 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。