以下の内容はhttps://acro-engineer.hatenablog.com/entry/2026/01/30/120000より取得しました。


Lambda Durable Functions で Human-in-the-Loop なAIエージェントを実装する!

1.はじめに

こんにちはデータ分析エンジニアの木介です。
今回は2025年12月にAWSより発表された Lambda Durable Functions を使って Human-in-the-Loop なAgentの構成を紹介します。

Lambda Durable Functions を使うと、複数ステップの処理を 最大1年間 継続できます。
また、承認待ちのように処理を進められない間はLambdaの実行を一度止めた状態で待機でき、その間は実行時間として扱われません。
そのため、Human-in-the-Loop のように「数時間〜数日待つ」ワークフローでも、サーバレスのまま実装しやすくなります。

aws.amazon.com

以下のような人間の承認が必要となるようなAgentを、 Lambda Durable Functions によりサーバレス構成で実現しています。

Human-in-the-Loop を含む Agent の全体構成イメージ

2.概要

1.Lambda Durable Functions とは

まず今回の肝となる Lambda Durable Functions について紹介していきます。
Lambda Durable Functions とは最大1年間実行可能なマルチステップワークフローを構築できる機能です。

通常の Lambda に checkpoint/replay 機能を追加することで、以下の図のように待機や中断された場合から処理を再開する時に、checkpoint 部分の実行をスキップして保存済みの結果の再利用が可能となります。

Lambda Durable Functions による checkpoint / replay の仕組み

docs.aws.amazon.com

また待機や中断状態の間は課金がされないため、API 呼び出しなどによる長時間の待機が必要となる処理ではコストの削減が期待できます。

aws.amazon.com

2.今回実現すること

Lambda Durable Functions により以下のような Human-in-the-Loop なワークフローを実施するような Agent を作成していきます。

今回実装する Human-in-the-Loop ワークフローの全体像

ワークフローの流れ

  1. Agent がユーザからの質問に応じて調査をし、レポートを作成する。
  2. 調査レポートの S3 へのアップロードの是非についてユーザに承認を求める。
  3. 承認されれば Agent がレポートを S3 にアップロードする。

Lambda Durable Functions を使うことで、ユーザ承認のような長時間待機が発生する処理を、サーバレスなワークフローとして実現できます。

また、今回の構成を Stepfunctions 、 Lambda で定義した場合との比較を以下に示します。

Lambda Durable Functions Step Functions Lambda
特徴 Lambdaコード内で複数ステップ・承認待ち・再開を扱える ステートマシン(ASL)としてワークフローを定義・管理 1回の実行で完結する関数実行
メリット 1. 承認待ち処理をコードで自然に表現できる
2. 承認後に同じコードの続きを再開できる
3. AgentロジックをPythonコードに集約できる
ビジュアルエディタによりワークフロー全体を可視化・運用しやすい 1. 実装が最もシンプル
2. 単発のAgent実行に向いている
デメリット リプレイ前提のためコードは決定的に書く必要がある ASLと Lambda に処理が分かれAgent実装と乖離しやすい 15分以上の待機を扱えず、再実行時には最初からの処理となる

3.実装

今回大きく以下の処理をそれぞれのステップとして、Lambda Durable Functions によるワークフローの実装を行いました。

  1. Agentによる調査レポートの作成
  2. 調査レポートアップロードの承認を依頼
  3. 承認を得られるまで待機
  4. 承認結果に基づいたAgentの再実行

以下の形で、aws-durable-execution-sdk-python を利用して、上記のステップをそれぞれ定義しています。

github.com

実装 以下にワークフローの実装部分を示します。 Lambda の Context として渡される DurableContext を使って、それぞれのステップを定義しています。

@durable_step
def create_report(step_context: StepContext, prompt: str, session_id: str) -> Any:
    """1. レポートを作成"""
    step_context.logger.info(
        "【CREATE_REPORT】調査レポートを作成します。",
    )
    session = create_session(session_id=session_id)
    with mcp_client:
        aws_search_tools = mcp_client.list_tools_sync()
        agent: Agent = define_agent(aws_search_tools, memory_session=session)

        result: AgentOutput = agent(prompt).structured_output

    step_context.logger.info(
        "MESSAGE: %s", result.model_dump_json(indent=2, ensure_ascii=False)
    )
    return result.model_dump()


