本番で売上データが150%跳ね上がった日、データ品質管理と向き合った話

データレイク導入3ヶ月で本番障害。Great ExpectationsとdBtで作った品質フレームワークの実装パターンと、2026年の運用ノウハウを公開します。

データ品質で本番が死んだ話

先日、某プロジェクトでデータレイク導入から3ヶ月目に地獄を見た。朝一メール見たら、ダッシュボードの売上数字が前日比で150%跳ね上がってた。最初は「やった!」と思ったんだけど、2秒後に気づいた——データが壊れてた。

テーブルAのレコード数が突然2倍になってて、実は前日のバッチ処理がエラーしたのに誰にも通知されてなかったんだ。その時に初めて痛感したのが「データが正しい、って信じるだけじゃダメだ」ということ。

それからウチのチームは本気でデータ品質管理に取り組むことになった。この記事では、その過程で学んだ実装パターンと、2026年時点で実際に運用してる仕組みをお話しする。

Great Expectations導入で見えた「品質の定義」の難しさ

最初に選んだのはGreat Expectations。オープンソースで評判も良かったし、Pythonベースで自分たちのスタックに合ってた。ただ正直、最初は「何を検証するのか」をぜんぜん分かってなかったんだ。

データ品質って、スキーマ検証だけじゃ終わらないんですよね。カラムが存在する、型が合ってるというのはもちろんだけど、本当に大事なのは「ビジネスロジックとして正しいデータか」ということ。売上が負数になってないか、ユーザーIDが有効範囲か、そういったレベルの検証が必要だった。

Great Expectationsを導入して3週間ぐらいで、最初のExpectation Suite作ってみた。まずはテーブルAの基本的な検証から始めたんだけど、こんな感じ。

from great_expectations.dataset import PandasDataset
import great_expectations as ge

# SimpleCheckpointで定期実行
validator = ge.from_pandas(df)

# 基本的な検証
validator.expect_column_values_to_be_in_set(
    column="status",
    value_set=["active", "inactive", "pending"]
)

validator.expect_column_values_to_be_between(
    column="price",
    min_value=0,
    max_value=1000000
)

validator.expect_column_values_to_not_be_null(
    column="user_id"
)

validator.save_expectation_suite()

実際に実行してみると、いくつかのレコードがExpectationに引っかかった。でもここからが厄介でね——「引っかかったレコードをどうするか」という判断が必要になるわけだ。削除する?修正する?ビジネス側に確認する?その判断基準がなかったんだ。

結局、チーム内で半日ワークショップ開いて「何が許容できるデータドリフトか」を定義することにした。売上の+10%ぐらいは調査対象だけど止めない、ただしユーザーID周りはNullが1件でも出たら即停止みたいな。その過程で初めて「品質の定義」ってビジネス側と一緒に決めるんだな、って気づいたんですよ。

dbt testで始まるカラム単位の品質管理

Great Expectationsで全体的な品質フレームワークを作りながら、並行してdbt testも入れ始めた。dbtを既に運用してたので、テスト駆動で品質を上げたいという気持ちと、実装しやすいという理由の両方から。

dbt testは2種類ある。generic testと singular test だ。generic testは既成の検証(not_null, unique, relationships とか)で、singular testはSQLを自分で書く。最初はgeneric testだけで十分だと思ってたけど、実装進むと「このビジネスロジックは singular test で書かないと無理だ」みたいなケースが山ほど出てきた。

こんな感じで dbt_project.yml に定義してる。

models:
  - name: orders
    columns:
      - name: order_id
        data_tests:
          - unique
          - not_null
      - name: user_id
        data_tests:
          - not_null
          - relationships:
              to: ref('users')
              field: id
      - name: total_amount
        data_tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "total_amount >= 0"
      - name: status
        data_tests:
          - accepted_values:
              values: ['pending', 'confirmed', 'shipped', 'delivered']

singular testではもっと複雑なビジネスロジックを検証する。例えば、同じユーザーが同じ日に同じ商品を複数注文してないかとか。

-- tests/singular/no_duplicate_orders_same_day.sql
select
  user_id,
  product_id,
  date_trunc('day', created_at) as order_date,
  count(*) as order_count
from {{ ref('orders') }}
group by 1, 2, 3
having count(*) > 1

