Bedrock Agentsのマルチステップ設計で本番が崩壊した話|8ヶ月の実装記録
Bedrock Agentsで業務自動化エージェントを本番運用して8ヶ月。状態が消える・コストが読めない・オーケストレーションが崩れる……同じ苦しみを味わった人に届けたい、現場の実装記録です。
Bedrock Agents マルチステップ設計で本番が崩壊した話|状態管理・オーケストレーション実装の現実
先日、社内の業務自動化エージェントを Bedrock Agents で本番構築してから約8ヶ月が経った。最初の3ヶ月は正直ボロボロで、マルチステップのオーケストレーションが思った通りに動かない・状態が消える・コストが読めないという三重苦に苦しんだ。このあたりの「本番で踏んでから気づく」類の話は Bedrock Flows を本番で6ヶ月使って見えてきたこと でも書いたんだけど、Agents固有の問題はまた別の深さがある。今回はその実装記録をまとめておく。
マルチステップ設計に入る前に——Bedrock Agentsの現在地(2026年6月)
2026年6月時点のBedrock Agentsは、2024年GA当初からかなり変わった。特に大きいのは以下の4点だ。
- マルチエージェントコラボレーション(Supervisor + Sub-agents構成) が東京リージョンでも安定稼働
- インラインエージェント(Inline Agents API) でセッションごとにエージェント設定を動的変更可能になった
- Custom Orchestration でLambdaによる独自オーケストレーションロジックが書けるようになった
- Memory(長期記憶) のストレージ設定が柔軟化され、セッション跨ぎの状態保持が現実的になった
正直、1年前の情報で設計すると今は全然違う構成になることが多い。特にCustom Orchestrationの登場は設計の自由度を大きく変えた。変化のスピードが速すぎて、半年前の記事を参照するだけだと普通に詰まる。
| 機能 | 2025年初頭 | 2026年6月 |
|---|---|---|
| マルチエージェントコラボレーション | 一部リージョンのみ | 全主要リージョン対応 |
| Custom Orchestration | なし | Lambda連携で実装可能 |
| インラインエージェント | ベータ | GA済み |
| メモリ(長期記憶) | セッション内のみ | セッション跨ぎ対応(S3/RDS連携) |
| Knowledge Base統合 | 基本RAGのみ | Metadata Filteringが強化 |
この変化を把握した上でアーキテクチャを組み直したのが、僕たちのプロジェクトで転換点になった。
実際のシステム構成——マルチステップ業務自動化エージェント
構築したのは、社内の「受注→在庫確認→発注承認→通知」という4ステップの業務フローをAgentsで自動化するシステムだ。最初は1エージェント・1アクショングループで全部やろうとして盛大に失敗した。
今の構成はこうなっている。
graph TB
subgraph Client["クライアント層"]
WebApp["社内Webアプリ"]
Slack["Slack Bot"]
end
subgraph Bedrock["Amazon Bedrock"]
Supervisor["Supervisorエージェント\nClaude 3.7 Sonnet"]
subgraph SubAgents["Sub-agents"]
OrderAgent["受注処理エージェント"]
InventoryAgent["在庫確認エージェント"]
ApprovalAgent["承認フローエージェント"]
end
KB["Knowledge Base\n(製品・規定情報)"]
Memory["Agent Memory\n(長期記憶)"]
end
subgraph Lambda["Action Group Lambda"]
OrderLambda["order-handler"]
InventoryLambda["inventory-checker"]
ApprovalLambda["approval-workflow"]
NotifyLambda["notification-sender"]
end
subgraph Data["データ層"]
RDS[("Aurora PostgreSQL\n受注DB")]
DynamoDB[("DynamoDB\n在庫テーブル")]
S3[("S3\nメモリストレージ")]
EventBridge["EventBridge\n承認イベント"]
end
subgraph VPC["VPC (10.0.0.0/16)"]
subgraph PrivateSubnet["Private Subnet"]
RDS
DynamoDB
end
end
WebApp -->|InvokeAgent API| Supervisor
Slack -->|InvokeAgent API| Supervisor
Supervisor -->|委譲| OrderAgent
Supervisor -->|委譲| InventoryAgent
Supervisor -->|委譲| ApprovalAgent
OrderAgent -->|Action| OrderLambda
InventoryAgent -->|Action| InventoryLambda
ApprovalAgent -->|Action| ApprovalLambda
ApprovalAgent -->|Action| NotifyLambda
Supervisor <-->|RAG| KB
Supervisor <-->|状態参照| Memory
OrderLambda --> RDS
InventoryLambda --> DynamoDB
ApprovalLambda --> EventBridge
NotifyLambda --> Slack
Memory --> S3
この構成に落ち着くまでに3回設計を壊した。順番に話す。
失敗パターン①:1エージェント全部乗せの地雷
最初の実装は「Supervisorとかオーバーエンジニアリングじゃないか?」という感覚で、1エージェントにアクショングループを4つ付けた構成にした。これが最初の地雷だった。
何が起きたか
受注→在庫→承認の3ステップを一連で処理するとき、モデルが途中のステップ結果を「忘れる」問題が頻発した。具体的にはこういうケースだ。
ユーザー: 「受注ID 10234の在庫確認して、問題なければ承認フローに回して」
期待動作:
Step1: get_order(id=10234) → 受注情報取得
Step2: check_inventory(product_id=A001, qty=100) → 在庫OK確認
Step3: submit_approval(order_id=10234, status="ready") → 承認提出
実際の動作(問題時):
Step1: get_order(id=10234) → 受注情報取得 ✓
Step2: check_inventory(product_id=A001, qty=100) → 在庫OK ✓
Step3: submit_approval(order_id=???, status="ready")
→ order_idがNullになる ✗
原因はPromptのコンテキストウィンドウ管理と、アクション間の変数の受け渡し設計が曖昧だったことだ。Bedrockのオーケストレーションはモデルが自律的に判断するため、ステップ間で明示的に変数を引き継ぐ仕組みを設計しないと容易に情報が落ちる。「ちゃんと覚えてくれてるだろう」という甘い期待が一番ダメだった。
対策:Custom Orchestrationへの切り替え
2026年から使えるCustom Orchestrationを使うと、Lambdaでオーケストレーションロジックを明示的に書ける。実際のコードはこうなった。
import json
import boto3
from typing import Optional
bedrock_agent_runtime = boto3.client('bedrock-agent-runtime', region_name='ap-northeast-1')
def lambda_handler(event: dict, context) -> dict:
"""
Custom Orchestration Lambda
Bedrock Agentsからオーケストレーション制御を受け取る
"""
invocation_type = event.get('actionInvocationType')
if invocation_type == 'ORCHESTRATION':
return handle_orchestration(event)
return {"statusCode": 400, "body": "Unsupported invocation type"}
def handle_orchestration(event: dict) -> dict:
session_attributes = event.get('sessionAttributes', {})
agent_input = event.get('inputText', '')
# セッション状態から引き継ぎデータを取得
order_context = session_attributes.get('order_context', {})
if isinstance(order_context, str):
order_context = json.loads(order_context) if order_context else {}
# ステップを明示的にシーケンス制御
current_step = order_context.get('step', 'ORDER_FETCH')
print(f"Current step: {current_step}, Context: {order_context}")
if current_step == 'ORDER_FETCH':
return build_action_response(
action_group='order-actions',
api_path='/get_order',
parameters={'order_id': extract_order_id(agent_input)},
next_step='INVENTORY_CHECK',
context=order_context
)
elif current_step == 'INVENTORY_CHECK':
order_data = order_context.get('order_data', {})
return build_action_response(
action_group='inventory-actions',
api_path='/check_inventory',
parameters={
'product_id': order_data.get('product_id'),
'quantity': order_data.get('quantity')
},
next_step='APPROVAL_SUBMIT',
context=order_context
)
elif current_step == 'APPROVAL_SUBMIT':
return build_action_response(
action_group='approval-actions',
api_path='/submit_approval',
parameters={
'order_id': order_context.get('order_id'), # 明示的に保持
'inventory_status': order_context.get('inventory_status')
},
next_step='NOTIFY',
context=order_context
)
elif current_step == 'NOTIFY':
return finalize_with_notification(order_context)
# 予期しないステップ
return build_final_response("処理完了", order_context)
def build_action_response(
action_group: str,
api_path: str,
parameters: dict,
next_step: str,
context: dict
) -> dict:
updated_context = {**context, 'step': next_step}
return {
'orchestrationResponse': {
'actionGroupInvocations': [{
'actionGroupName': action_group,
'apiPath': api_path,
'httpMethod': 'POST',
'parameters': parameters
}]
},
'sessionAttributes': {
'order_context': json.dumps(updated_context)
}
}
def extract_order_id(text: str) -> Optional[str]:
"""テキストから受注IDを抽出"""
import re
match = re.search(r'受注ID[\s::]*(\d+)', text)
return match.group(1) if match else None
このアプローチのポイントは sessionAttributes にJSONシリアライズしたコンテキストを明示的に持ち回すことだ。モデルに「覚えておいてくれ」と期待するのをやめた。これだけで「忘れる」系のバグが一気に減った。
失敗パターン②:メモリ設計の甘さで長期記憶が崩壊した
メモリ機能を有効にしたはいいが、デフォルト設定でそのまま使ったら3週間後にメモリストレージが肥大化してレスポンスが劣化した。地味に怖い壊れ方で、最初は何が原因かすらわからなかった。
メモリ取得レイテンシの悪化推移
デフォルト放置の3週間でレイテンシが約14倍まで膨らんだ。Week8で380msに戻ったのはメモリのクリーニングバッチを入れたからだ。
xychart-beta
title "Memory Retrieval Latency (ms) 経過週別"
x-axis ["Week1", "Week2", "Week3", "Week4", "Week5", "Week6", "Week7", "Week8"]
y-axis "Latency (ms)" 0 --> 3000
line [180, 210, 320, 680, 1240, 1890, 2540, 380]
bar [180, 210, 320, 680, 1240, 1890, 2540, 380]
Bedrock Agentsのメモリ機能は、デフォルトだとセッションサマリーをS3に無制限に積み上げる。関係性の薄い古いセッション情報まで全部ベクトル化してRetrievalしてしまうので、時間が経つほど関係ないコンテキストが混入してくる。「長期記憶が使える!」と喜んで有効にするだけでは罠にはまる。
対策として実装したメモリ管理パターンがこれ。
import boto3
from datetime import datetime, timedelta
bedrock_agent = boto3.client('bedrock-agent', region_name='ap-northeast-1')
s3 = boto3.client('s3')
def cleanup_agent_memory(agent_id: str, agent_alias_id: str):
"""
古いメモリセッションを定期クリーンアップする
EventBridge Schedulerで毎日深夜に実行
"""
cutoff_date = datetime.now() - timedelta(days=30) # 30日より古いものを削除
try:
# メモリのリスト取得(ページネーション対応)
paginator = bedrock_agent.get_paginator('list_agent_memory')
pages = paginator.paginate(
agentId=agent_id,
agentAliasId=agent_alias_id
)
deleted_count = 0
for page in pages:
for memory_item in page.get('memoryContents', []):
created_at = memory_item.get('createdAt')
if created_at and created_at < cutoff_date:
memory_id = memory_item['memoryId']
bedrock_agent.delete_agent_memory(
agentId=agent_id,
agentAliasId=agent_alias_id,
memoryId=memory_id
)
deleted_count += 1
print(f"Deleted {deleted_count} old memory items")
return deleted_count
except Exception as e:
print(f"Memory cleanup failed: {e}")
raise
def selective_memory_store(
session_id: str,
content: str,
memory_type: str = 'SESSION_SUMMARY',
importance_score: float = 0.5
):
"""
重要度スコアに基づいて選択的にメモリ保存
全セッションを保存せず、重要なものだけ残す
"""
if importance_score < 0.6: # 閾値以下は保存しない
print(f"Skipping low-importance memory: score={importance_score}")
return None
# DynamoDBにメタデータを保存してメモリ管理を補助
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('agent-memory-metadata')
table.put_item(Item={
'session_id': session_id,
'created_at': datetime.now().isoformat(),
'importance_score': str(importance_score),
'memory_type': memory_type,
'ttl': int((datetime.now() + timedelta(days=30)).timestamp())
})
return session_id
ここは正直まだ改善中で、「重要度スコア」をどう自動計算するかが課題として残っている。今はヒューリスティックで受注金額が一定以上かどうかで判定しているけど、もっとうまいやり方があるはずだ。誰かいい方法知ってたら教えてほしい。
失敗パターン③:Sub-agentsの並列実行でコストが爆発
マルチエージェントコラボレーションの設計で「在庫確認と価格チェックは並列でやれば速い」と思って同時に実行したら、月のBedrockコストが想定の2.3倍になった。気づいたのは請求確定後だったので余計に痛かった。
コスト内訳の比較
xychart-beta
title "月次 Bedrock Agent コスト比較 (USD)"
x-axis ["直列実行", "無計画並列", "最適化後並列"]
y-axis "コスト (USD)" 0 --> 5000
bar [1800, 4140, 2200]
問題の原因は2つあった。
- Supervisorエージェントのトークン消費:Sub-agentsを呼ぶたびにSupervisorが全コンテキストを再読み込みするため、並列数が増えるほど二次関数的にトークンが増える
- エラー時のリトライによるトークン爆発:並列実行中に1つがエラーになると、Supervisorが状況を再確認するために全コンテキストを使った呼び出しを追加で行う
対策として導入したのが「コンテキスト分割パターン」だ。
from dataclasses import dataclass
from typing import List
import asyncio
@dataclass
class AgentTask:
sub_agent_alias_arn: str
input_text: str
session_id: str
context: dict # 必要最小限のコンテキストのみ渡す
async def invoke_sub_agent_minimal_context(
task: AgentTask,
bedrock_runtime_client
) -> dict:
"""
Sub-agentには必要最小限のコンテキストだけ渡す
全履歴を渡さないことでトークン消費を抑制
"""
# コンテキストを必要なフィールドだけに絞る
minimal_context = {
'order_id': task.context.get('order_id'),
'product_id': task.context.get('product_id'),
# タスク固有の情報だけ
}
session_attrs = {
'task_context': json.dumps(minimal_context)
# 全履歴は渡さない!
}
response = bedrock_runtime_client.invoke_agent(
agentId=extract_agent_id(task.sub_agent_alias_arn),
agentAliasId=extract_alias_id(task.sub_agent_alias_arn),
sessionId=task.session_id,
inputText=task.input_text,
sessionState={
'sessionAttributes': session_attrs
},
enableTrace=False # 本番ではトレース無効化でコスト削減
)
return parse_agent_response(response)
async def parallel_sub_agents_with_budget(
tasks: List[AgentTask],
max_concurrent: int = 2, # 同時実行数を制限
bedrock_runtime_client = None
) -> List[dict]:
"""
同時実行数を制限して並列実行
無制限並列はコスト爆発の原因になる
"""
semaphore = asyncio.Semaphore(max_concurrent)
async def bounded_invoke(task: AgentTask):
async with semaphore:
return await invoke_sub_agent_minimal_context(
task, bedrock_runtime_client
)
results = await asyncio.gather(
*[bounded_invoke(task) for task in tasks],
return_exceptions=True # 1つの失敗で全体が止まらないように
)
# エラーを分離してハンドリング
successes = []
errors = []
for i, result in enumerate(results):
if isinstance(result, Exception):
errors.append({'task_index': i, 'error': str(result)})
else:
successes.append(result)
if errors:
print(f"Sub-agent errors: {errors}")
# エラーが一定以上なら全体失敗として扱う
if len(errors) > len(tasks) * 0.5:
raise RuntimeError(f"Too many sub-agent failures: {errors}")
return successes
これでコストを当初の無計画並列から約47%削減できた。「Sub-agentsには全コンテキストを渡さない」という原則は、Bedrock Agentsを使う上で本当に重要だと思う。AIエージェント本番運用で学んだ痛い失敗でも書いたけど、コンテキストの肥大化はすべての問題の根源になりやすい。
オーケストレーション設計のフロー整理
今の設計がどう動いているかをシーケンスで整理するとこうなる。Custom Orchestration LambdaがすべてのステップでsessionAttributesにコンテキストを明示的に保存しているのがポイントで、これでモデルが「忘れる」問題がほぼなくなった。
sequenceDiagram
actor User
participant Supervisor as Supervisorエージェント
participant CustomOrch as Custom Orchestration Lambda
participant OrderAgent as 受注エージェント
participant InventoryAgent as 在庫エージェント
participant ApprovalAgent as 承認エージェント
participant KB as Knowledge Base
User->>Supervisor: 「受注10234を承認フローに回して」
Supervisor->>KB: 業務ルール検索
KB-->>Supervisor: 承認基準・フロー定義
Supervisor->>CustomOrch: オーケストレーション開始 (step=ORDER_FETCH)
CustomOrch->>OrderAgent: 受注情報取得
OrderAgent-->>CustomOrch: {order_id: 10234, product_id: A001, qty: 100}
CustomOrch->>CustomOrch: sessionAttributesにコンテキスト保存
CustomOrch->>InventoryAgent: 在庫確認 (minimal context)
InventoryAgent-->>CustomOrch: {status: OK, available: 500}
CustomOrch->>ApprovalAgent: 承認提出 (order_id=10234明示)
ApprovalAgent-->>CustomOrch: {approval_id: APR-789, status: pending}
CustomOrch-->>Supervisor: 全ステップ完了
Supervisor-->>User: 「受注10234の承認フローを開始しました(ID: APR-789)」
エラーハンドリングとリトライ戦略
Bedrockのアクション実行でのエラーハンドリングは最初まったく設計していなくて、後から大慌てで追加した。特に重要だと思うのはこの3点だ。
1. アクション冪等性の担保
アクションが途中でエラーになったとき、Bedrockはリトライをかけることがある。受注作成のような書き込み系アクションは必ず冪等にしておく必要がある。これを後回しにすると「同じ受注が2件登録されている」という最悪のインシデントになる。
import hashlib
from datetime import datetime
def create_order_idempotent(
order_data: dict,
idempotency_key: str # Agentがセッションキーとして生成
) -> dict:
"""
同じidempotency_keyでの重複リクエストを防ぐ
"""
dynamodb = boto3.resource('dynamodb')
idem_table = dynamodb.Table('idempotency-keys')
# 既存チェック
existing = idem_table.get_item(
Key={'idempotency_key': idempotency_key}
).get('Item')
if existing:
print(f"Duplicate request detected: {idempotency_key}")
return existing['result'] # 前回の結果をそのまま返す
# 新規実行
result = execute_order_creation(order_data)
# 結果を保存(TTL: 24時間)
idem_table.put_item(Item={
'idempotency_key': idempotency_key,
'result': result,
'created_at': datetime.now().isoformat(),
'ttl': int((datetime.now().timestamp()) + 86400)
})
return result
2. Dead Letter Queue による失敗の補足
Action Group Lambdaの失敗はSQS DLQで受け取って、後から人間がレビューできるようにしている。SQS・Kafka本番2年で後悔した話でも触れたけど、DLQの設計はエージェントシステムでも同じくらい重要だ。むしろAIエージェントの場合、「なぜその判断をしたのか」をDLQのペイロードから追跡できるかどうかが運用の明暗を分ける。
3. Guardrailsによる暴走防止
BedrockのGuardrailsを使って、エージェントが意図しない操作(大量の受注を一度に処理するなど)をしないようにコンテンツフィルタリングを設定した。システムプロンプトだけでは防ぎきれないケースが実際にあったので、Guardrailsは保険として必ず入れておくべきだと思う。
まとめ
8ヶ月本番で動かして、今は概ね安定している。最初に比べてエラー率は1/8くらいになって、コストも当初の想定に収まってきた。振り返ると失敗のほとんどは「モデルへの過信」が原因だったと思う。LLMは賢いけど、ステップ間の変数管理を任せるには向いていない。その部分は人間がコードで制御すべきだというのが、8ヶ月やって出した結論だ。
要点をまとめるとこうなる。
| # | 教訓 | 具体的な対策 |
|---|---|---|
| 1 | ステップ間の変数はコードで管理する | sessionAttributesに明示的にJSONシリアライズして持ち回す |
| 2 | Sub-agentsへのコンテキストは最小限に | 全履歴を渡さず、タスクに必要なフィールドだけ渡す |
| 3 | 並列実行には同時実行数の上限を設ける | asyncio.Semaphoreでmax_concurrent=2〜3に絞る |
| 4 | メモリは定期クリーニング必須 | EventBridge Schedulerで毎日削除バッチを回す |
| 5 | 冪等性とDLQは初日から設計する | 後から追加するのはかなり大変 |
次のアクションとして、現在検討しているのがAmazon Bedrock Inline Agentsを使ったユーザー別エージェント設定の動的変更だ。承認権限が担当者によって違うので、セッション開始時にロールに応じてエージェントの挙動を切り替えられると運用がだいぶ楽になる。うまくいったらまた書く。