@durable_step
def send_for_approval(
    step_context: StepContext, callback_id: str, file_name: str
) -> dict:
    """2. 承認リクエストをユーザに送信"""
    step_context.logger.info(
        "【SEND_APPROVAL】%sのS3へのアップロードの承認をお願いします。 %s",
        file_name,
        callback_id,
    )

    # 実際に利用する場合には、ここでメール送信やSNS通知などを実装する
    # send_email(to=approver_email, subject="レポート保存承認リクエスト", body=message)

    return {
        "file_name": file_name,
        "callback_id": callback_id,
        "status": "sent_for_approval",
    }


@durable_step
def save_report(
    step_context: StepContext,
    session_id,
) -> Any:
    """3. レポートをS3に保存する"""
    step_context.logger.info(
        "【SAVE_REPORT】S3にレポートを保存します。",
    )
    session = create_session(session_id=session_id)
    agent: Agent = define_agent([upload_report], memory_session=session)

    result: AgentOutput = agent("作成したレポートを保存してください").structured_output

    step_context.logger.info(
        "MESSAGE: %s", result.model_dump_json(indent=2, ensure_ascii=False)
    )
    return result.model_dump()


@durable_execution
def lambda_handler(event, context: DurableContext):
    ...

    # Step1: レポートを作成
    created_report = context.step(
        create_report(prompt, session_id), name="create-report"
    )

    # Step2: コールバックを作成する
    context.logger.info("コールバックを作成開始")
    callback = context.create_callback(
        name="user_approval",
        config=CallbackConfig(
            timeout=Duration.from_hours(24), heartbeat_timeout=Duration.from_hours(1)
        ),
    )
    context.logger.info("コールバックを作成完了: %s", callback.callback_id)
    
    context.logger.info("承認リクエストを送信開始")
    approval_request = context.step(
        send_for_approval(callback.callback_id, created_report["file_name"]),
    )
    context.logger.info("承認リクエストを送信完了: %s", approval_request)

    # Step3: コールバックの結果を取得
    context.logger.info("承認結果を取得開始")
    approval_result = callback.result()
    context.logger.info("承認結果を取得完了: %s", approval_result)

    # Step4: 承認結果に基づいて条件分岐 - Remove lambda wrappers
    if approval_result:
        context.logger.info("承認されました。レポートを保存します。")
        saved_report = context.step(save_report(session_id), name="save-report")
        return saved_report
    else:
        context.logger.info("承認されませんでした。レポートは保存しません。")
        saved_report = context.step(denied_report(session_id), name="denied-report")
        return saved_report

send_for_approval でユーザへの承認リクエストを行っています。
今回は省略していますが、メール通知などでユーザに対して承認リクエストを送信することが出来ます。
ユーザは受け取った承認リクエストに応じて、以下の API から承認・否認を選択することが出来ます。

承認時

lambda_client = boto3.client("lambda")

lambda_client.send_durable_execution_callback_success(
    CallbackId=callback_id,
    Result=json.dumps({"approved": True}).encode("utf-8"),
)

否認時

lambda_client = boto3.client("lambda")

lambda_client.send_durable_execution_callback_failure(
    CallbackId=callback_id,
    Error={
        "ErrorMessage": "否認します",
        "ErrorType": "Rejected",
        "ErrorData": json.dumps({"approved": False}),
    },
)

また、今回の実装ではただ承認・否認をしているだけですが、上記 API のリクエスト部分を Lambda で受け取り、 Agent の動作を制御することも出来ます。
これにより例えば否認時の Agent への次の処理の依頼などを実現できます。

boto3.amazonaws.com boto3.amazonaws.com

全文は以下です。

実装全文を表示

import logging
import re
from textwrap import dedent
from typing import Any, Optional, Tuple
from uuid import uuid4

