Spark 3.5→4.0移行で本当にハマった話|AQEが勝手に最適化してくる罠

Spark3.5から4.0への移行で半年苦労した実体験。ネットに書いてないAQEの落とし穴、パーティション自動マージの問題、本番チューニングの現実を本音で解説します。

Spark 3.5から4.0へ、なぜこんなに面倒なのか

先日プロジェクトで、データレイクのSparkバージョンを3.5から4.0に上げることになった。「そろそろ新しいバージョンでも試してみるか」くらいの軽い気持ちで始めたんだけど、本当に地獄だった。半年かけて検証・調整してようやく本番に乗せたけど、その過程で学んだ「ネットに書いてない落とし穴」が山ほどある。

正直なところ、ドキュメントだけ読んでたら絶対に引っかかる。特にAQE(Adaptive Query Execution)周りの振る舞いが3.5と大きく変わってて、既存のチューニング知識がそのまま通用しなかった。この記事は、自分たちが実装した時の失敗と成功パターンを共有するスタンスで書いてます。

Spark 4.0の主要な変更点——何が本当に変わったのか

Spark 4.0のリリースノートをザッと見ると、いろいろ書いてあるんだけど、実装レベルで本当に重要な変更は意外と少ない。うちのチームが「これが本当に影響した」と感じたのは以下の3つです。

項目Spark 3.5Spark 4.0影響度
AQEデフォルト動作基本的な最適化のみパーティション自動マージが積極的
Dynamic Partition Pruning基本的なプルーニング精度向上・副作用増加
PythonシリアライゼーションCloudPickle低版CloudPickle高版対応

それぞれ掘り下げます。

AQE(Adaptive Query Execution)のデフォルト動作の変化

Spark 3.5ではAQEはデフォルト有効だったけど、4.0ではより積極的に動作するようになった。特にspark.sql.adaptive.coalescePartitions.enabledの扱いが変わって、デフォルトで自動的にパーティションをマージしようとする。これ自体は良い最適化なんだけど、既存のジョブが「パーティション数は固定」という前提で動いてると、突然パフォーマンスが崩れるんですよね。

DynamicPartitionPruningの精度向上

パーティションプルーニングがより正確になった。これ自体は素晴らしいんだけど、既存のクエリで「実はプルーニングできてなかった部分」が急に最適化されちゃって、予想と違うパフォーマンス特性を示すことがある。地味に厄介です。

PythonスクリプトのシリアライゼーションとArrowの扱い

PySpark使ってる人には特に重要。CloudPickleのバージョンが上がって、既存のカスタムUDFが動かなくなるケースがある。複雑なオブジェクトをキャプチャしてるUDFは特に注意が必要だ。

本番環境で起きた3つの地獄

地獄1: パーティション自動マージでメモリ爆発

うちのジョブは毎日数TB規模のデータを処理してて、パーティション数は1000個以上。それをspark.sql.shuffle.partitionsで調整しながら運用してた。3.5では問題なかったのに、4.0にアップグレードしたら、いきなり”Executor lost”のエラーが多発した。

最初は「メモリが足りなくなったのかな」と思ってexecutor memoryを増やしたんだけど、焼け石に水。ログを詳しく見てみると、なんと自動的にパーティション数が300個に圧縮されてて、1つのパーティションあたりのデータサイズが5倍になってた。

AQEのcoalescePartitionsが勝手に動いてたんだ。正直これは盲点だった。

# 原因:こいつがデフォルトで有効に
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)

# または、より細かく制御する場合
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", 512)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", False)  # skew joinも注意

これを明示的にオフにしたら、パフォーマンスは元に戻った。ここで学んだのは、AQEって「自動的に最適化してくれる」と思ってると痛い目を見るということ。チームで「本当に必要な最適化は何か」を議論する必要があったんですよね。

地獄2: Dynamic Partition Pruningの副作用

複数テーブルのJOINを使ったクエリで、4.0に移行したら実行時間が2倍になった。クエリのLOGICAL PLANを見てもSQLは変わってないのに、フィジカルプランが全く違う。

