SQS/SNS連携パターンをチームで整理し直した話|Fan-outからDLQ設計まで

「なんとなく動いてるけどベストプラクティスか自信ない」そんな状態からECリニューアルで3ヶ月かけて本番導入。Fan-out・フィルタリング・FIFO選択の判断基準を正直にまとめました。

うちのチームでSQS/SNSの連携パターンを整理し直した話

先日、ECサービスのリニューアルプロジェクトでSQS/SNS周りのアーキテクチャを一から設計し直す機会があって、改めて「このパターン、意外と整理されていないな」と思った。

SQSとSNSって単体ではよく使うんだけど、組み合わせると途端に選択肢が増える。Fan-outパターンにするのか、フィルタリングで一本化するのか、それともSNS FIFOまで使うのか。チーム内でも「なんとなく動いてるけどベストプラクティスかどうか自信ない」みたいな状況が続いていた。

今回は3ヶ月かけて本番環境に導入し、運用してみてわかったことを正直にまとめていく。SQSのパフォーマンス最適化やDLQ設計の詳細については以前書いたSQS・Kafka完全ガイド2026|DLQ設計とスループット最適化を実装コードで解説も参照してほしい。


パターン別の使い分け:3つの主要構成

まず整理しておきたいのが、SQS/SNSの組み合わせパターンだ。よく使うのは以下の3つで、それぞれ「どんな場面で選ぶか」が結構はっきり違う。

パターン1:Fan-out(1対多配信)

SNSトピックに対して複数のSQSキューをサブスクライブさせるやつ。イベントを複数のコンシューマーに届けたいときの定番パターンで、個人的にはこれが一番使う頻度が高い。

flowchart TB
    subgraph Producer["Producer Layer"]
        APP["アプリケーション"]
    end

    subgraph SNS_Layer["SNS Fan-out"]
        TOPIC["SNS Topic\n(order-events)"]
    end

    subgraph Consumer_Layer["Consumer Layer"]
        SQS1["SQS Queue\n(notification-queue)"]
        SQS2["SQS Queue\n(inventory-queue)"]
        SQS3["SQS Queue\n(analytics-queue)"]
        
        LAMBDA1["Lambda\n(メール通知)"]
        LAMBDA2["Lambda\n(在庫更新)"]
        LAMBDA3["Lambda\n(分析処理)"]
    end

    APP --> TOPIC
    TOPIC --> SQS1
    TOPIC --> SQS2
    TOPIC --> SQS3
    SQS1 --> LAMBDA1
    SQS2 --> LAMBDA2
    SQS3 --> LAMBDA3

Fan-outの利点は「疎結合」なんだけど、実務で地味に気をつけたのはキューごとにDLQを設定することだった。SQSに直接Lambdaをトリガーさせるとき、DLQを忘れると失敗したメッセージが静かに消えていく。これは本当に痛い。気づいたときにはもう手遅れ、みたいな事態になりかねない。

パターン2:SNSメッセージフィルタリング

2026年現在、SNSのサブスクリプションフィルタポリシーがかなり強化されていて、numeric フィルタで数値範囲指定や、exists 演算子によるキー存在チェックもできる。

{
  "Filter": {
    "eventType": ["order.placed", "order.cancelled"],
    "orderAmount": [{">=": 1000}],
    "isPremiumUser": [true]
  }
}

これを使うと、1つのSNSトピックから用途別のSQSキューに振り分けられる。以前は受け取ったメッセージをLambda側でフィルタリングしてたんだけど、SNS側でやったほうがコスト的にも有利なんですよね。Lambdaの実行回数が減るので。

ただ、フィルタポリシーはサブスクリプションごとに設定するので、管理が複雑になりがちだ。IaCで管理しないとすぐカオスになる(Terraformでの管理についてはTerraformで3年分の失敗から学んだ、State管理とAI検証の正解が参考になる)。

パターン3:FIFO トピック + FIFO キュー

