Bedrock Flows本番運用3ヶ月で気づいた、LLMワークフロー自動化の地味だけど痛い落とし穴

AIワークフロー自動化って聞くと華やかだけど、Bedrock Flows本番運用で直面した現実。LLMのばらつき、エラーハンドリング、想定外のコスト構造。実装前に知っておくべき課題を共有します。

Bedrock Flows、マジで導入する前に知っておくべきことがある

正直に言うと、うちのチームがBedrock Flowsを本番導入したのは去年の10月。それから3ヶ月間、地味だけど重い課題の連続に直面しました。「AIワークフロー自動化」って聞くと華やかに聞こえるけど、実際に運用すると、LLMのばらつき、エラーハンドリング、そしてコスト構造の複雑さに悩まされるんですよ。今回は、その辺をぶっちゃけで話したいと思う。

僕たちが導入した背景としては、複雑な営業フォローアップメール生成(複数モデルの判定→ドラフト生成→レビュー→送信)を自動化するためでした。従来のLambda+API呼び出しだと、エラーハンドリングやリトライロジックが散在していて、保守性が地獄。Flowsなら「ビジュアルワークフロー」で全体像を把握できるし、ノード単位でエラーハンドリングを設定できるんじゃないか、という期待でした。

結論からいうと、期待値の50%は達成できたけど、残りの50%は予期しない問題で消えた。その理由を、これからお話しします。

Bedrock Flowsの基本構造(と、うちの実装)

まず、Bedrock Flowsの全体像を図に落としておきます。これが実装するときの基本フローです。

graph TB
    Start([フロー開始<br/>入力データ]) --> Input[入力ノード]
    Input --> Router{ルーティング<br/>判定}
    Router -->|高優先度| Model1["Claude 3.7 Sonnet<br/>詳細生成"]
    Router -->|標準| Model2["Claude 3.5 Sonnet<br/>軽量生成"]
    Model1 --> Review[レビュー<br/>ノード]
    Model2 --> Review
    Review --> Validator{バリデーション<br/>通過?}
    Validator -->|失敗| Retry[リトライ]
    Retry --> Model1
    Validator -->|成功| Store["DynamoDB<br/>保存"]
    Store --> SNS["SNS通知<br/>メール送信"]
    SNS --> End([完了])

実装としてはこんな感じで、AWS全体の構成図も入れておきます。

graph TB
    subgraph "API層"
        APIGateway["API Gateway"]
    end
    subgraph "AWS リージョン"
        subgraph "Bedrock Flows"
            Flow1["フロー: メール生成"]
            Flow1 -->|ノード群| Node1["入力・ルーティング"]
            Node1 -->|条件分岐| Node2["モデル呼び出し"]
            Node2 -->|結果処理| Node3["バリデーション"]
            Node3 -->|エラー時| Node4["リトライ・ロギング"]
        end
        subgraph "LLMモデル層"
            Claude37["Claude 3.7 Sonnet<br/>ap-northeast-1"]
            Claude35["Claude 3.5 Sonnet<br/>ap-northeast-1"]
        end
        subgraph "ストレージ・通知"
            DynamoDB["DynamoDB<br/>実行結果・履歴"]
            SNS["SNS<br/>メール通知"]
            CloudWatch["CloudWatch Logs<br/>デバッグ"]
        end
    end
    subgraph "管理層"
        CloudWatchMetrics["CloudWatch<br/>メトリクス"]
    end
    
    APIGateway --> Flow1
    Flow1 --> Claude37
    Flow1 --> Claude35
    Flow1 --> DynamoDB
    Flow1 --> SNS
    Flow1 --> CloudWatch
    CloudWatch --> CloudWatchMetrics

シンプルに見えるけど、ここからが地獄ですよ。

実装してハマった3つの落とし穴

1. LLMの出力がマジで不安定。バリデーションの沼

最初は甘く見てました。「Sonnetで1000回呼び出せば、900回は想定通りの出力が得られるだろう」って思ってた。現実は違った。

営業フォローアップメール生成で、返すべき形式は以下の通り:

{
  "subject": "string",
  "body": "string",
  "sentiment": "positive|neutral|negative",
  "action_required": "boolean"
}

