EventBridge Pipes・Schedulerで本番サーバーレスを回した話

Lambda関数の変換ロジックをEventBridge Pipesで削減。複雑な定期実行もSchedulerで管理。実装して気づいた便利さと意外な落とし穴をシェアします。

EventBridge Pipsでイベント変換がめっちゃ楽になった件

先日プロジェクトで、複数のイベントソースをLambdaに流し込む設計をやってたんだけど、ずっと変換ロジックをLambda関数で書いてた。本当に面倒くさい。CloudFormationテンプレート見てもLambda関数の環境変数とロールの定義だらけで、単純な型変換のためだけにメンテコストがかかってる状態だったんですよ。

そこでEventBridge Pipsを本格的に試してみたら、これが想像以上に助かった。EventBridge Pipsって、ソース(EventBridge ルールやSQS、Kinesis、DynamoDB Streamsなど)からターゲット(Lambda、SQS、SNSなど)へのイベント流を中間で変換・フィルタリング・エンリッチメントできる機能なんです。2026年時点では、この使い方が成熟してきて、実装パターンも固まってきた感じですね。

実際にやった例としては、EC2のイベント(起動・停止)をSNSで通知するんだけど、通知メッセージには現在の時刻やインスタンスのメタデータを含めたい。従来なら、このエンリッチメント処理をLambdaで書いて、IAMロールでEC2の権限つけて…という流れだった。EventBridge Pipsなら、Input Transformerと静的なPOSTパラメータを組み合わせて、3行のJSON設定で実現できるんです。

{
  "Name": "ec2-to-sns-pipe",
  "Source": "aws.ec2",
  "SourceParameters": {
    "EventBridgeParameters": {
      "DetailType": ["EC2 Instance State-change Notification"]
    }
  },
  "Target": "arn:aws:sns:ap-northeast-1:123456789012:alert",
  "Enrichment": "arn:aws:lambda:ap-northeast-1:123456789012:function:enrich-ec2",
  "RoleArn": "arn:aws:iam::123456789012:role/EventBridgePipesRole",
  "TargetParameters": {
    "SNSTopicParameters": {
      "Subject": "EC2 Event: <instance-id>",
      "MessageAttributes": {
        "timestamp": {
          "DataType": "String",
          "StringValue": "$.time"
        }
      }
    }
  }
}

この例では、Enrichment(Lambda関数)を挟んで、EC2のイベント詳細を取得してからSNSで通知してる。ただ正直、最初はなかなか理解しにくかったです。SourceParameters、Filter、Enrichment、TargetParametersの組み合わせで何ができるのか、ドキュメント読んでも実際の運用イメージが湧きにくかったんですよね。

EventBridge Schedulerで複雑な定期実行を撃破

次にハマったのが定期実行の管理。従来はCloudWatch Events(EventBridge)でスケジュール定義して、それをLambdaで実行してた。シンプルなcron式だけなら問題ないんですけど、「毎月25日の15時 + 月末の23時」みたいに複数の時間指定が必要になると、ルールを複数作らないといけなくて面倒。さらに、スケジュール実行結果のリトライやDLQの設定も、Lambda側に全部押し付けられてた。

EventBridge Scheduler(2023年導入、2026年で機能がだいぶ充実)を使ったら、スケジュール定義、リトライ、DLQ設定がまるっと一箇所で管理できるようになったんです。

import boto3
import json
from datetime import datetime

scheduler_client = boto3.client('scheduler')