import boto3
from aws_durable_execution_sdk_python import (
    DurableContext,
    StepContext,
    durable_execution,
    durable_step,
)
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
from bedrock_agentcore.memory import MemoryClient
from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig
from bedrock_agentcore.memory.integrations.strands.session_manager import (
    AgentCoreMemorySessionManager,
)
from mcp.client.streamable_http import streamable_http_client
from pydantic import BaseModel, Field
from strands import Agent, tool
from strands.models import BedrockModel
from strands.tools.mcp import MCPClient

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

BUCKET = "your-bucket"
PREFIX = "agent-reports/"
MODEL_ID = "global.anthropic.claude-haiku-4-5-20251001-v1:0"
MEM_ID = "your-memory-id"
ACTOR_ID = "sample-htl-agent"

s3_client = boto3.client("s3")
memory_client = MemoryClient()

mcp_client = MCPClient(
    lambda: streamable_http_client("https://knowledge-mcp.global.api.aws")
)

ACTOR_ID_REGEX = re.compile(
    r"^[A-Za-z0-9][A-Za-z0-9\-_\/]*(?::[A-Za-z0-9\-_\/]+)*[A-Za-z0-9\-_\/]*$"
)
SESSION_ID_REGEX = re.compile(r"^[A-Za-z0-9][A-Za-z0-9\-_]*$")


def sanitize_identifier(
    raw_value: str,
    *,
    allowed_specials: str,
    pattern: re.Pattern[str],
    fallback_prefix: str,
) -> str:
    """IDのサニタイズ."""

    allow_colon = ":" in allowed_specials
    allowed_chars = set(allowed_specials)
    if allow_colon:
        allowed_chars.add(":")

    cleaned = "".join(ch for ch in raw_value if ch.isalnum() or ch in allowed_chars)

    if allow_colon and ":" in cleaned:
        cleaned = ":".join(segment for segment in cleaned.split(":") if segment)
    elif not allow_colon and ":" in cleaned:
        cleaned = cleaned.replace(":", "")

    cleaned = cleaned.lstrip(allowed_specials.replace(":", ""))
    if not cleaned or not cleaned[0].isalnum():
        cleaned = ""

    if cleaned and pattern.fullmatch(cleaned):
        return cleaned

    return f"{fallback_prefix}-{uuid4().hex[:10]}"


def ensure_safe_ids(actor_id: str, session_id: str) -> Tuple[str, str]:
    """actor_idとsession_idのサニタイズ."""

    safe_actor_id = sanitize_identifier(
        actor_id,
        allowed_specials="-_/:",
        pattern=ACTOR_ID_REGEX,
        fallback_prefix="actor",
    )

    safe_session_id = sanitize_identifier(
        session_id,
        allowed_specials="-_",
        pattern=SESSION_ID_REGEX,
        fallback_prefix="session",
    )

    return safe_actor_id, safe_session_id


class AgentOutput(BaseModel):
    """レポート作成結果のスキーマ定義"""

    report_content: str = Field(
        description="レポートの内容。マークダウン形式で作成すること"
    )
    file_name: str = Field(description="ファイル名。拡張子.mdをつけること")
    denied_message: Optional[str] = Field(
        default=None,
        description="S3への保存が承認されなかった場合のメッセージ。保存しようとしていた内容の概要と、承認されなかったことを簡潔に説明すること。",
    )


@tool(context=True)
def upload_report(doc: str, file_name) -> bool:
    """S3に調査レポートをアップロードします

    Args:
        doc (str): レポート内容
        file_name (_type_): アップロードするファイル名

    Returns:
        bool: アップロード成功有無
    """
    logger.info("Uploading report to %s/%s", BUCKET, PREFIX + file_name)
    try:
        s3_client.put_object(
            Bucket=BUCKET,
            Key=f"{PREFIX}{file_name}",
            Body=doc.encode("utf-8"),
            ContentType="text/markdown",
        )
    except Exception:
        logger.exception("Failed to upload report to %s/%s", BUCKET, PREFIX + file_name)
        return False
    return True