DynamicPartitionPruningが過度に効きすぎて、本来ブロードキャストすべき小さいテーブルまでプルーニング対象にされてた。結果として、ネットワークトラフィックが増えて全体が遅くなってた。悪い意味での最適化ですね。

# ExplainコマンドでPhysical Planを確認
df.explain(mode="formatted")

# 必要に応じてDPPを明示的に制御
spark.conf.set("spark.sql.exchange.reuse.enabled", True)
spark.conf.set("spark.sql.dynamicPartitionPruning.reuseBroadcastOnly", False)  # これが鍵

実装パターンとしては、「小さいテーブルは必ずBROADCAST JOINにする」という明示的な指定を増やすことで対応した。自動に頼らず、人間が制御する方がずっと安全だと気づかされました。

from pyspark.sql.functions import broadcast

result = large_df.join(broadcast(small_df), "key")

地獄3: PySpark + カスタムUDFの地獄

Pythonで書いたカスタムUDFが3.5では動いてたのに、4.0ではシリアライゼーションエラーで落ちるようになった。CloudPickleのバージョンが上がったせいで、ローカル変数をキャプチャしてるUDFが動かなくなってた。こんなことまで対応する羽目になるとは思ってなかった。

# こういうコード(3.5では動いた)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

config_value = "some_config"

@udf(StringType())
def my_udf(x):
    return x + config_value  # ← この外側の変数参照が問題に

df.withColumn("result", my_udf(df.col))

4.0では、このパターンが”PicklingError”で落ちることがある。対応策は2つあって、個人的には方法2の方をおすすめします。

# 方法1: broadcast変数を使う(いけど複雑)
config_broadcast = spark.broadcast({"value": "some_config"})

@udf(StringType())
def my_udf(x):
    return x + config_broadcast.value["value"]

# 方法2: SQL関数に移行する(最もおすすめ)
from pyspark.sql.functions import lit, concat

config_value = "some_config"
df.withColumn("result", concat(df.col, lit(config_value)))

本音を言うと、PySpark + カスタムUDFの組み合わせは3.5でも3.4でも微妙な互換性問題が多い。4.0に移行するなら、SQL関数かPandas UDFに移行することを強く推奨する。これマジで。

実装の工夫——本番で効いた3つの対策

対策1: Catalyst Optimizer をより理解する

4.0では、Catalystオプティマイザーがさらに複雑になった。同じクエリでも、統計情報が正確だと全く違う実行計画になる。ここを甘く見るとハマるんですよね。

# テーブルの統計情報を明示的に更新
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS col1, col2")

# 統計情報を確認
spark.sql("DESC FORMATTED my_table").show()

うちのチームは、本番デプロイ前に必ず統計情報を更新するようにした。これだけで、AQEの動作がずっと予測可能になった。地味だけど効果抜群です。

対策2: Query Executionプランの可視化

Explainコマンドを使って、実際のフィジカルプランを確認することを習慣化した。これはもう必須作業だと言っていい。

# formattedで詳細表示(わりと見やすい)
df.explain(mode="formatted")

# extended modeで最も詳細(パーティション数とかも見える)
df.explain(mode="extended")

特に本番デプロイ前に、Explainの結果をメトリクスと合わせて記録しておくと、後で問題が起きた時に原因特定が格段に楽になる。個人的には、Explainはコードレビュー時に必須だと考えるようになりました。

対策3: Configurationの明示的な設定

デフォルト値に頼らず、重要な設定は全て明示的に指定する方針に変えた。これがスタンダードになるべきだと思うくらい重要です。

spark.conf.set("spark.sql.adaptive.enabled", True)  # 有効化は必須
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)  # 環境に合わせて制御
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)  # Skew join最適化
spark.conf.set("spark.sql.shuffle.partitions", 512)  # 明示的にパーティション数指定
spark.conf.set("spark.sql.broadcastTimeout", 900)  # Broadcast timeout
spark.conf.set("spark.sql.files.maxRecordsPerFile", 0)  # ファイル分割なし
spark.conf.set("spark.memory.fraction", 0.6)  # ExecutorMemoryの配分