順序保証が必要なケースで使う。金融系や在庫管理で「二重処理を絶対に防ぎたい」という要件があるときの選択肢だ。

import boto3
import hashlib
import json

sns = boto3.client('sns', region_name='ap-northeast-1')

def publish_fifo_event(order_id: str, event: dict) -> dict:
    """
    SNS FIFOトピックへのメッセージ送信
    MessageGroupId で順序を保証
    MessageDeduplicationId で重複排除
    """
    message_body = json.dumps(event)
    # デデュプリケーションIDはメッセージ内容のハッシュ
    dedup_id = hashlib.sha256(
        f"{order_id}:{event['eventType']}:{event['timestamp']}".encode()
    ).hexdigest()[:128]

    response = sns.publish(
        TopicArn='arn:aws:sns:ap-northeast-1:123456789012:order-events.fifo',
        Message=message_body,
        MessageGroupId=order_id,          # 同一注文IDは順序保証
        MessageDeduplicationId=dedup_id,  # 5分以内の重複を排除
        MessageAttributes={
            'eventType': {
                'DataType': 'String',
                'StringValue': event['eventType']
            }
        }
    )
    return response

# 使用例
publish_fifo_event(
    order_id='ORD-2026-00123',
    event={
        'eventType': 'order.placed',
        'timestamp': '2026-05-01T10:00:00Z',
        'items': [{'sku': 'ITEM-001', 'qty': 2}]
    }
)

FIFOは便利なんだけど、スループットに制限がある(SNS FIFO は1秒あたり300件のパブリッシュ、バッチ時は3000件)。正直、高スループットが必要なら素直にKafkaを検討したほうがいい。SQSとKafkaの選定基準についてはSQS vs Kafka 完全比較2026|選定基準・コスト・パフォーマンス解説で詳しく書いたので参考に。


本番構成:ECサービスのアーキテクチャ全体像

今回実際に導入したECサービスの構成はこんな感じ。VPC内のサービスからSNSにイベントを発行して、各ドメインのSQSキューに配信している。

graph TB
    subgraph VPC["VPC (ap-northeast-1)"]
        subgraph AZ_A["AZ: ap-northeast-1a"]
            subgraph APP_LAYER["Application Layer"]
                API["API Gateway\n(REST)"]
                ORDER_SVC["Order Service\n(Lambda)"]
            end
        end

        subgraph AZ_B["AZ: ap-northeast-1c"]
            subgraph DB_LAYER["Data Layer"]
                RDS[("Aurora\nPostgreSQL")]
                ELASTICACHE[("ElastiCache\n(Redis)")]
            end
        end
    end

    subgraph MESSAGING["Messaging Layer (マネージドサービス)"]
        SNS_ORDER["SNS Topic\norder-events\n(Standard)"]
        SNS_INVENTORY["SNS Topic\ninventory-events\n(FIFO)"]

        subgraph QUEUES["SQS Queues"]
            SQS_NOTIFY["SQS\nnotification-queue"]
            SQS_SEARCH["SQS\nsearch-index-queue"]
            SQS_ANALYTICS["SQS\nanalytics-queue"]
            SQS_INVENTORY["SQS FIFO\ninventory-queue.fifo"]

            DLQ_NOTIFY["DLQ\nnotification-dlq"]
            DLQ_SEARCH["DLQ\nsearch-index-dlq"]
            DLQ_INVENTORY["DLQ\ninventory-dlq.fifo"]
        end
    end

    subgraph CONSUMERS["Consumer Layer"]
        L_NOTIFY["Lambda\n通知処理"]
        L_SEARCH["Lambda\n検索インデックス更新"]
        L_ANALYTICS["Lambda\n分析イベント集計"]
        L_INVENTORY["Lambda\n在庫同期"]
        ALARM["CloudWatch Alarm\nDLQメッセージ数監視"]
    end

    subgraph EXTERNAL["External Services"]
        SES["Amazon SES\nメール送信"]
        OPENSEARCH["Amazon OpenSearch"]
        FIREHOSE["Kinesis Firehose\n→ S3 / Redshift"]
    end

    API --> ORDER_SVC
    ORDER_SVC --> RDS
    ORDER_SVC --> SNS_ORDER
    ORDER_SVC --> SNS_INVENTORY

    SNS_ORDER --> SQS_NOTIFY
    SNS_ORDER --> SQS_SEARCH
    SNS_ORDER --> SQS_ANALYTICS
    SNS_INVENTORY --> SQS_INVENTORY

    SQS_NOTIFY --> L_NOTIFY
    SQS_SEARCH --> L_SEARCH
    SQS_ANALYTICS --> L_ANALYTICS
    SQS_INVENTORY --> L_INVENTORY

    SQS_NOTIFY -.->|maxReceiveCount: 3| DLQ_NOTIFY
    SQS_SEARCH -.->|maxReceiveCount: 3| DLQ_SEARCH
    SQS_INVENTORY -.->|maxReceiveCount: 5| DLQ_INVENTORY

    L_NOTIFY --> SES
    L_SEARCH --> OPENSEARCH
    L_ANALYTICS --> FIREHOSE
    L_INVENTORY --> ELASTICACHE

    DLQ_NOTIFY --> ALARM
    DLQ_SEARCH --> ALARM
    DLQ_INVENTORY --> ALARM