当然のことながら、LLMは時々こんなのを返すんですよ:

{
  "subject": "Follow-up on Your Recent Inquiry",
  "body": "...",
  "sentiment": "POSITIVE",
  "action_required": "yes"
}

sentimentが大文字だし、action_requiredは文字列です。または完全にこれ:

{
  "subject": "Follow-up",
  "body": "...",
  "action_required": true
}

sentimentが丸々抜けてる。Bedrock Flowsのバリデーションノードって、JSON Schemaで厳密に定義できるんですけど、ここが大事:落ちたときのリトライ戦略が弱いんですよ。

うちが実装した対策がこれ:

# Lambda内で、バリデーション失敗時の処理
def validate_and_normalize(llm_output: dict) -> dict:
    """LLM出力を正規化。失敗時は例外を上げてリトライへ"""
    errors = []
    
    # sentimentの正規化
    if 'sentiment' in llm_output:
        sentiment = str(llm_output['sentiment']).lower()
        if sentiment not in ['positive', 'neutral', 'negative']:
            errors.append(f"Invalid sentiment: {sentiment}")
        else:
            llm_output['sentiment'] = sentiment
    else:
        errors.append("Missing sentiment")
    
    # action_requiredの正規化
    if 'action_required' in llm_output:
        ar = llm_output['action_required']
        if isinstance(ar, bool):
            pass  # OK
        elif isinstance(ar, str):
            if ar.lower() in ['true', 'yes']:
                llm_output['action_required'] = True
            elif ar.lower() in ['false', 'no']:
                llm_output['action_required'] = False
            else:
                errors.append(f"Cannot parse action_required: {ar}")
        else:
            errors.append(f"Invalid type for action_required")
    else:
        errors.append("Missing action_required")
    
    if errors:
        raise ValidationError(f"Validation failed: {', '.join(errors)}")
    
    return llm_output

ここがポイント:Bedrock Flowsのバリデーションノードで「失敗時は条件分岐で別ノードへ」って設定できるんですが、その別ノードで「同じモデルに再度プロンプトを投げる」ってのが効果的でした。

ただし、大事なのはここ。再度呼び出すときはプロンプトに「前回の出力」と「期待される形式」を明示する。こうするとLLMが学習して、2回目の成功率が75%くらいに上がるんですよ:

Prompt:
You previously returned:
{
  "subject": "...",
  "sentiment": "POSITIVE"
}

But sentiment must be lowercase: positive, neutral, or negative.
Please fix this and reply ONLY with valid JSON, no markdown or explanation.

これを仕込むだけで、再試行での成功率がぐっと上がりました。正直、この「前回出力を含める」工夫って、LLMのエラーハンドリングの中で一番効果的だと感じます。

2. エラーハンドリングの粒度が思ったより粗い

Bedrock Flowsって、ノード単位でエラーハンドリングをカスタマイズできるけど、実際のエラー情報がめっちゃ少ないんですよ。

たとえば、Claude 3.5 Sonnetへのリクエストがタイムアウトしたとき、デフォルトだと:

Error: ThrottlingException

これだけ。いつタイムアウトしたのか、リトライすべき?それとも別のモデルに切り替える?全然わかりません。正直、このレベルのエラー情報では本番運用は無理だと感じました。

うちが実装した対策としては、Lambda関数内で詳細なロギングを仕込むこと。Bedrock Flowsのノード設定で「エラー時はこのLambdaを呼べ」みたいな設定ができるので、そこで詳細情報を記録します:

import json
import logging
from datetime import datetime

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