これらを環境ごとにプロファイル化して、本番環境で一貫性を保つようにした。マジックナンバーを避けることが大事です。

実運用で見えた2026年時点でのSpark 4.0の現実

今年(2026年)現在、Spark 4.0は十分安定して本番環境で使える。ただし、3.5からの移行には細かい調整が必須なんですよ。特にAQEの扱いを理解していないと、知らずにパフォーマンスが退化することがある。マジで要注意です。

うちのチームの場合、移行に半年かかったけど、その期間にやったことは以下の通り。

  • 段階的な移行:まず検証環境で3ヶ月、その後本番の一部ジョブで2ヶ月、最後に全体移行
  • 詳細なベンチマーク:毎月、主要なジョブの実行時間とメモリ使用量を追跡
  • ドキュメント化:チーム向けのSpark 4.0ガイドを作成(AQE周りは特に詳しく)

これをやることで、新しいバージョンのメリット(より高速な実行計画、改善されたスケジューリング)を活かしつつ、既存ジョブの安定性を保つことができた。

ちなみに、最新のSpark情報についてはこちらの記事も参考になる:Apache Spark 2026年最新動向|大規模データ処理の最適化戦略

現場で学んだ、チーム運用の工夫

移行プロジェクトで学んだのは、技術的な問題よりも「チーム内の共有」が重要だということ。Sparkはダイナミックな最適化が増えると、個人の経験だけじゃ対応できない。

うちで実装したのは以下の3つのアプローチです。

週1回の「Spark Performance Review」

メインのジョブ5個に絞って、毎週Explainプランと実行時間をレビュー。異常値があったら即座に調査。これを3ヶ月続けたら、チーム全体のSparkリテラシーが一段階上がった。正直、この習慣が一番効いたと思う。

Configurationの中央管理

JSONで本番用の推奨Configを定義して、Git管理。新しいジョブはそのテンプレートから始める。こうすることで、設定の漂流を防ぐことができました。

{
  "production": {
    "spark.sql.adaptive.enabled": true,
    "spark.sql.adaptive.coalescePartitions.enabled": false,
    "spark.sql.shuffle.partitions": 512,
    "spark.executor.memory": "8g",
    "spark.executor.cores": 4
  },
  "testing": {
    "spark.sql.adaptive.enabled": true,
    "spark.sql.shuffle.partitions": 64
  }
}

まとめ

Spark 3.5から4.0への移行は、単なるバージョンアップじゃなく、データエンジニアリングの実装パターンの見直しが必要な変化だった。特に重要なのが以下の3点です。

AQE設定の理解が必須 自動最適化に頼らず、本番環境に合わせた明示的な設定が必須。coalescePartitionsskewJoinの挙動を理解してから移行すること。ここを甘く見ると本当に痛い目を見ます。

統計情報の管理 4.0ではCatalystオプティマイザーがテーブル統計に依存度が高い。本番デプロイ前に必ずANALYZEコマンドを実行。これだけで予測可能性がグンと上がる。

段階的な移行とベンチマーク 一気に全体を移行するのではなく、検証環境→本番の一部→全体という段階を踏む。各段階でExplain結果と実行時間を記録することが大事だ。

正直、移行は手間がかかる。でも新しいバージョンのメリット(より高速な実行計画、改善されたエラーハンドリング)は本物だから、時間をかけて検証する価値がある。もし今3.5を使ってるなら、焦らず徐々に移行することをおすすめする。焦りは禁物です。

次のステップとしては、バッチ処理設計|スケーラブルなシステム構築ガイドでバッチ処理全体の設計パターンを確認してから、Spark 4.0の導入を検討するのがいいと思う。

U

Untanbaby

ソフトウェアエンジニア|AWS / クラウドアーキテクチャ / DevOps

10年以上のIT実務経験をもとに、現場で使える技術情報を発信しています。 記事の誤りや改善点があればお問い合わせからお気軽にご連絡ください。

関連記事