設計上のポイントをいくつか補足しておく。

SNSトピックを用途別に分けた理由:最初は全イベントを1つのトピックにまとめていたんだけど、フィルタポリシーが複雑になりすぎた。在庫系はFIFOが必要だったので、アーキテクチャ的にもトピックを分けるほうが自然だったし、結果的にすっきりした。

DLQのアラームは必須:DLQにメッセージが1件でも来たらSlackに通知するようにしている。「気づいたらDLQに溜まってた」が一番怖い。インシデント対応の観点ではインシデント対応の最新ベストプラクティス2026|DevOps・SRE必読も参考になる。

可視性タイムアウトの設定:Lambdaの最大実行時間より長く設定すること。Lambdaのtimeoutが30秒なら、visibility timeoutは最低でも60秒は欲しい。これを守らないと同じメッセージが複数のLambdaで並行処理されてしまう。


実装で詰まったポイントと解決策

正直、最初は「SQS + SNSってドキュメント通りにやれば動くでしょ」と思ってた。でも実際に本番運用するといくつかハマりどころがあった。

メッセージ属性のエンコード問題

SNSからSQSへのメッセージは、Raw Message Delivery をOFFにするとSNSのJSONエンベロープにラップされる。これを知らずにSQS側のLambdaでそのまま json.loads(event['body']) すると全然違う構造が来てビビる。チームの誰かが必ず一度はハマるやつだ。

import json

def lambda_handler(event, context):
    for record in event['Records']:
        # SQSレコードのbodyはSNSのエンベロープ
        sqs_body = json.loads(record['body'])
        
        # Raw Message Deliveryがオフの場合、
        # sqs_body['Message']が実際のペイロード
        if 'TopicArn' in sqs_body:
            # SNS経由メッセージ
            actual_message = json.loads(sqs_body['Message'])
            message_attributes = sqs_body.get('MessageAttributes', {})
        else:
            # 直接SQSに送られたメッセージ
            actual_message = sqs_body
            message_attributes = {}
        
        print(f"処理中: {actual_message}")
        process_event(actual_message, message_attributes)

def process_event(message: dict, attributes: dict):
    event_type = message.get('eventType')
    print(f"イベントタイプ: {event_type}")
    # 処理ロジック

うちは Raw Message Delivery を有効にしてSNSエンベロープをスキップするようにした。ただし、そうするとフィルタポリシーでメッセージ属性ベースのフィルタリングが使えなくなるので注意が必要だ。

Lambda の並列実行数とバッチサイズのチューニング

SQSイベントソースマッピングの設定、最初は batchSize: 10 のデフォルトのまま使ってたんだけど、重い処理が混ざるとLambdaが1つのバッチを処理しきれずタイムアウトして、その10件が全部DLQ行きになるという事故があった。これは結構焦った。

