バッチ処理の設計で3回本番を止めて学んだ「壊れにくい」作り方

「データ読んで処理して書くだけ」と思ってたら3ヶ月で本番が2回止まった話。べき等性・リトライ・進捗管理など、実際に踏んだ失敗から逆算したクラウドネイティブ時代のバッチ設計パターンをまとめました。

バッチ処理の設計、正直ナメてた時期がある。

APIサーバーの設計や非同期処理に比べて「データを読んで、処理して、書き出す」だけでしょ、という感覚で最初のバッチをリリースした結果、3ヶ月で本番が2回止まった。チームに頭を下げながら障害報告を書いていたあの時間は、今でも思い出すと胃が痛い。

そこから設計を根本から見直して、気づいたら4年分の失敗知見がたまった。2026年現在はクラウドネイティブ環境でのバッチ実行がスタンダードになり、設計の考え方自体もだいぶ変わってきた。今日はその「変わったポイント」を中心に書いていく。

以前の記事でイベント駆動アーキテクチャの実装パターンについて書いたが、バッチ処理とイベント駆動は思想が近いようで、障害モードがまったく違う。その違いを理解することが設計の第一歩だと今は思っている。

バッチ処理の「壊れ方」を先に知る

設計を語る前に、バッチはどう壊れるかを整理しておきたい。自分が実際に踏んだパターンがこれだ。

flowchart TD
    A[バッチ起動] --> B{入力データ取得}
    B -->|成功| C[データ処理ループ]
    B -->|失敗| F1[❌ 失敗パターン1: 入力が空なのに正常終了]
    C --> D{外部API・DB書き込み}
    D -->|タイムアウト| F2[❌ 失敗パターン2: 中途半端な状態で停止]
    D -->|成功| E{次レコードあり?}
    E -->|あり| C
    E -->|なし| G{後続バッチ起動}
    G -->|重複起動| F3[❌ 失敗パターン3: 二重処理]
    G -->|正常| H[✅ 完了]

失敗パターン1は「入力データが0件のときの挙動が未定義」という問題。ゼロ件を正常として扱うのか、異常として扱うのか、設計段階で決めていなかった。

失敗パターン2は中途半端な状態での停止。10万件処理中の5万件目でタイムアウトした場合、どこから再開するのかが設計されていなかった。

失敗パターン3は二重起動・二重処理。前のバッチが終わりきっていないのに次がキックされてしまい、同じデータが2回処理された。

この3つ、やられた人いませんか? 特にパターン2は「再実行したら同じデータが2回処理される」という最悪のパターンに化けることがあって、マジで地獄だった。

べき等性の設計:「何度実行しても同じ結果」を具体的に実装する

バッチ設計の根幹は**べき等性(idempotency)**だ。概念としては知っていても、具体的にどう実装するかで詰まることが多い。個人的にもここが一番「知ってるのにできてなかった」ポイントだった。

実際にPython + PostgreSQLで組んだ実装パターンがこれ。

import hashlib
import json
from datetime import datetime, date
from typing import Optional
from dataclasses import dataclass
import psycopg2
from psycopg2.extras import RealDictCursor

@dataclass
class BatchJobRecord:
    job_id: str
    batch_name: str
    target_date: date
    status: str  # 'running' | 'completed' | 'failed'
    processed_count: int
    checkpoint: Optional[dict]
    created_at: datetime
    updated_at: datetime


class IdempotentBatchRunner:
    def __init__(self, conn_str: str):
        self.conn = psycopg2.connect(conn_str)

    def _generate_job_id(self, batch_name: str, target_date: date) -> str:
        """バッチ名と対象日からジョブIDを生成(同一バッチの重複実行防止)"""
        key = f"{batch_name}:{target_date.isoformat()}"
        return hashlib.sha256(key.encode()).hexdigest()[:16]

    def acquire_lock(self, batch_name: str, target_date: date) -> Optional[BatchJobRecord]:
        """
        ジョブの排他制御。
        - completed: スキップ(再実行不要)
        - running: スキップ(他プロセスが実行中)
        - failed / 未存在: 実行可能
        """
        job_id = self._generate_job_id(batch_name, target_date)

        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            # upsert with conflict detection
            cur.execute("""
                INSERT INTO batch_job_records
                    (job_id, batch_name, target_date, status, processed_count, checkpoint)
                VALUES (%s, %s, %s, 'running', 0, '{}')
                ON CONFLICT (job_id) DO UPDATE
                    SET status = CASE
                        WHEN batch_job_records.status = 'failed' THEN 'running'
                        ELSE batch_job_records.status
                    END,
                    updated_at = NOW()
                RETURNING *
            """, (job_id, batch_name, target_date))

            record = cur.fetchone()
            self.conn.commit()

            if record['status'] != 'running':
                return None  # completed or already running by another process

            return BatchJobRecord(**record)

    def save_checkpoint(self, job_id: str, processed_count: int, checkpoint: dict):
        """処理途中のチェックポイントを保存(再実行時の再開位置)"""
        with self.conn.cursor() as cur:
            cur.execute("""
                UPDATE batch_job_records
                SET processed_count = %s,
                    checkpoint = %s,
                    updated_at = NOW()
                WHERE job_id = %s
            """, (processed_count, json.dumps(checkpoint), job_id))
            self.conn.commit()