dbt testをパイプラインに組み込むと、毎回デプロイ前にこれらのテストが走る。正直最初は「遅くなるな」って思ったけど、実装してから3ヶ月で2回、本番バグを回避できた。テスト時間20秒の価値は十分にある。

データドリフト検知——スタテスティカルアプローチの現実

バッチ処理はスケーラリングのコツが本当に大事なんだけど、データ品質の観点でも、バッチで想定外の大量データが流れてくるというのはよくある話だ。

データドリフト検知は、単純な「前日との比較」では足りないことに気づいた。売上は季節性があるし、ユーザー数だって時間帯で変わる。そこで導入したのがEvidently というライブラリ。統計的にデータプロファイルを比較して「この変化は異常か」を判定する仕組みだ。

from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
import pandas as pd

# 参照期間(正常だと判断した過去データ)
reference_data = pd.read_csv('reference_data.csv')
current_data = pd.read_csv('current_data.csv')

report = Report(metrics=[
    DataDriftPreset(),
])

report.run(
    reference_data=reference_data,
    current_data=current_data
)

report.save_html('drift_report.html')
print(report.as_dict())

これを毎日走らせて、スコアが閾値を超えたらSlackに通知するようにした。最初は閾値を0.3(ドリフト度合い30%以上)に設定したんだけど、これが誤検知ばっかりだった。実運用では0.5ぐらいにして、ダッシュボードで日々のドリフトを可視化してる方が使える。

データ品質パイプラインの全体像はこんな感じになってる。

flowchart TB
    subgraph Source["データソース"]
        API["API/DB"]
        Log["ログ"]
    end

    subgraph Ingestion["取り込み層"]
        Ingest["Apache Airflow"]
    end

    subgraph Transform["変換層"]
        dbt_test["dbt test<br/>(generic/singular)"]
        Transform_Model["dbtモデル"]
    end

    subgraph Quality["品質管理層"]
        GE["Great Expectations<br/>(スキーマ・カラム検証)"]
        Drift["Evidently<br/>(ドリフト検知)"]
        Custom["カスタム検証"]
    end

    subgraph Output["出力"]
        DataLake[("Data Lake<br/>成功")]
        DLQ[("隔離テーブル<br/>失敗")]
        Alert["Slack通知"]
    end

    API --> Ingest
    Log --> Ingest
    Ingest --> dbt_test
    Ingest --> Transform_Model
    Transform_Model --> GE
    Transform_Model --> Drift
    Transform_Model --> Custom
    GE -->|合格| DataLake
    GE -->|不合格| DLQ
    GE -->|エラー| Alert
    Drift -->|異常検知| Alert
    Custom -->|失敗| Alert

実装してて一番気づいたのが「品質フレームワークは検出だけじゃ終わらない」ということだ。不正なデータが出た時に、それを自動的に隔離して、本番データパイプラインを止めずに進める仕組みが必要なんですよね。

本番運用で痛感した「品質定義の動的性」

Great Expectationsとdbt testを3ヶ月運用した時点で、新しい課題が浮上した。ビジネス側の要件が変わったんだ。新しい商品カテゴリが追加されて、従来では異常判定してた値が実は正常だった、みたいなケース。

「品質の定義」って静的じゃなく動的なんだな、と痛感したんですよ。つまり、Expectation Suiteとテストの定義を毎月見直すワークフローが必要。ウチのチームでは月1回、ビジネス・データ・分析の各チームで1.5時間かけて「今月のデータドリフト傾向、新しく検出すべき異常、許容値の見直し」をディスカッション。その中で定義を随時更新してる。

実際のテンプレートはこんな感じ。

# great_expectations/expectations/orders_expectation_suite.py
class OrdersExpectations:
    def __init__(self, context):
        self.suite = context.create_expectation_suite(
            expectation_suite_name="orders.base"
        )

    def define_expectations(self):
        # 2026/5月時点の定義
        # - 新商品カテゴリ「premium」追加に伴う許容値拡張
        self.suite.expect_column_values_to_be_in_set(
            column="category",
            value_set=["basic", "standard", "premium"],  # 更新
            meta={"updated_at": "2026-05-01", "reason": "Premium商品カテゴリ追加"}
        )

        self.suite.expect_column_values_to_be_between(
            column="price",
            min_value=0,
            max_value=5000000,  # 従来は 1000000
            meta={"updated_at": "2026-05-01", "reason": "Premium商品単価上昇"}
        )