# 月末の23時に実行するスケジュール
response = scheduler_client.create_schedule(
    Name='monthly-backup-schedule',
    ScheduleExpression='cron(0 23 L * ? *)',  # 月末23時
    FlexibleTimeWindow={
        'Mode': 'FLEXIBLE',
        'MaximumWindowInMinutes': 15
    },
    Target={
        'Arn': 'arn:aws:scheduler:::aws-sdk:lambda:invoke',
        'RoleArn': 'arn:aws:iam::123456789012:role/SchedulerRole',
        'Input': json.dumps({
            'FunctionName': 'arn:aws:lambda:ap-northeast-1:123456789012:function:backup',
            'InvocationType': 'RequestResponse',
            'Payload': {
                'backup_type': 'monthly',
                'timestamp': datetime.now().isoformat()
            }
        })
    },
    RetryPolicy={
        'MaximumEventAge': 3600,  # 1時間以内に実行
        'MaximumRetryAttempts': 2
    },
    DeadLetterConfig={
        'Arn': 'arn:aws:sqs:ap-northeast-1:123456789012:backup-dlq'
    }
)

これで、リトライが失敗したらSQSのDLQに自動的に送られるし、実行時間のずれ(FlexibleTimeWindow)も柔軟に指定できる。個人的に気に入ってるのは、Rate Expression(例:rate(30 minutes))とCron Expression(例:cron(0 * * * ? *))を同じインターフェースで管理できる点。従来はCloudWatch Events とLambda ScheduleExpressionで書き方が違ってて、ちょっと混乱してたんですよね。これが統一されると、本当にメンテがラクになります。

AWS構成図:EventBridge Pipes × Schedulerの実装パターン

実装してみた構成図をシェアします。

graph TB
    subgraph "Event Sources"
        EC2["EC2 Instance<br/>Events"]
        SNSSource["SNS Topic"]
        SQSSource["SQS Queue"]
        DDB["DynamoDB<br/>Streams"]
    end

    subgraph "VPC & Processing"
        Pipe1["EventBridge<br/>Pipes"]
        Pipe2["EventBridge<br/>Pipes"]
        Filter["Filter &<br/>Transform"]
        Enrich["Lambda<br/>Enrichment"]
    end

    subgraph "Scheduler & Targets"
        Scheduler["EventBridge<br/>Scheduler"]
        Lambda1["Lambda<br/>Backup"]
        Lambda2["Lambda<br/>Cleanup"]
        SNSTarget["SNS<br/>Notification"]
        SQSTarget["SQS<br/>Queue"]
        DLQ["SQS<br/>Dead Letter<br/>Queue"]
    end

    EC2 -->|EC2 State Change| Pipe1
    SNSSource -->|Raw Event| Pipe2
    SQSSource -->|Message| Pipe2
    
    Pipe1 --> Filter
    Pipe2 --> Filter
    Filter -->|Filtered Event| Enrich
    
    Enrich -->|Enriched Data| SNSTarget
    Enrich -->|Context Data| SQSTarget
    
    Scheduler -->|Cron Schedule| Lambda1
    Scheduler -->|Rate Schedule| Lambda2
    
    Lambda1 --> SNSTarget
    Lambda2 -->|Retry Failed| DLQ
    
    DDB -->|Stream Record| Pipe1

実装してみて気づいた、EventBridge Pipes と Scheduler の使い分けですが、こんな感じですね:

  • EventBridge Pipes:リアルタイムイベント駆動(EC2起動、S3 PutObject、SQSメッセージなど)の変換・フィルタリングが必要な時に使う
  • EventBridge Scheduler:定期的に実行するバッチ処理やクリーンアップタスクに最適

正直まだ検証中な部分もあります。Pipsの Enrichment 関数でエラーが出た時、リトライポリシーがどう動くかは、ドキュメント読むだけじゃ曖昧だったから、本番で実運用してる最中なんです。

落とし穴:入力変換とVPC設定

導入してから「あ、これハマるな」と思ったポイントをいくつか。

1. Input Transformer の JSONPath が複雑すぎる

EventBridge Pipes の Input Transformer は、$.detail.instance-id みたいなJSONPathで値を抽出するんですけど、ネストが深いとすぐ複雑になります。特に、AWS SDK呼び出しの結果をそのままパスするような場合、ドキュメント通りに書いても動かないことが何度かありました。

解決策としては、複雑な変換は Enrichment Lambda に全部任せる方が運用しやすいです。Pipsの責務は「イベントのルーティング」くらいに留めて、実ロジックはLambdaで書く。こっちの方が、あとで変更がある時もテストしやすいし、デバッグも楽ですよ。