ポイントはON CONFLICTでのupsertだ。同じjob_idのジョブがすでにcompleted状態なら更新しない。failed状態ならrunningに戻して再実行可能にする。これで「完了済みバッチの再起動による二重処理」と「失敗後の再実行不可」の両方を防げる。

チェックポイントの仕組みも重要で、自分のチームでは「最後に処理したレコードのID」をJSONで保存している。再実行時はこの値を使って途中から再開できる。

def process_batch(runner: IdempotentBatchRunner, target_date: date):
    job = runner.acquire_lock('daily_aggregation', target_date)
    if job is None:
        print(f"Job already completed or running. Skipping.")
        return

    # チェックポイントから再開位置を取得
    last_id = job.checkpoint.get('last_processed_id', 0)
    processed = job.processed_count

    try:
        records = fetch_records_after(last_id, target_date)  # チェックポイント以降を取得
        for record in records:
            process_record(record)
            processed += 1

            # 1000件ごとにチェックポイント保存
            if processed % 1000 == 0:
                runner.save_checkpoint(
                    job.job_id,
                    processed,
                    {'last_processed_id': record['id']}
                )

        runner.mark_completed(job.job_id, processed)
    except Exception as e:
        runner.mark_failed(job.job_id, str(e))
        raise

この実装パターンを入れてから、「再実行で二重処理」の事故はゼロになった。地味に便利というより、これがないと夜に安眠できない。

2026年のクラウドネイティブバッチ構成

最近うちのチームで採用している構成がこれだ。ECSのScheduled Tasksをベースに、失敗時のリトライとオブザーバビリティを組み合わせている。

graph TB
    subgraph Trigger["トリガー層"]
        EB[EventBridge Scheduler]
        SQS[SQS バッチキュー]
    end

    subgraph Execution["実行層(ECS Fargate)"]
        CTRL[バッチコントローラー]
        W1[ワーカー 1]
        W2[ワーカー 2]
        W3[ワーカー N]
    end

    subgraph Storage["データ層"]
        PG[(PostgreSQL\nジョブ管理DB)]
        S3[(S3\n入力/出力データ)]
    end

    subgraph Observability["オブザーバビリティ"]
        CW[CloudWatch Logs]
        METRIC[カスタムメトリクス]
        ALARM[アラーム + SNS]
    end

    subgraph DLQ["リトライ・エラー処理"]
        DLQUEUE[SQS DLQ]
        LAMBDA[DLQハンドラー Lambda]
        SLACK[Slack通知]
    end

    EB -->|定時起動| CTRL
    SQS -->|手動/再実行キュー| CTRL
    CTRL --> PG
    CTRL -->|タスク分割| W1
    CTRL -->|タスク分割| W2
    CTRL -->|タスク分割| W3
    W1 --> S3
    W2 --> S3
    W3 --> S3
    W1 --> PG
    W2 --> PG
    W3 --> PG
    W1 --> CW
    W2 --> CW
    W3 --> CW
    CTRL --> METRIC
    METRIC --> ALARM
    ALARM --> SNS[SNS]
    SNS --> SLACK
    W1 -->|失敗時| DLQUEUE
    DLQUEUE --> LAMBDA
    LAMBDA --> SLACK

EventBridge Schedulerで定時起動し、バッチコントローラーがジョブをワーカーに分割する。ワーカーは並列実行可能で、それぞれが独立したチェックポイントを持つ。失敗したワーカーのタスクはSQSのDLQに積まれ、Lambdaが検知してSlack通知する仕組みだ。

SQS/SNS連携パターンをチームで整理し直した話でも触れたが、DLQ設計はバッチ処理との相性がいい。「失敗したサブタスクだけ再実行する」という粒度のコントロールができるのが特に大きくて、全体を再実行しなくていいのは精神的にもかなり楽になる。