def error_handler_lambda(event, context):
    """Bedrock Flowsからのエラーを詳細にログして、対応を決定"""
    
    error_info = event.get('errorInfo', {})
    node_id = event.get('nodeId', 'unknown')
    timestamp = datetime.utcnow().isoformat()
    
    logger.info(json.dumps({
        'timestamp': timestamp,
        'node_id': node_id,
        'error_type': error_info.get('errorType', 'unknown'),
        'error_message': error_info.get('message', ''),
        'error_code': error_info.get('code', ''),
        'retry_count': event.get('retryCount', 0),
        'input_data': event.get('input', {})
    }))
    
    # エラー種別による対応
    error_type = error_info.get('errorType', '')
    
    if 'ThrottlingException' in error_type:
        logger.warning(f"Throttling detected at node {node_id}. Need exponential backoff.")
        return {
            'action': 'retry_with_backoff',
            'backoff_ms': min(2 ** event.get('retryCount', 0) * 1000, 30000)
        }
    elif 'AccessDenied' in error_type:
        logger.error(f"Access denied at node {node_id}. Check IAM permissions.")
        return {
            'action': 'fail',
            'reason': 'permission_error'
        }
    elif 'InvalidInput' in error_type:
        logger.error(f"Invalid input at node {node_id}. Data validation failed.")
        return {
            'action': 'fail_with_notification',
            'notify_channel': 'slack'
        }
    else:
        logger.error(f"Unknown error at node {node_id}: {error_info}")
        return {
            'action': 'escalate',
            'escalate_to': 'oncall_engineer'
        }

このLambdaをBedrock Flowsの「エラーハンドラーノード」として設定すると、CloudWatch Logsが詳細なエラー情報でいっぱいになります。その上で、Slack通知やPage Dutyエスカレーションを自動化できる。

地味だけど超重要:CloudWatch Logsをちゃんと構造化ログ(JSON形式)で吐いておくと、後でCloudWatch Insightsで「あの日の午後2時に何が起きたのか」って追跡できるんですよ。これ、本番運用では何度助けられたか。

3. コスト構造が思ったより複雑。予算が一気に膨れる

これが一番ショックでした。Bedrock Flowsって「フロー実行ごとに課金」されるんじゃなくて、フロー内のモデル呼び出しがいくつあるかで課金されます。

うちの場合、1件のメール生成で:

  1. ルーティング判定(軽量呼び出し)
  2. メイン生成(Claude 3.7 or 3.5)
  3. バリデーション失敗時の再生成(平均3回に1回発生)
  4. 管理用のログ出力(追加呼び出し)

つまり、「ユーザーが1回フローを実行する」=「内部では3〜4回のモデル呼び出しが発生」。さらに、リトライロジックを仕込んだから、失敗時はさらに2〜3回追加呼び出し

の結果、最初の月は予想の3倍のコストが発生しました。ショックです。

xychart-beta
    title "Bedrock Flows 月別コスト推移(最初の3ヶ月)"
    x-axis [10月, 11月, 12月]
    y-axis "コスト (USD)" 0 --> 12000
    line [3000, 9200, 5800]

グラフを見るとわかる通り、11月にめっちゃ跳ねてます。これは「バリデーションリトライ」と「エラーハンドリングのLambda呼び出し」が増えたから。デバッグのために本来不要な呼び出しが発生してたんですよ。

うちが対策として実装したのが、以下の3つ:

(1)バッチ化とモデル選択の最適化

def route_to_optimal_model(request: dict) -> str:
    """リクエストの複雑さに応じてモデルを選択"""
    
    # 複雑度スコアを計算
    complexity_score = 0
    complexity_score += len(request.get('history', [])) * 0.5  # 履歴が長い=複雑
    complexity_score += 1 if request.get('requires_sentiment_analysis') else 0
    complexity_score += 2 if request.get('legal_compliance_check') else 0
    
    # スコアに応じて使い分け
    if complexity_score > 4:
        # 複雑なリクエスト→高精度モデル(高コスト)
        return 'claude-3-7-sonnet'
    elif complexity_score > 1.5:
        # 中程度→標準モデル
        return 'claude-3-5-sonnet'
    else:
        # シンプル→軽量モデル
        return 'claude-3-5-haiku'

(2)キャッシングの活用

Claude SDKでPrompt Cachingという機能が使えます。これを使うと、同じ長いコンテキスト(たとえば「営業マニュアル」)を何度も送信するときに、2回目以降はキャッシュから読まれるのでトークン削減率は50%以上になるんですよ:

from anthropic import Anthropic

client = Anthropic()

# 長い営業マニュアルをキャッシュする
MANUAL_CONTENT = """...(5000トークン以上の営業マニュアル)..."""

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": "You are a sales email generator.",
        },
        {
            "type": "text",
            "text": MANUAL_CONTENT,
            "cache_control": {"type": "ephemeral"}
        }
    ],
    messages=[
        {
            "role": "user",
            "content": "Generate a follow-up email for this customer: ..."
        }
    ]
)