def create_session(session_id: str) -> AgentCoreMemorySessionManager:
    # AgentCore Memoryの設定値
    safe_actor_id, safe_session_id = ensure_safe_ids(ACTOR_ID, session_id)
    agentcore_memory_config = AgentCoreMemoryConfig(
        memory_id=MEM_ID, session_id=safe_session_id, actor_id=safe_actor_id
    )

    # セッションマネージャーを作成
    session_manager = AgentCoreMemorySessionManager(
        agentcore_memory_config=agentcore_memory_config, region_name="us-east-1"
    )

    return session_manager


def define_agent(tools: Any, memory_session: AgentCoreMemorySessionManager) -> Agent:
    agent = Agent(
        model=BedrockModel(
            model_id=MODEL_ID,
            streaming=False,
        ),
        system_prompt=dedent("""
                                AWSについて調査レポートを作成してください。
                                調査時には必ずAWS Knowledge MCPを使用してください。
                                マークダウン形式で出力し、レポート結果をS3にアップロードしてください。
                                """),
        tools=tools,
        session_manager=memory_session,
        callback_handler=None,
        structured_output_model=AgentOutput,
    )
    return agent


@durable_step
def create_report(step_context: StepContext, prompt: str, session_id: str) -> Any:
    step_context.logger.info(
        "【CREATE_REPORT】調査レポートを作成します。",
    )
    session = create_session(session_id=session_id)
    with mcp_client:
        aws_search_tools = mcp_client.list_tools_sync()
        agent: Agent = define_agent(aws_search_tools, memory_session=session)

        result: AgentOutput = agent(prompt).structured_output

    step_context.logger.info(
        "MESSAGE: %s", result.model_dump_json(indent=2, ensure_ascii=False)
    )
    return result.model_dump()


@durable_step
def send_for_approval(
    step_context: StepContext, callback_id: str, file_name: str
) -> dict:
    """承認リクエストをユーザに送信"""
    step_context.logger.info(
        "【SEND_APPROVAL】%sのS3へのアップロードの承認をお願いします。 %s",
        file_name,
        callback_id,
    )

    # 実際に利用する場合には、ここでメール送信やSNS通知などを実装する
    # send_email(to=approver_email, subject="レポート保存承認リクエスト", body=message)

    return {
        "file_name": file_name,
        "callback_id": callback_id,
        "status": "sent_for_approval",
    }


@durable_step
def save_report(
    step_context: StepContext,
    session_id,
) -> Any:
    """レポートをS3に保存する"""
    step_context.logger.info(
        "【SAVE_REPORT】S3にレポートを保存します。",
    )
    session = create_session(session_id=session_id)
    agent: Agent = define_agent([upload_report], memory_session=session)

    result: AgentOutput = agent("作成したレポートを保存してください").structured_output

    step_context.logger.info(
        "MESSAGE: %s", result.model_dump_json(indent=2, ensure_ascii=False)
    )
    return result.model_dump()


@durable_step
def denied_report(
    step_context: StepContext,
    session_id: str,
) -> Any:
    """レポートをS3に保存する"""
    step_context.logger.info(
        "【DENIED_REPORT】S3への保存が承認されませんでした。",
    )
    session = create_session(session_id=session_id)
    agent: Agent = define_agent([], memory_session=session)

    result: AgentOutput = agent(
        "レポートの保存が承認されませんでした。"
    ).structured_output

    step_context.logger.info(
        "MESSAGE: %s", result.model_dump_json(indent=2, ensure_ascii=False)
    )
    return result.model_dump()


