Spark 3.5→4.0移行で本当にハマった話|AQEが勝手に最適化してくる罠
Spark3.5から4.0への移行で半年苦労した実体験。ネットに書いてないAQEの落とし穴、パーティション自動マージの問題、本番チューニングの現実を本音で解説します。
Spark 3.5から4.0へ、なぜこんなに面倒なのか
先日プロジェクトで、データレイクのSparkバージョンを3.5から4.0に上げることになった。「そろそろ新しいバージョンでも試してみるか」くらいの軽い気持ちで始めたんだけど、本当に地獄だった。半年かけて検証・調整してようやく本番に乗せたけど、その過程で学んだ「ネットに書いてない落とし穴」が山ほどある。
正直なところ、ドキュメントだけ読んでたら絶対に引っかかる。特にAQE(Adaptive Query Execution)周りの振る舞いが3.5と大きく変わってて、既存のチューニング知識がそのまま通用しなかった。この記事は、自分たちが実装した時の失敗と成功パターンを共有するスタンスで書いてます。
Spark 4.0の主要な変更点——何が本当に変わったのか
Spark 4.0のリリースノートをザッと見ると、いろいろ書いてあるんだけど、実装レベルで本当に重要な変更は意外と少ない。うちのチームが「これが本当に影響した」と感じたのは以下の3つです。
| 項目 | Spark 3.5 | Spark 4.0 | 影響度 |
|---|---|---|---|
| AQEデフォルト動作 | 基本的な最適化のみ | パーティション自動マージが積極的 | 高 |
| Dynamic Partition Pruning | 基本的なプルーニング | 精度向上・副作用増加 | 高 |
| Pythonシリアライゼーション | CloudPickle低版 | CloudPickle高版対応 | 中 |
それぞれ掘り下げます。
AQE(Adaptive Query Execution)のデフォルト動作の変化
Spark 3.5ではAQEはデフォルト有効だったけど、4.0ではより積極的に動作するようになった。特にspark.sql.adaptive.coalescePartitions.enabledの扱いが変わって、デフォルトで自動的にパーティションをマージしようとする。これ自体は良い最適化なんだけど、既存のジョブが「パーティション数は固定」という前提で動いてると、突然パフォーマンスが崩れるんですよね。
DynamicPartitionPruningの精度向上
パーティションプルーニングがより正確になった。これ自体は素晴らしいんだけど、既存のクエリで「実はプルーニングできてなかった部分」が急に最適化されちゃって、予想と違うパフォーマンス特性を示すことがある。地味に厄介です。
PythonスクリプトのシリアライゼーションとArrowの扱い
PySpark使ってる人には特に重要。CloudPickleのバージョンが上がって、既存のカスタムUDFが動かなくなるケースがある。複雑なオブジェクトをキャプチャしてるUDFは特に注意が必要だ。
本番環境で起きた3つの地獄
地獄1: パーティション自動マージでメモリ爆発
うちのジョブは毎日数TB規模のデータを処理してて、パーティション数は1000個以上。それをspark.sql.shuffle.partitionsで調整しながら運用してた。3.5では問題なかったのに、4.0にアップグレードしたら、いきなり”Executor lost”のエラーが多発した。
最初は「メモリが足りなくなったのかな」と思ってexecutor memoryを増やしたんだけど、焼け石に水。ログを詳しく見てみると、なんと自動的にパーティション数が300個に圧縮されてて、1つのパーティションあたりのデータサイズが5倍になってた。
AQEのcoalescePartitionsが勝手に動いてたんだ。正直これは盲点だった。
# 原因:こいつがデフォルトで有効に
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
# または、より細かく制御する場合
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", 512)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", False) # skew joinも注意
これを明示的にオフにしたら、パフォーマンスは元に戻った。ここで学んだのは、AQEって「自動的に最適化してくれる」と思ってると痛い目を見るということ。チームで「本当に必要な最適化は何か」を議論する必要があったんですよね。
地獄2: Dynamic Partition Pruningの副作用
複数テーブルのJOINを使ったクエリで、4.0に移行したら実行時間が2倍になった。クエリのLOGICAL PLANを見てもSQLは変わってないのに、フィジカルプランが全く違う。
DynamicPartitionPruningが過度に効きすぎて、本来ブロードキャストすべき小さいテーブルまでプルーニング対象にされてた。結果として、ネットワークトラフィックが増えて全体が遅くなってた。悪い意味での最適化ですね。
# ExplainコマンドでPhysical Planを確認
df.explain(mode="formatted")
# 必要に応じてDPPを明示的に制御
spark.conf.set("spark.sql.exchange.reuse.enabled", True)
spark.conf.set("spark.sql.dynamicPartitionPruning.reuseBroadcastOnly", False) # これが鍵
実装パターンとしては、「小さいテーブルは必ずBROADCAST JOINにする」という明示的な指定を増やすことで対応した。自動に頼らず、人間が制御する方がずっと安全だと気づかされました。
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
地獄3: PySpark + カスタムUDFの地獄
Pythonで書いたカスタムUDFが3.5では動いてたのに、4.0ではシリアライゼーションエラーで落ちるようになった。CloudPickleのバージョンが上がったせいで、ローカル変数をキャプチャしてるUDFが動かなくなってた。こんなことまで対応する羽目になるとは思ってなかった。
# こういうコード(3.5では動いた)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
config_value = "some_config"
@udf(StringType())
def my_udf(x):
return x + config_value # ← この外側の変数参照が問題に
df.withColumn("result", my_udf(df.col))
4.0では、このパターンが”PicklingError”で落ちることがある。対応策は2つあって、個人的には方法2の方をおすすめします。
# 方法1: broadcast変数を使う(いけど複雑)
config_broadcast = spark.broadcast({"value": "some_config"})
@udf(StringType())
def my_udf(x):
return x + config_broadcast.value["value"]
# 方法2: SQL関数に移行する(最もおすすめ)
from pyspark.sql.functions import lit, concat
config_value = "some_config"
df.withColumn("result", concat(df.col, lit(config_value)))
本音を言うと、PySpark + カスタムUDFの組み合わせは3.5でも3.4でも微妙な互換性問題が多い。4.0に移行するなら、SQL関数かPandas UDFに移行することを強く推奨する。これマジで。
実装の工夫——本番で効いた3つの対策
対策1: Catalyst Optimizer をより理解する
4.0では、Catalystオプティマイザーがさらに複雑になった。同じクエリでも、統計情報が正確だと全く違う実行計画になる。ここを甘く見るとハマるんですよね。
# テーブルの統計情報を明示的に更新
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS col1, col2")
# 統計情報を確認
spark.sql("DESC FORMATTED my_table").show()
うちのチームは、本番デプロイ前に必ず統計情報を更新するようにした。これだけで、AQEの動作がずっと予測可能になった。地味だけど効果抜群です。
対策2: Query Executionプランの可視化
Explainコマンドを使って、実際のフィジカルプランを確認することを習慣化した。これはもう必須作業だと言っていい。
# formattedで詳細表示(わりと見やすい)
df.explain(mode="formatted")
# extended modeで最も詳細(パーティション数とかも見える)
df.explain(mode="extended")
特に本番デプロイ前に、Explainの結果をメトリクスと合わせて記録しておくと、後で問題が起きた時に原因特定が格段に楽になる。個人的には、Explainはコードレビュー時に必須だと考えるようになりました。
対策3: Configurationの明示的な設定
デフォルト値に頼らず、重要な設定は全て明示的に指定する方針に変えた。これがスタンダードになるべきだと思うくらい重要です。
spark.conf.set("spark.sql.adaptive.enabled", True) # 有効化は必須
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False) # 環境に合わせて制御
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True) # Skew join最適化
spark.conf.set("spark.sql.shuffle.partitions", 512) # 明示的にパーティション数指定
spark.conf.set("spark.sql.broadcastTimeout", 900) # Broadcast timeout
spark.conf.set("spark.sql.files.maxRecordsPerFile", 0) # ファイル分割なし
spark.conf.set("spark.memory.fraction", 0.6) # ExecutorMemoryの配分
これらを環境ごとにプロファイル化して、本番環境で一貫性を保つようにした。マジックナンバーを避けることが大事です。
実運用で見えた2026年時点でのSpark 4.0の現実
今年(2026年)現在、Spark 4.0は十分安定して本番環境で使える。ただし、3.5からの移行には細かい調整が必須なんですよ。特にAQEの扱いを理解していないと、知らずにパフォーマンスが退化することがある。マジで要注意です。
うちのチームの場合、移行に半年かかったけど、その期間にやったことは以下の通り。
- 段階的な移行:まず検証環境で3ヶ月、その後本番の一部ジョブで2ヶ月、最後に全体移行
- 詳細なベンチマーク:毎月、主要なジョブの実行時間とメモリ使用量を追跡
- ドキュメント化:チーム向けのSpark 4.0ガイドを作成(AQE周りは特に詳しく)
これをやることで、新しいバージョンのメリット(より高速な実行計画、改善されたスケジューリング)を活かしつつ、既存ジョブの安定性を保つことができた。
ちなみに、最新のSpark情報についてはこちらの記事も参考になる:Apache Spark 2026年最新動向|大規模データ処理の最適化戦略
現場で学んだ、チーム運用の工夫
移行プロジェクトで学んだのは、技術的な問題よりも「チーム内の共有」が重要だということ。Sparkはダイナミックな最適化が増えると、個人の経験だけじゃ対応できない。
うちで実装したのは以下の3つのアプローチです。
週1回の「Spark Performance Review」
メインのジョブ5個に絞って、毎週Explainプランと実行時間をレビュー。異常値があったら即座に調査。これを3ヶ月続けたら、チーム全体のSparkリテラシーが一段階上がった。正直、この習慣が一番効いたと思う。
Configurationの中央管理
JSONで本番用の推奨Configを定義して、Git管理。新しいジョブはそのテンプレートから始める。こうすることで、設定の漂流を防ぐことができました。
{
"production": {
"spark.sql.adaptive.enabled": true,
"spark.sql.adaptive.coalescePartitions.enabled": false,
"spark.sql.shuffle.partitions": 512,
"spark.executor.memory": "8g",
"spark.executor.cores": 4
},
"testing": {
"spark.sql.adaptive.enabled": true,
"spark.sql.shuffle.partitions": 64
}
}
まとめ
Spark 3.5から4.0への移行は、単なるバージョンアップじゃなく、データエンジニアリングの実装パターンの見直しが必要な変化だった。特に重要なのが以下の3点です。
AQE設定の理解が必須
自動最適化に頼らず、本番環境に合わせた明示的な設定が必須。coalescePartitionsやskewJoinの挙動を理解してから移行すること。ここを甘く見ると本当に痛い目を見ます。
統計情報の管理 4.0ではCatalystオプティマイザーがテーブル統計に依存度が高い。本番デプロイ前に必ずANALYZEコマンドを実行。これだけで予測可能性がグンと上がる。
段階的な移行とベンチマーク 一気に全体を移行するのではなく、検証環境→本番の一部→全体という段階を踏む。各段階でExplain結果と実行時間を記録することが大事だ。
正直、移行は手間がかかる。でも新しいバージョンのメリット(より高速な実行計画、改善されたエラーハンドリング)は本物だから、時間をかけて検証する価値がある。もし今3.5を使ってるなら、焦らず徐々に移行することをおすすめする。焦りは禁物です。
次のステップとしては、バッチ処理設計|スケーラブルなシステム構築ガイドでバッチ処理全体の設計パターンを確認してから、Spark 4.0の導入を検討するのがいいと思う。