print(f"Input tokens: {response.usage.input_tokens}")
print(f"Cache creation tokens: {response.usage.cache_creation_input_tokens}")
print(f"Cache read tokens: {response.usage.cache_read_input_tokens}")

(3)実行ログの最適化

デバッグのために「すべてのモデル出力をLogsに吐く」なんてやってると、ストレージコスト+API呼び出しが増えます。本番環境では「エラー時だけ詳細ログ」に切り替えました。

こういう最適化をやると、11月の9200ドルから12月は5800ドルまで下げられました。まだ予定の2倍ですけど、かなりマシになりました。

Bedrock Flows 2026年の新機能と、それが解決した問題

2026年春のアップデートで、いくつか有用な機能が追加されました。個人的には、これらが「本番運用をずっと楽に」したと感じます:

機能説明うちのユースケース
フロー間呼び出しあるフローから別のフローを呼び出せる複雑なワークフローをマイクロサービス的に分割。保守性が大幅に向上
ダイナミック・プロンプト注入フロー実行時にプロンプトを動的に変更A/B テストが容易に。同じフロー構造で別プロンプト版を試せる
CloudWatch メトリクス統合フロー実行のメトリクスが自動でCloudWatchに投稿されるダッシュボード構築が簡単に。リアルタイム監視が可能

特に「フロー間呼び出し」が便利で、複雑な処理を「メイン フロー」「バリデーション フロー」「通知 フロー」に分割できるようになった。各フローのテスト・保守が楽になりました。テストコード書く時間が半減したんじゃないかな。

Bedrock Flows 実装のベストプラクティス(うちが学んだこと)

1. 本番導入前に「リトライ戦略」を明確にする

LLMは必ず間違えます。その前提で戦略を立てなきゃダメ。「3回までリトライ」「異なるモデルにフェイルオーバー」「それでもダメなら手動確認待ち」みたいな段階を最初から設計しておく。後付けすると、必ずどこかでバグります。

2. 詳細ロギングとメトリクスは必須

CloudWatch Logsへの構造化ログ出力とメトリクス発行を最初から組み込む。後から「あの日何があったのか」を追跡するのに、めっちゃ時間かかります。マジで。

3. コスト監視を月単位じゃなく週単位で

フロー実行数が一気に増えると予想外にコスト跳ねます。AWS Cost Anomaly Detectionを設定して「いつもと違う支出」を自動アラートさせる。これで11月のような地獄は避けられます。

4. モデル選択の自動化

「いつもClaudeの最新版」みたいなベタな選択じゃなくて、リクエストの複雑さに応じて自動選択。Haiku〜Opusでコスト効率は大きく変わります。

まとめ

Bedrock Flowsは確かに便利です。複雑なLLMワークフロー(モデル呼び出し→バリデーション→リトライ→通知)を「コード行数少なく」実装できるのは強い。ただし、本番運用には以下の3点を忘れずに:

  1. LLMの出力は必ず検証し、エラー時のリトライ戦略を最初から仕込む(バリデーション失敗率は数%では済まない)

  2. エラーハンドリングはBedrock Flows単体では不十分。Lambda関数で詳細ログを吐いて、CloudWatch Logsを活用する

  3. コスト構造を過小評価しない。モデル選択・キャッシング・バッチ化で早期から最適化を進める

次のステップとしては、イベント駆動アーキテクチャとBedrock Flowsの組み合わせも試してみたいと思ってます。SQS/SNSじゃなくてKafkaで大量のメール生成タスクをさばく場合、フロー実行を非同期に切り離すデザインが必要になりそう。

皆さんはBedrock Flows、導入してみました?もし実装してたら、ぜひコメントで教えてください。

U

Untanbaby

ソフトウェアエンジニア|AWS / クラウドアーキテクチャ / DevOps

10年以上のIT実務経験をもとに、現場で使える技術情報を発信しています。 記事の誤りや改善点があればお問い合わせからお気軽にご連絡ください。

関連記事