メタデータに「いつ」「なぜ」を記録することで、後から「あ、この定義はいつ変わった」って追跡できるようにしてる。地味だけどマジで大事なんですよ。

Great Expectations vs Soda vs Monte Carlo:何が違う?

ここまでGreat Expressionsの話ばっかりしたけど、実は2026年時点ではほかのツールも結構成熟してる。ウチのチームでも一回、3つのツールを実際に試してみた。

項目Great ExpectationsSodaMonte Carlo
導入形式OSS/Cloud両対応Cloud中心Cloud専業
セットアップ難易度中(JSON/YAMLの記述量多い)低(YAML記述簡潔)低(UI操作可)
スケーラビリティ高(Spark対応)中(SQL主体)高(大規模DW対応)
異常検知精度手動定義主体ML活用で自動化ML活用で自動化
コスト無料~数千/月~$500/月~~$1000/月~
推奨用途テーブル単位の品質管理自動化・ML検知重視エンタープライズ規模

正直、ウチのチームがGreat Expressionsを選んだ理由は「メンバーのPythonスキルが高い」という人的要因が大きい。Sodaはyaml記述が簡潔で素早く導入できるし、Monte CarloはデータカタログとしてのMachine Learning検知機能も充実してるんだ。データカタログ構築を考えてるなら検討の価値がある。

地味だけど大事な「データ品質の継続性」

導入から6ヶ月経った今、見えてきたのは「最初の厳密さが長続きしない」ということ。Great Expectationsのテストが増えると、毎日のアラートが50~100件になるんだ。その中から「これは本当に対応すべき異常か」を判定するのに疲弊する。

ANOVA テストで統計的有意性をフィルタリングしたり、重要度レベルを5段階に分けてアラート優先度をコントロールしたりした。

from scipy import stats
import numpy as np

def is_statistically_significant_drift(reference, current, threshold=0.05):
    """統計的有意性でドリフト判定(Kolmogorov-Smirnov検定)"""
    statistic, p_value = stats.ks_2samp(reference, current)
    return p_value < threshold

# 重要度レベル分け
ALERT_LEVELS = {
    "critical": 0,      # ビジネスロジック破壊
    "high": 1,          # データ信頼性に影響
    "medium": 2,        # 要調査
    "low": 3,           # 情報提供
    "info": 4           # 統計情報
}

半年運用してわかったのが「品質管理は継続が全て」だということ。最初の3ヶ月は熱量が高いけど、その後は工数と効果のバランスが重要になるんですよ。ウチではslackボットで自動アラート集約して、人間の判断は「本当に大事なやつ」にだけ割く運用に落ち着いた。

まとめ

データ品質管理は「導入したら終わり」じゃなく「継続的な定義アップデート」が本質だと思う。ウチのチームが学んだポイント:

品質の定義はビジネス側と共作する テクノロジー側だけで決めると、運用が続かない。月1回のワークショップで定義を見直す仕組みが必須だ。

Great Expressions + dbt testで層状に検証 複雑な品質管理を一つのツールで完結させようとするな。Great Expressionsでスキーマ・統計的異常、dbt testでビジネスロジック、という使い分けが効いた。

ドリフト検知は統計的アプローチを取る Evidently みたいなツールで自動検知を組み込むと、手作業の検査工数が圧倒的に減る。

アラート疲れは重大な敵 100件のアラートは0件と同じ。重要度・統計的有意性でフィルタリングして「本当に対応すべき異常」に絞り込む仕組みが必須だ。

品質フレームワークは動的 ビジネス要件が変わったら、即座に定義を更新できる体制を用意しとく。メタデータに「いつ」「なぜ」を記録することで後々追跡しやすくなる。

正直、データ品質管理は地味で感謝されにくい領域だけど、ここがしっかりしてるプロジェクトと甘いプロジェクトでは、半年後の信頼性に歴然とした差が出る。次のプロジェクトでこの仕組みを最初から仕込める組織は、多分データドリブンな意思決定がスケールしやすいんじゃないかな。

U

Untanbaby

ソフトウェアエンジニア|AWS / クラウドアーキテクチャ / DevOps

10年以上のIT実務経験をもとに、現場で使える技術情報を発信しています。 記事の誤りや改善点があればお問い合わせからお気軽にご連絡ください。

関連記事