// CDKでの設定例
const eventSource = new SqsEventSource(queue, {
  batchSize: 5,                           // デフォルト10から削減
  maxBatchingWindow: Duration.seconds(5), // バッチウィンドウ
  reportBatchItemFailures: true,          // 部分失敗対応(重要!)
});

notificationLambda.addEventSource(eventSource);

reportBatchItemFailures は本当に大事で、これを設定するとバッチ内の一部メッセージだけ失敗させられる。設定しないとバッチ全体が失敗扱いになる。

# Lambda関数側でも対応が必要
def lambda_handler(event, context):
    batch_item_failures = []
    
    for record in event['Records']:
        try:
            sqs_body = json.loads(record['body'])
            process_message(sqs_body)
        except Exception as e:
            print(f"処理失敗: {record['messageId']}, error: {str(e)}")
            # 失敗したメッセージIDだけ返す
            batch_item_failures.append(
                {"itemIdentifier": record['messageId']}
            )
    
    return {"batchItemFailures": batch_item_failures}

これを導入してからDLQ行きのメッセージが格段に減った。マジで助かった。

スループットの実測値

3ヶ月の運用で収集したデータを共有する。4月のピーク(セール期間)がなかなかインパクトのある数字だったので、グラフにしておく。

xychart-beta
    title "SQS処理スループット(メッセージ/秒)"
    x-axis ["1月", "2月", "3月", "4月(ピーク期)", "5月"]
    y-axis "処理数(msg/sec)" 0 --> 2500
    bar [320, 480, 650, 2100, 890]
    line [300, 450, 600, 2000, 850]

4月が一時2000 msg/secを超えたけど、SQSは問題なくスケールしてくれた。むしろLambdaの同時実行数の上限(デフォルト1000)のほうが先に問題になったので、Reserved Concurrencyの設定とアカウントのConcurrency Limit引き上げ申請をしている。SQSの心配より先にLambda側の計画が必要だったというのが正直なところだ。


パターン選択の判断基準

「結局どれを使えばいいの?」というのが一番よく聞かれる質問なので、判断基準をまとめておく。

パターン向いているケース注意点
SNS Fan-out → SQS同一イベントを複数コンシューマーで処理コンシューマー増加でSQSキューも増える
SNS フィルタリングイベント種別が多くコンシューマーを絞りたいフィルタポリシー管理が複雑化しやすい
FIFO SNS → FIFO SQS順序保証・重複排除が必要スループット上限あり(300 req/s)
SQS 単体(SNSなし)1対1処理で十分なケースシンプルで運用コストが低い
EventBridge → SQSイベントルーティングが複雑なケースコンテンツベースルーティングが強力

2026年現在、EventBridgeが選択肢に入ることが増えてきた。SNS/SQSよりも柔軟なルーティングができる反面、コストは高くなる(イベント100万件あたり $1.00 vs SNSの $0.50)。イベント量が多い場合はちゃんと試算してほしい。なんとなくEventBridgeを選ぶと地味にコストが積み上がる。

アーキテクチャの選定はイベント駆動アーキテクチャ実装ガイド|Kafka・マイクロサービス対応イベントソーシング×CQRS完全実装ガイド2026も合わせて読んでもらえると設計判断の幅が広がると思う。


コスト感と最適化

正直なところ、最初はコストをあまり気にしていなかった。でも本番稼働して3ヶ月後の請求書を見て焦った。

pie title SQS/SNS月次コスト内訳(実運用ベース)
    "SQS スタンダード" : 38
    "SNS スタンダード" : 22
    "SQS FIFO" : 18
    "SNS FIFO" : 12
    "データ転送" : 10

SNSからSQSへのメッセージ配信は、SNS側とSQS側の両方でカウントされるので注意が必要だ。1メッセージを3つのSQSキューに配信すると、SNSで1回 + SQSで3回 = 計4回の料金がかかる計算になる(実際はどちらも64KB超えると追加チャージ)。Fan-outを多用するほどじわじわ効いてくる。