@durable_execution
def lambda_handler(event, context: DurableContext):
    prompt = event["prompt"]
    session_id = context.state.durable_execution_arn.split(":")[-1]

    # Step1: レポートを作成
    created_report = context.step(
        create_report(prompt, session_id), name="create-report"
    )

    # Step2: コールバックを作成する
    context.logger.info("コールバックを作成完了")
    callback = context.create_callback(
        name="user_approval",
        config=CallbackConfig(
            timeout=Duration.from_hours(24), heartbeat_timeout=Duration.from_hours(1)
        ),
    )
    context.logger.info("コールバックを作成完了: %s", callback.callback_id)
    
    context.logger.info("承認リクエストを送信開始")
    approval_request = context.step(
        send_for_approval(callback.callback_id, created_report["file_name"]),
    )
    context.logger.info("承認リクエストを送信完了: %s", approval_request)

    # Step3: コールバックの結果を取得
    context.logger.info("コールバック結果を取得: %s", callback.callback_id)
    approval_result = callback.result()
    context.logger.info("承認結果を取得: %s", approval_result)

    # Step4: 承認結果に基づいて条件分岐 - Remove lambda wrappers
    if approval_result:
        context.logger.info("承認されました。レポートを保存します。")
        saved_report = context.step(save_report(session_id), name="save-report")
        return saved_report
    else:
        context.logger.info("承認されませんでした。レポートは保存しません。")
        saved_report = context.step(denied_report(session_id), name="denied-report")
        return saved_report

また、デプロイはAWS SAMで実施しました。
通常のLambdaの定義に以下の「DurableConfig」を追加することで設定可能です。

docs.aws.amazon.com

DurableFunctionsHtlAgentFunction:
Type: AWS::Serverless::Function
Properties:
    FunctionName: durable-functions-htl-agent
    CodeUri: src/
    Handler: main.lambda_handler
    DurableConfig:
        ExecutionTimeout: 7776000  # 90 days
    Role:
        Fn::GetAtt:
            - DurableFunctionsHtlAgentRole
            - Arn

4.実行結果

4.1 Lambda Durable Functions を使った Human-in-the-Loop な Agent の実行

ではデプロイされた Lambda を実行してみます。
AWS コンソールの Lambda の画面から以下の入力で Lambda を実行しました。

Lambda Durable Functions 実行時の入力例

一点注意点としては、 Lambda Durable Functions でも同期実行では Lambda の最大実行時間は15分となります。
それ以上時間がかかる場合は非同期での呼び出しが推奨されています。

docs.aws.amazon.com

実行結果は以下の形で表示されます。
上記実装で定義したレポート作成の実施後にユーザへの承認を送信して、待機していることが分かります。
待機状態の間は課金がされないため、長時間の待機時間があるワークフロー処理を安価に実現できます。

ユーザ承認待ち状態で停止している Durable Execution

4.2 Agent のアクションを承認する

承認結果を送信するにはコンソール上から以下の「送信成功」/「送信失敗」のどれかを選択することで実施できます。
この操作を行わない限り Lambda は待機状態を維持します。

コンソールから承認結果を送信する操作画面

では「送信成功」を選択した結果です。
以下の形で調査レポートのアップロード処理が行われているとがわかります。

承認成功後に実行されるレポート保存処理

また、S3 上へもレポートが想定通り出力されていました。

S3 にアップロードされた調査レポート

4.2 Agent のアクションを否認する

また「送信失敗」を選択した結果は以下となります。
承認が拒否されたため、否認時の処理を行い S3 へのアップロードは行われていませんでした。

承認失敗時にレポート保存が行われない実行結果

5. まとめ

Lambda Durable Functions を使った Human-in-the-Loop な Agent を作成しました。
特にAgent周りでは今回実施したユーザの承認を求める場合以外にも、長時間実行されるツールを利用する際などでも利用できる機能だと感じました。
マルチステップな Agent を実装する際には利用する機会が増えていきそうです。

Acroquest Technologyでは、キャリア採用を行っています。

  • Azure OpenAI/Amazon Bedrock等を使った生成AIソリューションの開発
  • ディープラーニング等を使った自然言語/画像/音声/動画解析の研究開発
  • マイクロサービス、DevOps、最新のOSSクラウドサービスを利用する開発プロジェクト
  • 書籍・雑誌等の執筆や、社内外での技術の発信・共有によるエンジニアとしての成長

少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。

www.wantedly.com




以上の内容はhttps://acro-engineer.hatenablog.com/entry/2026/01/30/120000より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14