Spark 3.5→4.0移行で半年ハマった話|AQEと本番チューニングの現実
「マイナーアップみたいなもんでしょ」と思ったら半年かかった。AQEの罠、Connect API移行の落とし穴、Pandas on Sparkの地雷など、実際に踏んだ失敗と対処法をまとめました。
Spark 3.5から4.0に上げたら、思ってた以上にしんどかった
去年の秋、うちのチームでApache Spark 3.5から4.0への移行プロジェクトを走らせた。「マイナーバージョンアップみたいなもんでしょ」と軽く見てたら半年かかった。今はなんとか本番環境で安定稼働しているけど、その過程でかなり多くの地雷を踏んだので、同じ轍を踏んでほしくなくて書いている。
うちのデータ基盤はざっくり言うとこんな構成だ。
flowchart TB
subgraph ingestion["データ取り込み層"]
kafka["Kafka\nEvent Streams"]
rds["RDS\nPostgreSQL"]
api["REST API\nソース"]
end
subgraph processing["Spark処理層"]
spark_streaming["Spark Structured\nStreaming"]
spark_batch["Spark Batch\n(EMR)"]
delta["Delta Lake\nテーブル"]
end
subgraph serving["サービング層"]
catalog["Glue Data Catalog"]
athena["Amazon Athena"]
redshift["Redshift\nServerless"]
bi["BI Tools\n(Metabase)"]
end
kafka --> spark_streaming
rds --> spark_batch
api --> spark_batch
spark_streaming --> delta
spark_batch --> delta
delta --> catalog
catalog --> athena
catalog --> redshift
athena --> bi
redshift --> bi
Delta Lake + EMR上でSparkを動かし、Glueカタログ経由でAthenaとRedshift Serverlessに繋ぐ構成だ。データ量は日次バッチで約2TBほど処理している。Delta Lake・Iceberg・Hudi比較の記事も以前書いたけど、Delta Lakeを選んだのは正解だったと今でも思っている。
Spark 4.0で何が変わったか(個人的に刺さった点)
Spark 4.0のリリースノートを最初に読んだとき、正直「これは破壊的変更多いな」と感じた。公式ドキュメントには丁寧に書いてあるんだけど、実際に動かしてみると全然違うハマり方をするんですよね。
主要な変更をまとめるとこうなる。
| 機能 | Spark 3.5 | Spark 4.0 | 影響度 |
|---|---|---|---|
| Python API | PySpark (RDD互換) | Spark Connect デフォルト化 | 高 |
| Pandas on Spark | pandas-on-spark (koalas後継) | pandas API on Spark 強化 | 中 |
| AQE | オプション(デフォルトON) | さらに強化・挙動変更 | 高 |
| Python UDF | pickle serialization | Apache Arrow強化 | 中 |
| ANSI SQL | オプション | デフォルトON | 高 |
| DataFrame API | 一部deprecated | 整理・廃止 | 中 |
| JVM | Java 11+ | Java 17必須 | 低〜中 |
影響度「高」が3つ並んでいる時点でお察しなんだが、特に痛かったのはANSI SQLのデフォルトON化だ。既存のSQLクエリにNULL扱いやデータ型強制の差異があって、サイレントに結果が変わるケースが出た。「あれ、この集計値おかしくない?」となったときは正直焦った。しかも一見動いているのに数値がズレているパターンが一番発見しにくくて厄介だった。
Spark Connectも曲者だった。従来のSparkSessionが内部的にローカルで動いていたのに対して、Spark ConnectはクライアントとSparkクラスタをgRPCで分離する。開発体験は格段に良くなるんだけど、既存のSparkContextに直接アクセスするコードが全部壊れる。うちにはsc._jvm経由でJVM内部を叩いてるコードが数箇所あって、それを全部書き直すハメになった。
# Spark 3.5 まで動いてたコード(Spark Connect では NG)
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# JVM直叩き
hive_conf = sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
# Spark 4.0 で書き直したバージョン
# SparkConnect 経由では JVM に直アクセスできないため、
# Spark SQL の SET コマンドや spark.conf で代替
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
spark.conf.set("hive.exec.dynamic.partition", "true")
この変更に気づかずステージング環境でしばらく格闘した。ログを見ても分かりにくいエラーメッセージで、原因特定に2日かかった。2日ってそこそこ長いようだけど、エラーが「JVM云々」ではなく別の場所を指していたので仕方ない部分もある。
AQEとの格闘:「自動最適化」が裏目に出た話
Adaptive Query Execution(AQE)はSpark 3.0から入って、3.5でかなり成熟した機能だ。Spark 4.0でさらに強化されたんだけど、これが既存のパフォーマンスチューニングを根底から覆してくれた。
AQEの主な機能は3つ——Skew Join最適化・シャッフルパーティション動的変更・Join戦略の実行時変更だ。理論上は全部「よくなる方向」なんだけど、問題は「予測可能性」が下がること。チューニング職人的な勘が通用しなくなるというか、「あのパラメータでうまくいく」という経験則が全部リセットされる感覚があった。
xychart-beta
title "AQEあり/なし実行時間比較(ジョブ種別)"
x-axis ["小テーブルJoin", "大テーブルJoin", "集計処理", "Skewデータ", "ウィンドウ関数"]
y-axis "実行時間(分)" 0 --> 45
bar [12, 38, 8, 42, 15]
bar [10, 22, 7, 11, 16]
(青がAQEなし、オレンジがAQEあり。Skewデータのケースは劇的に改善するけど、ウィンドウ関数は誤差程度)
Skewデータの処理は本当に劇的に改善した。うちのデータには特定のテナントIDに処理が集中するパターンがあって、そこが毎回ボトルネックになっていた。AQEのSkew Join対策は実際に効いたし、これだけでも4.0に上げる価値はあると思っている。
一方でハマったのがシャッフルパーティション数の動的変更だ。Spark 3.5までspark.sql.shuffle.partitions=200で固定していたのに、AQE 4.0ではこれをランタイムに再設定する。問題は、後続のジョブがパーティション数を前提とした処理をしていたケース。設定でコントロールできるので、以下を入れてから実行時間のばらつきがかなり小さくなった。
# AQE有効時にパーティション数が想定外になるパターン
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# これを設定しないと AQE が勝手に 200 → 40 程度に減らすことがある
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "50")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "134217728") # 128MB
# Skew Join 対策
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
正直まだ完全に理解できていない部分もあって、advisoryPartitionSizeInBytesの最適値はデータ特性によって変わるので検証が必要だ。128MBで出発して、実行計画を見ながら調整するのが現実的なアプローチだと思っている。
Pandas on Spark で起きたデータ型の地雷
うちのチームにはPythonのpandasに慣れたデータサイエンティストが何人かいて、pyspark.pandas(旧Koalas)を使って分析コードを書いてもらっていた。Spark 4.0でのAPIの変更で、これが結構壊れた。
特に痛かったのはデータ型の推論周りで、しかもエラーで落ちるわけじゃなくて結果が静かにズレるやつだから発見が遅れる。
import pyspark.pandas as ps
import pandas as pd
# Spark 3.5 では暗黙変換が通っていたコード
psdf = ps.read_csv("s3://bucket/data/*.csv")
# 文字列の日付が object 型のまま処理されていた
print(psdf['date_col'].dtype) # object
# Spark 4.0 では型推論が厳密化
# 同じ CSV を読むと datetime64[ns] に変換されることがある
# 後続の文字列操作が TypeError で落ちる
# 対策:明示的にスキーマを指定する
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema = StructType([
StructField("date_col", StringType(), True),
StructField("value", DoubleType(), True),
StructField("category", StringType(), True)
])
sdf = spark.read.schema(schema).csv("s3://bucket/data/*.csv")
psdf = sdf.pandas_api() # Spark 4.0 での推奨メソッド名
.to_pandas_on_spark()が非推奨になって.pandas_api()に変わったのも地味に面倒だった。コード量が多いとグローバル一括置換したくなるんだけど、テストが通るまで1日かかった。「一括置換できる変更」と「ちゃんと動作確認が要る変更」を混在させると精神的によくない。
データ品質管理2026年版の記事でも書いたように、型の不一致は下流に行くほど発見が遅れる。Sparkの移行と同時にスキーマバリデーションを厳密化したのは、結果的にかなり良い判断だった。
本番チューニングで実際に効いた設定
半年間の試行錯誤で、うちの構成(EMR 7.x + Spark 4.0 + Delta Lake 3.x)で効果があった設定をまとめる。全部の環境に当てはまるわけじゃないけど、スタート地点として参考にしてほしい。
# spark-defaults.conf または SparkSession builder に渡す設定
conf = {
# AQE 関連
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.sql.adaptive.coalescePartitions.minPartitionNum": "50",
"spark.sql.adaptive.advisoryPartitionSizeInBytes": "134217728",
"spark.sql.adaptive.skewJoin.enabled": "true",
"spark.sql.adaptive.skewJoin.skewedPartitionFactor": "5",
# メモリ管理
"spark.memory.fraction": "0.8",
"spark.memory.storageFraction": "0.3",
"spark.executor.memoryOverheadFactor": "0.1",
# シリアライズ(Arrow強化)
"spark.sql.execution.arrow.pyspark.enabled": "true",
"spark.sql.execution.arrow.pyspark.fallback.enabled": "true",
# Delta Lake 最適化
"spark.databricks.delta.optimizeWrite.enabled": "true",
"spark.databricks.delta.autoCompact.enabled": "true",
# ANSI SQL(既存コードがある場合は一時的に無効化して段階移行)
# 最終的には true にすべきだが移行期は false で逃げた
"spark.sql.ansi.enabled": "false", # 移行完了後は true に変更
# Dynamic Partition Pruning
"spark.sql.optimizer.dynamicPartitionPruning.enabled": "true",
# クエリプランキャッシュ
"spark.sql.queryExecutionListeners": "",
}
builder = SparkSession.builder.appName("production-pipeline")
for k, v in conf.items():
builder = builder.config(k, v)
spark = builder.getOrCreate()
spark.sql.ansi.enabledをfalseで逃げているのは個人的にイケてないと思っているんだけど、既存SQLを全部ANSIに対応させるのは工数がかかりすぎて、段階的に直しているところだ。新規で書くSQLは全部ANSI準拠で書くようにルール化しているので、時間が経てば自然に比率は下がっていくはず。
パフォーマンスの変化をグラフで見ると、移行後はこんな感じになった。
xychart-beta
title "日次バッチ処理時間の推移(週次平均、単位:分)"
x-axis ["2025-09", "2025-10", "2025-11", "2025-12", "2026-01", "2026-02", "2026-03"]
y-axis "処理時間(分)" 0 --> 120
line [88, 105, 112, 98, 82, 71, 68]
11月が一番しんどかった。AQEの挙動が安定しなくてSLAをギリギリ守る状態が続いた。12月以降は設定を詰めていって、今は3.5時代よりむしろ速くなっている。移行前のベースラインが88分だったのに対して今は68分なので、2割ちょっと改善している計算だ。あの地獄の2か月を考えると、もっと早く安定させたかったという気持ちもあるけど。
分散バッチアーキテクチャ完全ガイドにアーキテクチャ全体の設計についてまとまった情報があるので、全体設計から考え直したい場合はそちらも参考にしてほしい。
移行を振り返って:やっておけばよかったこと
Spark 4.0移行プロジェクトを終えて思うのは、「事前の計測と段階移行の設計」をもっとちゃんとやるべきだったということだ。後から振り返ると「なんでこれを最初にやらなかったんだ」というものばかりで、当時の自分を殴りたくなる。
1. 実行計画の差分比較を自動化する
Spark 3.5と4.0で同じクエリのEXPLAIN出力を比較するスクリプトを書いたのが移行途中だった。最初から用意しておけば、どのクエリに影響が出るか事前に洗い出せた。
def compare_explain_plans(spark_old, spark_new, query: str) -> dict:
"""2つのSparkSessionで実行計画を比較する"""
plan_old = spark_old.sql(f"EXPLAIN EXTENDED {query}").collect()[0][0]
plan_new = spark_new.sql(f"EXPLAIN EXTENDED {query}").collect()[0][0]
return {
"query": query[:100],
"plan_old": plan_old,
"plan_new": plan_new,
"changed": plan_old != plan_new
}
2. ANSI SQLの非互換チェックを先にやる
Spark 4.0を一時的にspark.sql.ansi.enabled=trueで起動して、全クエリをドライランするバッチを組むべきだった。後から1本ずつ直すのは本当にしんどい。これが一番「最初にやっておけば」と後悔したポイントだ。
3. Python UDFのArrow対応を事前に確認する
Arrowシリアライズが強化されたことで、型変換が暗黙的に行われていたUDFが壊れることがある。型アノテーションをちゃんと書いていなかったUDFが何本かあって、それが原因で結果が静かにズレていた。
# NG: 型アノテーションが曖昧
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
@udf(returnType=StringType())
def format_id(x): # 引数の型が不明確
return f"ID-{x}"
# OK: Pandas UDF + 型アノテーションで Arrow 最適化
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(StringType())
def format_id_arrow(series: pd.Series) -> pd.Series:
return series.apply(lambda x: f"ID-{x}")
Pandas UDFはApache Arrowを使ってPython-JVM間のデータ転送を効率化するので、通常のPython UDFより圧倒的に速い。Spark 3.xのときから使える機能だったんだけど、4.0移行を機にチーム全体で統一した。これは地味に便利で、単純な文字列加工UDFでも体感できるレベルで速くなるケースがある。
まとめ
半年間のSpark 4.0移行で学んだことを正直にまとめるとこうなる。
| やること | タイミング | 理由 |
|---|---|---|
| ANSI SQL非互換チェック(ドライラン) | 移行前・最優先 | 後から1本ずつ直す地獄を避ける |
sc._jvm依存コードのgrep洗い出し | 移行前 | Spark Connect対応の工数を正確に見積もるため |
| 実行計画差分比較スクリプトの整備 | 移行前 | 影響クエリの事前特定 |
| AQEパラメータの調整 | 移行後・段階的に | 実データで動かしながら詰めるしかない |
| Python UDF → Pandas UDF化 | 移行と同時進行 | 性能改善がセットで取れる |
| スキーマの明示化(StructType) | Pandas on Spark利用箇所すべて | 型推論に頼ると下流でサイレントにズレる |
次のアクションとしては、今年中にspark.sql.ansi.enabled=trueでの完全移行を終わらせることと、Spark Connectベースの開発環境を整備してデータサイエンティストのローカル実行環境を改善したいと思っている。Spark Connectは開発体験の向上という意味でかなり期待しているので、そこはまた別記事で書く予定だ。
皆さんのチームでもSpark 3系→4系の移行を検討しているところはあると思う。どこで詰まってるか、どんな知見があるかコメントで教えてもらえると嬉しい。