実際にやったコスト最適化はこの3つだ。

  • メッセージの圧縮:大きなペイロードはS3に保存してSQS/SNSにはS3の参照のみ渡す「Claim Checkパターン」を採用。これで平均メッセージサイズを8KB → 0.3KBに削減できた。
  • バッチ送信:SNSの publish_batch を使うとAPI呼び出し回数を減らせる(最大10件/バッチ)。
  • フィルタリングの活用:SNS側でフィルタリングすることで不要なSQS配信を減らす。
# Claim Checkパターンの実装例
import boto3
import json
import uuid

s3 = boto3.client('s3')
sns = boto3.client('sns')

CLAIM_CHECK_THRESHOLD = 256 * 1024  # 256KB以上はS3に退避
BUCKET = 'my-event-payloads'
TOPIC_ARN = 'arn:aws:sns:ap-northeast-1:123456789012:order-events'

def publish_event(event: dict) -> None:
    payload = json.dumps(event)
    
    if len(payload.encode()) > CLAIM_CHECK_THRESHOLD:
        # 大きなペイロードはS3に保存
        s3_key = f"events/{uuid.uuid4()}.json"
        s3.put_object(
            Bucket=BUCKET,
            Key=s3_key,
            Body=payload,
            ContentType='application/json'
        )
        # SNSにはS3参照だけ渡す
        message = json.dumps({
            'type': 'claim-check',
            'bucket': BUCKET,
            'key': s3_key,
            'eventType': event.get('eventType')
        })
    else:
        message = payload
    
    sns.publish(
        TopicArn=TOPIC_ARN,
        Message=message,
        MessageAttributes={
            'eventType': {
                'DataType': 'String',
                'StringValue': event.get('eventType', 'unknown')
            }
        }
    )

このパターン、地味に便利で、ペイロードが大きい注文完了イベント(商品情報やオプション情報が全部入ってたりする)でかなり効いた。最初から入れておけばよかったと思うくらいには効果的だった。


まとめ

SQS/SNSの連携パターンって「シンプルに見えて奥が深い」というのが正直な感想だ。3ヶ月本番で動かして整理した要点をまとめておく。

  1. Fan-outは疎結合の基本:1つのSNSトピックから複数SQSキューへ配信することで、コンシューマーを独立してデプロイ・スケールできる。必ずキューごとにDLQとCloudWatchアラームを設定すること。

  2. フィルタリングはSNS側でやる:Lambda側でのフィルタリングはコストの無駄。SNSのサブスクリプションフィルタポリシーで早期に絞り込む。

  3. reportBatchItemFailures は必ず設定:SQSイベントソースマッピングでの部分失敗対応はマスト。これがないとバッチ全体がDLQ行きになる事故が起きる。

  4. FIFOの罠を理解する:順序保証と重複排除は魅力的だが、スループット上限(300 req/s)がある。高負荷が予想されるならKafka等も検討する価値がある。

  5. 大きなペイロードはClaimCheckパターン:256KBを超えそうなメッセージはS3に退避してコスト削減。SNS/SQSはメッセージサイズで課金されるので見落としがち。

既存のSQS/SNS構成がある人は、まずDLQのアラームが設定されているか確認してほしい。意外と「DLQは作ったけど監視してない」というケースが多い。次に reportBatchItemFailures の設定、そしてメッセージサイズの見直しを順番にやると効果が出やすいはずだ。

まだ検証中なのがEventBridge Pipesとの使い分けで、SQS → Lambdaの変換処理をPipesでノーコードでやるのがどこまで実用的か試している途中だ。これはまた別の記事で書くつもり。

U

Untanbaby

ソフトウェアエンジニア|AWS / クラウドアーキテクチャ / DevOps

10年以上のIT実務経験をもとに、現場で使える技術情報を発信しています。 記事の誤りや改善点があればお問い合わせからお気軽にご連絡ください。

関連記事