Bedrock Flows本番運用3ヶ月で気づいた、LLMワークフロー自動化の地味だけど痛い落とし穴
AIワークフロー自動化って聞くと華やかだけど、Bedrock Flows本番運用で直面した現実。LLMのばらつき、エラーハンドリング、想定外のコスト構造。実装前に知っておくべき課題を共有します。
Bedrock Flows、マジで導入する前に知っておくべきことがある
正直に言うと、うちのチームがBedrock Flowsを本番導入したのは去年の10月。それから3ヶ月間、地味だけど重い課題の連続に直面しました。「AIワークフロー自動化」って聞くと華やかに聞こえるけど、実際に運用すると、LLMのばらつき、エラーハンドリング、そしてコスト構造の複雑さに悩まされるんですよ。今回は、その辺をぶっちゃけで話したいと思う。
僕たちが導入した背景としては、複雑な営業フォローアップメール生成(複数モデルの判定→ドラフト生成→レビュー→送信)を自動化するためでした。従来のLambda+API呼び出しだと、エラーハンドリングやリトライロジックが散在していて、保守性が地獄。Flowsなら「ビジュアルワークフロー」で全体像を把握できるし、ノード単位でエラーハンドリングを設定できるんじゃないか、という期待でした。
結論からいうと、期待値の50%は達成できたけど、残りの50%は予期しない問題で消えた。その理由を、これからお話しします。
Bedrock Flowsの基本構造(と、うちの実装)
まず、Bedrock Flowsの全体像を図に落としておきます。これが実装するときの基本フローです。
graph TB
Start([フロー開始<br/>入力データ]) --> Input[入力ノード]
Input --> Router{ルーティング<br/>判定}
Router -->|高優先度| Model1["Claude 3.7 Sonnet<br/>詳細生成"]
Router -->|標準| Model2["Claude 3.5 Sonnet<br/>軽量生成"]
Model1 --> Review[レビュー<br/>ノード]
Model2 --> Review
Review --> Validator{バリデーション<br/>通過?}
Validator -->|失敗| Retry[リトライ]
Retry --> Model1
Validator -->|成功| Store["DynamoDB<br/>保存"]
Store --> SNS["SNS通知<br/>メール送信"]
SNS --> End([完了])
実装としてはこんな感じで、AWS全体の構成図も入れておきます。
graph TB
subgraph "API層"
APIGateway["API Gateway"]
end
subgraph "AWS リージョン"
subgraph "Bedrock Flows"
Flow1["フロー: メール生成"]
Flow1 -->|ノード群| Node1["入力・ルーティング"]
Node1 -->|条件分岐| Node2["モデル呼び出し"]
Node2 -->|結果処理| Node3["バリデーション"]
Node3 -->|エラー時| Node4["リトライ・ロギング"]
end
subgraph "LLMモデル層"
Claude37["Claude 3.7 Sonnet<br/>ap-northeast-1"]
Claude35["Claude 3.5 Sonnet<br/>ap-northeast-1"]
end
subgraph "ストレージ・通知"
DynamoDB["DynamoDB<br/>実行結果・履歴"]
SNS["SNS<br/>メール通知"]
CloudWatch["CloudWatch Logs<br/>デバッグ"]
end
end
subgraph "管理層"
CloudWatchMetrics["CloudWatch<br/>メトリクス"]
end
APIGateway --> Flow1
Flow1 --> Claude37
Flow1 --> Claude35
Flow1 --> DynamoDB
Flow1 --> SNS
Flow1 --> CloudWatch
CloudWatch --> CloudWatchMetrics
シンプルに見えるけど、ここからが地獄ですよ。
実装してハマった3つの落とし穴
1. LLMの出力がマジで不安定。バリデーションの沼
最初は甘く見てました。「Sonnetで1000回呼び出せば、900回は想定通りの出力が得られるだろう」って思ってた。現実は違った。
営業フォローアップメール生成で、返すべき形式は以下の通り:
{
"subject": "string",
"body": "string",
"sentiment": "positive|neutral|negative",
"action_required": "boolean"
}
当然のことながら、LLMは時々こんなのを返すんですよ:
{
"subject": "Follow-up on Your Recent Inquiry",
"body": "...",
"sentiment": "POSITIVE",
"action_required": "yes"
}
sentimentが大文字だし、action_requiredは文字列です。または完全にこれ:
{
"subject": "Follow-up",
"body": "...",
"action_required": true
}
sentimentが丸々抜けてる。Bedrock Flowsのバリデーションノードって、JSON Schemaで厳密に定義できるんですけど、ここが大事:落ちたときのリトライ戦略が弱いんですよ。
うちが実装した対策がこれ:
# Lambda内で、バリデーション失敗時の処理
def validate_and_normalize(llm_output: dict) -> dict:
"""LLM出力を正規化。失敗時は例外を上げてリトライへ"""
errors = []
# sentimentの正規化
if 'sentiment' in llm_output:
sentiment = str(llm_output['sentiment']).lower()
if sentiment not in ['positive', 'neutral', 'negative']:
errors.append(f"Invalid sentiment: {sentiment}")
else:
llm_output['sentiment'] = sentiment
else:
errors.append("Missing sentiment")
# action_requiredの正規化
if 'action_required' in llm_output:
ar = llm_output['action_required']
if isinstance(ar, bool):
pass # OK
elif isinstance(ar, str):
if ar.lower() in ['true', 'yes']:
llm_output['action_required'] = True
elif ar.lower() in ['false', 'no']:
llm_output['action_required'] = False
else:
errors.append(f"Cannot parse action_required: {ar}")
else:
errors.append(f"Invalid type for action_required")
else:
errors.append("Missing action_required")
if errors:
raise ValidationError(f"Validation failed: {', '.join(errors)}")
return llm_output
ここがポイント:Bedrock Flowsのバリデーションノードで「失敗時は条件分岐で別ノードへ」って設定できるんですが、その別ノードで「同じモデルに再度プロンプトを投げる」ってのが効果的でした。
ただし、大事なのはここ。再度呼び出すときはプロンプトに「前回の出力」と「期待される形式」を明示する。こうするとLLMが学習して、2回目の成功率が75%くらいに上がるんですよ:
Prompt:
You previously returned:
{
"subject": "...",
"sentiment": "POSITIVE"
}
But sentiment must be lowercase: positive, neutral, or negative.
Please fix this and reply ONLY with valid JSON, no markdown or explanation.
これを仕込むだけで、再試行での成功率がぐっと上がりました。正直、この「前回出力を含める」工夫って、LLMのエラーハンドリングの中で一番効果的だと感じます。
2. エラーハンドリングの粒度が思ったより粗い
Bedrock Flowsって、ノード単位でエラーハンドリングをカスタマイズできるけど、実際のエラー情報がめっちゃ少ないんですよ。
たとえば、Claude 3.5 Sonnetへのリクエストがタイムアウトしたとき、デフォルトだと:
Error: ThrottlingException
これだけ。いつタイムアウトしたのか、リトライすべき?それとも別のモデルに切り替える?全然わかりません。正直、このレベルのエラー情報では本番運用は無理だと感じました。
うちが実装した対策としては、Lambda関数内で詳細なロギングを仕込むこと。Bedrock Flowsのノード設定で「エラー時はこのLambdaを呼べ」みたいな設定ができるので、そこで詳細情報を記録します:
import json
import logging
from datetime import datetime
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
def error_handler_lambda(event, context):
"""Bedrock Flowsからのエラーを詳細にログして、対応を決定"""
error_info = event.get('errorInfo', {})
node_id = event.get('nodeId', 'unknown')
timestamp = datetime.utcnow().isoformat()
logger.info(json.dumps({
'timestamp': timestamp,
'node_id': node_id,
'error_type': error_info.get('errorType', 'unknown'),
'error_message': error_info.get('message', ''),
'error_code': error_info.get('code', ''),
'retry_count': event.get('retryCount', 0),
'input_data': event.get('input', {})
}))
# エラー種別による対応
error_type = error_info.get('errorType', '')
if 'ThrottlingException' in error_type:
logger.warning(f"Throttling detected at node {node_id}. Need exponential backoff.")
return {
'action': 'retry_with_backoff',
'backoff_ms': min(2 ** event.get('retryCount', 0) * 1000, 30000)
}
elif 'AccessDenied' in error_type:
logger.error(f"Access denied at node {node_id}. Check IAM permissions.")
return {
'action': 'fail',
'reason': 'permission_error'
}
elif 'InvalidInput' in error_type:
logger.error(f"Invalid input at node {node_id}. Data validation failed.")
return {
'action': 'fail_with_notification',
'notify_channel': 'slack'
}
else:
logger.error(f"Unknown error at node {node_id}: {error_info}")
return {
'action': 'escalate',
'escalate_to': 'oncall_engineer'
}
このLambdaをBedrock Flowsの「エラーハンドラーノード」として設定すると、CloudWatch Logsが詳細なエラー情報でいっぱいになります。その上で、Slack通知やPage Dutyエスカレーションを自動化できる。
地味だけど超重要:CloudWatch Logsをちゃんと構造化ログ(JSON形式)で吐いておくと、後でCloudWatch Insightsで「あの日の午後2時に何が起きたのか」って追跡できるんですよ。これ、本番運用では何度助けられたか。
3. コスト構造が思ったより複雑。予算が一気に膨れる
これが一番ショックでした。Bedrock Flowsって「フロー実行ごとに課金」されるんじゃなくて、フロー内のモデル呼び出しがいくつあるかで課金されます。
うちの場合、1件のメール生成で:
- ルーティング判定(軽量呼び出し)
- メイン生成(Claude 3.7 or 3.5)
- バリデーション失敗時の再生成(平均3回に1回発生)
- 管理用のログ出力(追加呼び出し)
つまり、「ユーザーが1回フローを実行する」=「内部では3〜4回のモデル呼び出しが発生」。さらに、リトライロジックを仕込んだから、失敗時はさらに2〜3回追加呼び出し。
の結果、最初の月は予想の3倍のコストが発生しました。ショックです。
xychart-beta
title "Bedrock Flows 月別コスト推移(最初の3ヶ月)"
x-axis [10月, 11月, 12月]
y-axis "コスト (USD)" 0 --> 12000
line [3000, 9200, 5800]
グラフを見るとわかる通り、11月にめっちゃ跳ねてます。これは「バリデーションリトライ」と「エラーハンドリングのLambda呼び出し」が増えたから。デバッグのために本来不要な呼び出しが発生してたんですよ。
うちが対策として実装したのが、以下の3つ:
(1)バッチ化とモデル選択の最適化
def route_to_optimal_model(request: dict) -> str:
"""リクエストの複雑さに応じてモデルを選択"""
# 複雑度スコアを計算
complexity_score = 0
complexity_score += len(request.get('history', [])) * 0.5 # 履歴が長い=複雑
complexity_score += 1 if request.get('requires_sentiment_analysis') else 0
complexity_score += 2 if request.get('legal_compliance_check') else 0
# スコアに応じて使い分け
if complexity_score > 4:
# 複雑なリクエスト→高精度モデル(高コスト)
return 'claude-3-7-sonnet'
elif complexity_score > 1.5:
# 中程度→標準モデル
return 'claude-3-5-sonnet'
else:
# シンプル→軽量モデル
return 'claude-3-5-haiku'
(2)キャッシングの活用
Claude SDKでPrompt Cachingという機能が使えます。これを使うと、同じ長いコンテキスト(たとえば「営業マニュアル」)を何度も送信するときに、2回目以降はキャッシュから読まれるのでトークン削減率は50%以上になるんですよ:
from anthropic import Anthropic
client = Anthropic()
# 長い営業マニュアルをキャッシュする
MANUAL_CONTENT = """...(5000トークン以上の営業マニュアル)..."""
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
system=[
{
"type": "text",
"text": "You are a sales email generator.",
},
{
"type": "text",
"text": MANUAL_CONTENT,
"cache_control": {"type": "ephemeral"}
}
],
messages=[
{
"role": "user",
"content": "Generate a follow-up email for this customer: ..."
}
]
)
print(f"Input tokens: {response.usage.input_tokens}")
print(f"Cache creation tokens: {response.usage.cache_creation_input_tokens}")
print(f"Cache read tokens: {response.usage.cache_read_input_tokens}")
(3)実行ログの最適化
デバッグのために「すべてのモデル出力をLogsに吐く」なんてやってると、ストレージコスト+API呼び出しが増えます。本番環境では「エラー時だけ詳細ログ」に切り替えました。
こういう最適化をやると、11月の9200ドルから12月は5800ドルまで下げられました。まだ予定の2倍ですけど、かなりマシになりました。
Bedrock Flows 2026年の新機能と、それが解決した問題
2026年春のアップデートで、いくつか有用な機能が追加されました。個人的には、これらが「本番運用をずっと楽に」したと感じます:
| 機能 | 説明 | うちのユースケース |
|---|---|---|
| フロー間呼び出し | あるフローから別のフローを呼び出せる | 複雑なワークフローをマイクロサービス的に分割。保守性が大幅に向上 |
| ダイナミック・プロンプト注入 | フロー実行時にプロンプトを動的に変更 | A/B テストが容易に。同じフロー構造で別プロンプト版を試せる |
| CloudWatch メトリクス統合 | フロー実行のメトリクスが自動でCloudWatchに投稿される | ダッシュボード構築が簡単に。リアルタイム監視が可能 |
特に「フロー間呼び出し」が便利で、複雑な処理を「メイン フロー」「バリデーション フロー」「通知 フロー」に分割できるようになった。各フローのテスト・保守が楽になりました。テストコード書く時間が半減したんじゃないかな。
Bedrock Flows 実装のベストプラクティス(うちが学んだこと)
1. 本番導入前に「リトライ戦略」を明確にする
LLMは必ず間違えます。その前提で戦略を立てなきゃダメ。「3回までリトライ」「異なるモデルにフェイルオーバー」「それでもダメなら手動確認待ち」みたいな段階を最初から設計しておく。後付けすると、必ずどこかでバグります。
2. 詳細ロギングとメトリクスは必須
CloudWatch Logsへの構造化ログ出力とメトリクス発行を最初から組み込む。後から「あの日何があったのか」を追跡するのに、めっちゃ時間かかります。マジで。
3. コスト監視を月単位じゃなく週単位で
フロー実行数が一気に増えると予想外にコスト跳ねます。AWS Cost Anomaly Detectionを設定して「いつもと違う支出」を自動アラートさせる。これで11月のような地獄は避けられます。
4. モデル選択の自動化
「いつもClaudeの最新版」みたいなベタな選択じゃなくて、リクエストの複雑さに応じて自動選択。Haiku〜Opusでコスト効率は大きく変わります。
まとめ
Bedrock Flowsは確かに便利です。複雑なLLMワークフロー(モデル呼び出し→バリデーション→リトライ→通知)を「コード行数少なく」実装できるのは強い。ただし、本番運用には以下の3点を忘れずに:
-
LLMの出力は必ず検証し、エラー時のリトライ戦略を最初から仕込む(バリデーション失敗率は数%では済まない)
-
エラーハンドリングはBedrock Flows単体では不十分。Lambda関数で詳細ログを吐いて、CloudWatch Logsを活用する
-
コスト構造を過小評価しない。モデル選択・キャッシング・バッチ化で早期から最適化を進める
次のステップとしては、イベント駆動アーキテクチャとBedrock Flowsの組み合わせも試してみたいと思ってます。SQS/SNSじゃなくてKafkaで大量のメール生成タスクをさばく場合、フロー実行を非同期に切り離すデザインが必要になりそう。
皆さんはBedrock Flows、導入してみました?もし実装してたら、ぜひコメントで教えてください。