パフォーマンス設計:スループットと安全性のトレードオフ

「バッチを速くしたい」という要求は必ず来る。でも速くするほど外部サービスへの負荷やDB競合が増えるトレードオフがある。自分が実際に計測した結果がこれだ。

xychart-beta
    title "並列度とスループットの関係(実測値)"
    x-axis ["直列(1)", "並列2", "並列4", "並列8", "並列16", "並列32"]
    y-axis "処理件数/秒" 0 --> 1200
    bar [120, 230, 420, 680, 820, 790]
    line [120, 240, 480, 960, 1200, 1200]

折れ線が理論値、棒グラフが実測値だ。並列16を超えたあたりからDB接続プールが詰まり始めてスループットが頭打ちになり、並列32ではむしろ並列16より遅くなっている。理論値と実測値の乖離が大きくなるポイントこそ「設計上の限界」なので、ここを事前に把握しておくことが大事。「並列度を増やせば増やすほど速い」は本当にウソなので、一度は計測してみてほしい。

実際に試したスループット最適化の手法と効果をまとめる。

手法実装コスト効果注意点
バルクインサート(1件→1000件一括)★★★★★トランザクション範囲に注意
接続プール最適化(pgbouncer導入)★★★★☆設定ミスで逆効果になる
並列ワーカー化★★★★☆べき等性設計が前提
非同期I/O(asyncio)★★★☆☆CPU boundには効かない
キャッシュ層(Redis)追加★★★☆☆キャッシュ設計の複雑化
ストリーミング処理に変更★★★★★アーキテクチャ変更が必要

うちのチームで一番費用対効果が高かったのはバルクインサートへの変更だった。1件ずつINSERTしていた処理を1000件のバッチINSERTに変えただけで、処理時間が1/8になった。コードの変更量も少ないのに効果が劇的で、「これ最初からやれよ」と過去の自分に言いたい気持ちだった。

Pythonでのバルクインサートはこんな感じになる。

from psycopg2.extras import execute_values

def bulk_insert_records(conn, records: list[dict], chunk_size: int = 1000):
    """チャンク単位でバルクインサート。メモリ効率も考慮"""
    with conn.cursor() as cur:
        for i in range(0, len(records), chunk_size):
            chunk = records[i:i + chunk_size]
            execute_values(
                cur,
                """
                INSERT INTO processed_data (id, user_id, amount, processed_at)
                VALUES %s
                ON CONFLICT (id) DO UPDATE SET
                    amount = EXCLUDED.amount,
                    processed_at = EXCLUDED.processed_at
                """,
                [(r['id'], r['user_id'], r['amount'], r['processed_at']) for r in chunk],
                template=None,
                page_size=chunk_size
            )
            conn.commit()  # チャンクごとにコミット(メモリ節約)

ON CONFLICT DO UPDATEを使っているのがポイントで、再実行時にも重複データが入らない。べき等性の担保をDBレベルでもやっている、という構成だ。

オブザーバビリティ:バッチが「どこで詰まっているか」を見えるようにする

バッチ処理で一番つらいのは「なんか遅い、なんか失敗してる」だけが分かって、どこが問題か分からない状態だ。インシデントが起きてから「ログが足りなかった」に気づいても手遅れなことが多いので、最初から整備しておくのが鉄則。

正直、SLI/SLO設計でも触れたが、バッチ処理のSLOは「完了率」と「レイテンシ」の2軸で考えるのがシンプルでいい。「1時間以内に完了すること」「エラー率1%以下であること」みたいな感じ。これを決めておくと、アラームの閾値設定がブレなくなる。

import time
from dataclasses import dataclass, field
from typing import Callable
import boto3

