本番で売上データが150%跳ね上がった日、データ品質管理と向き合った話
データレイク導入3ヶ月で本番障害。Great ExpectationsとdBtで作った品質フレームワークの実装パターンと、2026年の運用ノウハウを公開します。
データ品質で本番が死んだ話
先日、某プロジェクトでデータレイク導入から3ヶ月目に地獄を見た。朝一メール見たら、ダッシュボードの売上数字が前日比で150%跳ね上がってた。最初は「やった!」と思ったんだけど、2秒後に気づいた——データが壊れてた。
テーブルAのレコード数が突然2倍になってて、実は前日のバッチ処理がエラーしたのに誰にも通知されてなかったんだ。その時に初めて痛感したのが「データが正しい、って信じるだけじゃダメだ」ということ。
それからウチのチームは本気でデータ品質管理に取り組むことになった。この記事では、その過程で学んだ実装パターンと、2026年時点で実際に運用してる仕組みをお話しする。
Great Expectations導入で見えた「品質の定義」の難しさ
最初に選んだのはGreat Expectations。オープンソースで評判も良かったし、Pythonベースで自分たちのスタックに合ってた。ただ正直、最初は「何を検証するのか」をぜんぜん分かってなかったんだ。
データ品質って、スキーマ検証だけじゃ終わらないんですよね。カラムが存在する、型が合ってるというのはもちろんだけど、本当に大事なのは「ビジネスロジックとして正しいデータか」ということ。売上が負数になってないか、ユーザーIDが有効範囲か、そういったレベルの検証が必要だった。
Great Expectationsを導入して3週間ぐらいで、最初のExpectation Suite作ってみた。まずはテーブルAの基本的な検証から始めたんだけど、こんな感じ。
from great_expectations.dataset import PandasDataset
import great_expectations as ge
# SimpleCheckpointで定期実行
validator = ge.from_pandas(df)
# 基本的な検証
validator.expect_column_values_to_be_in_set(
column="status",
value_set=["active", "inactive", "pending"]
)
validator.expect_column_values_to_be_between(
column="price",
min_value=0,
max_value=1000000
)
validator.expect_column_values_to_not_be_null(
column="user_id"
)
validator.save_expectation_suite()
実際に実行してみると、いくつかのレコードがExpectationに引っかかった。でもここからが厄介でね——「引っかかったレコードをどうするか」という判断が必要になるわけだ。削除する?修正する?ビジネス側に確認する?その判断基準がなかったんだ。
結局、チーム内で半日ワークショップ開いて「何が許容できるデータドリフトか」を定義することにした。売上の+10%ぐらいは調査対象だけど止めない、ただしユーザーID周りはNullが1件でも出たら即停止みたいな。その過程で初めて「品質の定義」ってビジネス側と一緒に決めるんだな、って気づいたんですよ。
dbt testで始まるカラム単位の品質管理
Great Expectationsで全体的な品質フレームワークを作りながら、並行してdbt testも入れ始めた。dbtを既に運用してたので、テスト駆動で品質を上げたいという気持ちと、実装しやすいという理由の両方から。
dbt testは2種類ある。generic testと singular test だ。generic testは既成の検証(not_null, unique, relationships とか)で、singular testはSQLを自分で書く。最初はgeneric testだけで十分だと思ってたけど、実装進むと「このビジネスロジックは singular test で書かないと無理だ」みたいなケースが山ほど出てきた。
こんな感じで dbt_project.yml に定義してる。
models:
- name: orders
columns:
- name: order_id
data_tests:
- unique
- not_null
- name: user_id
data_tests:
- not_null
- relationships:
to: ref('users')
field: id
- name: total_amount
data_tests:
- not_null
- dbt_utils.expression_is_true:
expression: "total_amount >= 0"
- name: status
data_tests:
- accepted_values:
values: ['pending', 'confirmed', 'shipped', 'delivered']
singular testではもっと複雑なビジネスロジックを検証する。例えば、同じユーザーが同じ日に同じ商品を複数注文してないかとか。
-- tests/singular/no_duplicate_orders_same_day.sql
select
user_id,
product_id,
date_trunc('day', created_at) as order_date,
count(*) as order_count
from {{ ref('orders') }}
group by 1, 2, 3
having count(*) > 1
dbt testをパイプラインに組み込むと、毎回デプロイ前にこれらのテストが走る。正直最初は「遅くなるな」って思ったけど、実装してから3ヶ月で2回、本番バグを回避できた。テスト時間20秒の価値は十分にある。
データドリフト検知——スタテスティカルアプローチの現実
バッチ処理はスケーラリングのコツが本当に大事なんだけど、データ品質の観点でも、バッチで想定外の大量データが流れてくるというのはよくある話だ。
データドリフト検知は、単純な「前日との比較」では足りないことに気づいた。売上は季節性があるし、ユーザー数だって時間帯で変わる。そこで導入したのがEvidently というライブラリ。統計的にデータプロファイルを比較して「この変化は異常か」を判定する仕組みだ。
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
import pandas as pd
# 参照期間(正常だと判断した過去データ)
reference_data = pd.read_csv('reference_data.csv')
current_data = pd.read_csv('current_data.csv')
report = Report(metrics=[
DataDriftPreset(),
])
report.run(
reference_data=reference_data,
current_data=current_data
)
report.save_html('drift_report.html')
print(report.as_dict())
これを毎日走らせて、スコアが閾値を超えたらSlackに通知するようにした。最初は閾値を0.3(ドリフト度合い30%以上)に設定したんだけど、これが誤検知ばっかりだった。実運用では0.5ぐらいにして、ダッシュボードで日々のドリフトを可視化してる方が使える。
データ品質パイプラインの全体像はこんな感じになってる。
flowchart TB
subgraph Source["データソース"]
API["API/DB"]
Log["ログ"]
end
subgraph Ingestion["取り込み層"]
Ingest["Apache Airflow"]
end
subgraph Transform["変換層"]
dbt_test["dbt test<br/>(generic/singular)"]
Transform_Model["dbtモデル"]
end
subgraph Quality["品質管理層"]
GE["Great Expectations<br/>(スキーマ・カラム検証)"]
Drift["Evidently<br/>(ドリフト検知)"]
Custom["カスタム検証"]
end
subgraph Output["出力"]
DataLake[("Data Lake<br/>成功")]
DLQ[("隔離テーブル<br/>失敗")]
Alert["Slack通知"]
end
API --> Ingest
Log --> Ingest
Ingest --> dbt_test
Ingest --> Transform_Model
Transform_Model --> GE
Transform_Model --> Drift
Transform_Model --> Custom
GE -->|合格| DataLake
GE -->|不合格| DLQ
GE -->|エラー| Alert
Drift -->|異常検知| Alert
Custom -->|失敗| Alert
実装してて一番気づいたのが「品質フレームワークは検出だけじゃ終わらない」ということだ。不正なデータが出た時に、それを自動的に隔離して、本番データパイプラインを止めずに進める仕組みが必要なんですよね。
本番運用で痛感した「品質定義の動的性」
Great Expectationsとdbt testを3ヶ月運用した時点で、新しい課題が浮上した。ビジネス側の要件が変わったんだ。新しい商品カテゴリが追加されて、従来では異常判定してた値が実は正常だった、みたいなケース。
「品質の定義」って静的じゃなく動的なんだな、と痛感したんですよ。つまり、Expectation Suiteとテストの定義を毎月見直すワークフローが必要。ウチのチームでは月1回、ビジネス・データ・分析の各チームで1.5時間かけて「今月のデータドリフト傾向、新しく検出すべき異常、許容値の見直し」をディスカッション。その中で定義を随時更新してる。
実際のテンプレートはこんな感じ。
# great_expectations/expectations/orders_expectation_suite.py
class OrdersExpectations:
def __init__(self, context):
self.suite = context.create_expectation_suite(
expectation_suite_name="orders.base"
)
def define_expectations(self):
# 2026/5月時点の定義
# - 新商品カテゴリ「premium」追加に伴う許容値拡張
self.suite.expect_column_values_to_be_in_set(
column="category",
value_set=["basic", "standard", "premium"], # 更新
meta={"updated_at": "2026-05-01", "reason": "Premium商品カテゴリ追加"}
)
self.suite.expect_column_values_to_be_between(
column="price",
min_value=0,
max_value=5000000, # 従来は 1000000
meta={"updated_at": "2026-05-01", "reason": "Premium商品単価上昇"}
)
メタデータに「いつ」「なぜ」を記録することで、後から「あ、この定義はいつ変わった」って追跡できるようにしてる。地味だけどマジで大事なんですよ。
Great Expectations vs Soda vs Monte Carlo:何が違う?
ここまでGreat Expressionsの話ばっかりしたけど、実は2026年時点ではほかのツールも結構成熟してる。ウチのチームでも一回、3つのツールを実際に試してみた。
| 項目 | Great Expectations | Soda | Monte Carlo |
|---|---|---|---|
| 導入形式 | OSS/Cloud両対応 | Cloud中心 | Cloud専業 |
| セットアップ難易度 | 中(JSON/YAMLの記述量多い) | 低(YAML記述簡潔) | 低(UI操作可) |
| スケーラビリティ | 高(Spark対応) | 中(SQL主体) | 高(大規模DW対応) |
| 異常検知精度 | 手動定義主体 | ML活用で自動化 | ML活用で自動化 |
| コスト | 無料~数千/月 | ~$500/月~ | ~$1000/月~ |
| 推奨用途 | テーブル単位の品質管理 | 自動化・ML検知重視 | エンタープライズ規模 |
正直、ウチのチームがGreat Expressionsを選んだ理由は「メンバーのPythonスキルが高い」という人的要因が大きい。Sodaはyaml記述が簡潔で素早く導入できるし、Monte CarloはデータカタログとしてのMachine Learning検知機能も充実してるんだ。データカタログ構築を考えてるなら検討の価値がある。
地味だけど大事な「データ品質の継続性」
導入から6ヶ月経った今、見えてきたのは「最初の厳密さが長続きしない」ということ。Great Expectationsのテストが増えると、毎日のアラートが50~100件になるんだ。その中から「これは本当に対応すべき異常か」を判定するのに疲弊する。
ANOVA テストで統計的有意性をフィルタリングしたり、重要度レベルを5段階に分けてアラート優先度をコントロールしたりした。
from scipy import stats
import numpy as np
def is_statistically_significant_drift(reference, current, threshold=0.05):
"""統計的有意性でドリフト判定(Kolmogorov-Smirnov検定)"""
statistic, p_value = stats.ks_2samp(reference, current)
return p_value < threshold
# 重要度レベル分け
ALERT_LEVELS = {
"critical": 0, # ビジネスロジック破壊
"high": 1, # データ信頼性に影響
"medium": 2, # 要調査
"low": 3, # 情報提供
"info": 4 # 統計情報
}
半年運用してわかったのが「品質管理は継続が全て」だということ。最初の3ヶ月は熱量が高いけど、その後は工数と効果のバランスが重要になるんですよ。ウチではslackボットで自動アラート集約して、人間の判断は「本当に大事なやつ」にだけ割く運用に落ち着いた。
まとめ
データ品質管理は「導入したら終わり」じゃなく「継続的な定義アップデート」が本質だと思う。ウチのチームが学んだポイント:
品質の定義はビジネス側と共作する テクノロジー側だけで決めると、運用が続かない。月1回のワークショップで定義を見直す仕組みが必須だ。
Great Expressions + dbt testで層状に検証 複雑な品質管理を一つのツールで完結させようとするな。Great Expressionsでスキーマ・統計的異常、dbt testでビジネスロジック、という使い分けが効いた。
ドリフト検知は統計的アプローチを取る Evidently みたいなツールで自動検知を組み込むと、手作業の検査工数が圧倒的に減る。
アラート疲れは重大な敵 100件のアラートは0件と同じ。重要度・統計的有意性でフィルタリングして「本当に対応すべき異常」に絞り込む仕組みが必須だ。
品質フレームワークは動的 ビジネス要件が変わったら、即座に定義を更新できる体制を用意しとく。メタデータに「いつ」「なぜ」を記録することで後々追跡しやすくなる。
正直、データ品質管理は地味で感謝されにくい領域だけど、ここがしっかりしてるプロジェクトと甘いプロジェクトでは、半年後の信頼性に歴然とした差が出る。次のプロジェクトでこの仕組みを最初から仕込める組織は、多分データドリブンな意思決定がスケールしやすいんじゃないかな。