2. Scheduler のリトライは「時刻に基づく」じゃなく「作成時刻から相対」

これめっちゃハマった。月末スケジュール(cron(0 23 L * ? *))が 23:00 に実行に失敗して、リトライで翌日の 00:05 に再実行されるのかと思ってました。実際は、スケジュール作成時刻から「MaximumEventAge」の範囲内なら何度もリトライされる仕様だったんです。つまり、23:00 に失敗→リトライ→翌日 23:00 のスケジュールは別の実行として扱われる。

この仕様を知らずに、「同じスケジュールが1日に何度も走ってる」バグ報告を受けて、ログ追跡するのに時間かかったんですよ。運用してみるまで気づきにくい部分だから、本当に注意が必要です。

3. EventBridge Pipes と VPC

Pipes が VPC 内のリソースにアクセスする場合、ENI(Elastic Network Interface)を使ってVPCに接続する必要があるんです。これ、ドキュメントに明記されてるんですけど、実装時に見落としがち。特に、Enrichment Lambda が VPC 内のRDSやElastiCacheを参照する場合、Pipes 自体も同じ VPC・サブネット・セキュリティグループで設定しないと、タイムアウトで失敗します。

AWS構成図でも追加するなら、こんな感じになります:

flowchart TB
    EventSource["Event Source"]
    
    subgraph VPC["VPC"]
        subgraph AZ1["AZ-1a"]
            ENI1["ENI for Pipes"]
            Lambda["Enrichment<br/>Lambda"]
            RDS[("RDS<br/>PostgreSQL")]
        end
        subgraph AZ1b["AZ-1c"]
            ENI2["ENI for Pipes"]
        end
    end
    
    Target["Target<br/>SNS/SQS"]
    
    EventSource --> Pipes{"EventBridge<br/>Pipes"}
    Pipes --> ENI1
    Pipes --> ENI2
    ENI1 --> Lambda
    Lambda --> RDS
    Pipes --> Target

実装するなら、Pipes のセキュリティグループとLambdaのセキュリティグループを別に作って、Pipes→Lambda→RDS という通信フローを意識した設定が必要になります。ここをミスると通信がタイムアウトして、原因特定に時間かかるんですよね。

まとめ

EventBridge Pipes と Scheduler は、サーバーレス設計をかなりシンプルにしてくれる。特に以下の3点が実装の効率化に繋がりました:

1. EventBridge Pipes で、Lambda 関数を「ルーティング専用」にできる 変換ロジックを Pipes に移すことで、Lambda の責務が明確になります。テストとメンテが楽になるし、何より関数の役割が一目瞭然です。

2. EventBridge Scheduler のリトライ・DLQ が標準装備 定期実行タスクの失敗処理が built-in されてるから、Lambda 関数側で独自のリトライ実装が不要になるんです。これ地味に便利。

3. ただし、複雑な変換は Enrichment Lambda に任せる方が無難 Pipes の JSONPath は単純な値抽出程度に留めて、ビジネスロジックはLambdaで書いた方が後々楽です。責務の分離が大事。

2026年時点では、EventBridge 周辺機能はかなり充実してきた。イベント駆動設計を本気でやるなら、この辺りの組み合わせを検討する価値は十分あります。うちのチームでは、新規プロジェクトでこの構成を採用したら、Lambda 関数の数が3分の1に減った。運用コストも落ちたし、何より意図が明確になった感じです。

いずれにせよ、本番投入する前に、VPC設定とリトライポリシーは実装を通じて検証しておくことをお勧めします。ドキュメント通りだと思ってても、細かい挙動は実運用してみないと見えてこないんですよ。

U

Untanbaby

ソフトウェアエンジニア|AWS / クラウドアーキテクチャ / DevOps

10年以上のIT実務経験をもとに、現場で使える技術情報を発信しています。 記事の誤りや改善点があればお問い合わせからお気軽にご連絡ください。

関連記事