@dataclass
class BatchMetrics:
    batch_name: str
    target_date: str
    start_time: float = field(default_factory=time.time)
    processed_count: int = 0
    error_count: int = 0
    skipped_count: int = 0
    _cw_client: object = field(default=None, repr=False)

    def __post_init__(self):
        self._cw_client = boto3.client('cloudwatch', region_name='ap-northeast-1')

    def increment_processed(self, count: int = 1):
        self.processed_count += count

    def increment_error(self, count: int = 1):
        self.error_count += count

    def flush(self):
        """CloudWatchにカスタムメトリクスを送信"""
        elapsed = time.time() - self.start_time
        throughput = self.processed_count / elapsed if elapsed > 0 else 0
        error_rate = self.error_count / max(self.processed_count, 1)

        metrics = [
            {'MetricName': 'ProcessedCount', 'Value': self.processed_count, 'Unit': 'Count'},
            {'MetricName': 'ErrorCount', 'Value': self.error_count, 'Unit': 'Count'},
            {'MetricName': 'ErrorRate', 'Value': error_rate, 'Unit': 'None'},
            {'MetricName': 'Throughput', 'Value': throughput, 'Unit': 'Count/Second'},
            {'MetricName': 'ElapsedSeconds', 'Value': elapsed, 'Unit': 'Seconds'},
        ]

        dimensions = [
            {'Name': 'BatchName', 'Value': self.batch_name},
            {'Name': 'TargetDate', 'Value': self.target_date},
        ]

        self._cw_client.put_metric_data(
            Namespace='CustomBatch/Metrics',
            MetricData=[
                {**m, 'Dimensions': dimensions}
                for m in metrics
            ]
        )
        print(f"[Metrics] processed={self.processed_count}, errors={self.error_count}, "
              f"throughput={throughput:.1f}/s, elapsed={elapsed:.1f}s")

このメトリクスをCloudWatchのアラームに連携して、「処理レートが想定の50%を下回ったら即アラート」みたいな設定を入れている。「バッチが完了しなかった」ではなく「バッチが詰まりかけている」段階で検知できるので、対応が格段に楽になった。異常に気づくのが完了後ではなく処理中というのは、思っている以上に大きい差がある。

ログには必ずbatch_nametarget_dateをコンテキストとして付けること。これがないとCloudWatch Logsで検索するときに地獄を見る。

import logging
import json

class StructuredLogger:
    def __init__(self, batch_name: str, target_date: str):
        self.context = {
            'batch_name': batch_name,
            'target_date': target_date,
        }
        self.logger = logging.getLogger(batch_name)

    def info(self, message: str, **kwargs):
        self.logger.info(json.dumps({
            **self.context,
            'level': 'INFO',
            'message': message,
            **kwargs
        }))

    def error(self, message: str, **kwargs):
        self.logger.error(json.dumps({
            **self.context,
            'level': 'ERROR',
            'message': message,
            **kwargs
        }))

構造化ログにしておけば、CloudWatch Logs Insightsでfilter batch_name = "daily_aggregation" | filter level = "ERROR"みたいなクエリがすぐ書ける。インシデント対応ベストプラクティスでも触れているが、ログが構造化されているかどうかで対応速度が全然違う。

皆さんのバッチ処理、ちゃんとログ構造化されてますか? チームに聞くと「print文で出してた」という声が意外と多くて、毎回もったいないなと思う。

まとめ

4年間バッチ処理の設計・運用をやってきて、今の自分が「これは外せない」と思うポイントを3つに絞るとこうなる。

  1. べき等性は設計の最優先事項。「何度実行しても同じ結果」を担保するために、ジョブIDベースの排他制御とチェックポイントの組み合わせが実用的だった。DBのON CONFLICTも積極的に使う。

  2. パフォーマンスの限界を事前に計測する。並列度を上げればいいわけではなく、DBの接続プールが詰まるポイントを事前に把握して設計する。バルクインサートは費用対効果が高いのでまず試してほしい。

  3. オブザーバビリティなき運用は地獄。構造化ログとカスタムメトリクスを最初から入れることで、「詰まりかけている」段階で検知できる。インシデントが起きてから整備しようとすると手遅れなことが多い。

正直まだ検証中なところとしては、クラウドネイティブバッチの文脈でStep FunctionsとECS Scheduled Tasksのどちらが長期的に運用しやすいかという判断だ。State管理が明示的なStep Functionsの方が複雑なフロー向きだが、シンプルなバッチならECSで十分なことも多い。この辺は分散バッチアーキテクチャのガイドも参考にしてみてほしい。

次のアクション:

  • 既存バッチのジョブ管理テーブルの有無を確認する(なければ今すぐ作る)
  • バルクインサートの未実装箇所を洗い出してリファクタする
  • CloudWatchカスタムメトリクスとアラームを1本だけ入れてみる

失敗して初めて分かることが多い領域なので、この記事が「あの失敗を防ぎたい」という人の役に立てばうれしい。

U

Untanbaby

ソフトウェアエンジニア|AWS / クラウドアーキテクチャ / DevOps

10年以上のIT実務経験をもとに、現場で使える技術情報を発信しています。 記事の誤りや改善点があればお問い合わせからお気軽にご連絡ください。

関連記事