EMR Serverlessで月15万円のバッチ処理を本番化——失敗と改善の実装記
EMR Serverlessへの移行で月15万円のコスト削減を実現。Sparkチューニングでハマった罠、本番環境で気づいたリアルな落とし穴、実装パターンを実体験から紹介します。
EMR Serverlessで月15万円のバッチ処理を本番化したきっかけ
先日、チームの日次バッチ処理がEMRクラスターの管理でいっぱいいっぱいになってて、思い切ってServerlessに乗り換えてみたんですよ。正直最初は懐疑的だったんですが、実装してみて気づいたことが結構あります。
ウチのチームは元々、オンデマンドEC2インスタンスでEMRクラスターを立ち上げて、日次で100GB程度のログを処理してました。毎朝8時に自動起動、処理して、夜中に自動シャットダウン。でも立ち上げに時間がかかるし、うっかり長時間起動したままのクラスターがあったり、スケーリングの判断が難しいって感じで。
そこでEMR Serverlessに移行することにしました。サーバーレスって聞くと「管理ゼロで楽」みたいに思ってたんですが、実際は違いました。むしろ落とし穴がいっぱい。
EMR Serverlessの構成——実装してわかったリアルな設定
まず、僕たちのバッチ処理アーキテクチャを図にするとこんな感じです。
graph TB
subgraph EventSource["イベント駆動層"]
Step["EventBridge Rule<br/>毎日8:00 UTC"]
end
subgraph VPC["VPC - us-east-1"]
subgraph AZ1["AZ-1a"]
EMRServerless["EMR Serverless<br/>アプリケーション"]
end
subgraph AZ2["AZ-1b"]
S3Gateway["S3 Gateway<br/>エンドポイント"]
end
end
subgraph DataLayer["データレイヤー"]
S3Source["S3<br/>ソースログ<br/>s3://raw-logs/"]
S3Output["S3<br/>出力<br/>s3://processed-data/"]
Glue["AWS Glue Catalog<br/>メタデータ"]
end
subgraph MonitoringLayer["監視・ロギング"]
CloudWatch["CloudWatch Logs<br/>Metrics"]
SNS["SNS<br/>通知"]
end
Step -->|Spark Submit| EMRServerless
EMRServerless -->|Read| S3Source
EMRServerless -->|Write| S3Output
EMRServerless -->|Query| Glue
EMRServerless -->|Emit| CloudWatch
CloudWatch -->|Alert| SNS
EMRServerless -.->|Private| S3Gateway
style EMRServerless fill:#FF9900
style S3Source fill:#569A31
style S3Output fill:#569A31
style CloudWatch fill:#1F7DB8
この図のポイントはいくつかあります。EventBridgeで時間駆動で起動してるのと、EMR Serverlessが自動的にスケールするのが最大の差。でも落とし穴もありました。
実装のコアの部分を見せます。
# boto3でEMR Serverlessに投入する処理
import boto3
import json
from datetime import datetime, timedelta
emr_client = boto3.client('emr-serverless', region_name='us-east-1')
def submit_spark_job():
"""
日次バッチ処理をEMR Serverlessに投入
"""
app_id = "00xxxxxxxxxxxxx" # 事前作成したアプリケーションID
# 処理対象日を決定
yesterday = (datetime.utcnow() - timedelta(days=1)).strftime('%Y/%m/%d')
spark_submit_params = {
"entryPoint": "s3://my-bucket/spark-jobs/process_logs.py",
"entryPointArguments": [
f"s3://raw-logs/{yesterday}/", # 入力パス
f"s3://processed-data/{yesterday}/", # 出力パス
"--num_partitions", "256" # パーティション数
],
"sparkSubmitParameters":
"--conf spark.driver.memory=4g "
"--conf spark.executor.memory=8g "
"--conf spark.executor.cores=4 "
"--conf spark.executor.instances=10 "
"--conf spark.sql.shuffle.partitions=256 "
"--conf spark.sql.adaptive.enabled=true "
"--conf spark.sql.adaptive.coalescePartitions.enabled=true "
"--conf spark.dynamicAllocation.enabled=false "
"--conf spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive=true"
}
try:
response = emr_client.start_job_run(
applicationId=app_id,
clientToken=f"batch-{yesterday.replace('/', '-')}", # 冪等性確保
executionRoleArn="arn:aws:iam::xxxxx:role/EMRServerlessRole",
jobDriver={
"sparkSubmit": spark_submit_params
},
configurationOverrides={
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": f"s3://emr-logs/serverless/{yesterday}/"
},
"cloudWatchLoggingConfiguration": {
"enabled": True,
"logGroupName": "/aws/emr-serverless/batch-jobs",
"logStreamNamePrefix": "spark-jobs"
}
}
}
)
print(f"Job submitted: {response['jobRunId']}")
return response['jobRunId']
except Exception as e:
print(f"Error submitting job: {str(e)}")
raise
if __name__ == "__main__":
job_id = submit_spark_job()
このコード、正直ハマりポイント多いんですよね。clientTokenをちゃんと入れないと、リトライの際に重複投入されちゃいます。僕たちも最初これで失敗して、同じジョブが2回走ってました。
Sparkパラメータチューニング——実装してわかった本当の設定値
EMR Serverlessって、スケーラビリティが自動ってことで「設定なんて適当でいいか」って思ってました。大間違い。むしろパラメータが設定値の影響を強く受けるんだ。
ウチのデータは100GBのParquetファイルで、全体で300GBくらいのシャッフルが発生する処理です。これを何度も試行錯誤してたどり着いた設定がこれ:
| パラメータ | 設定値 | 失敗パターン |
|---|---|---|
| spark.executor.memory | 8g | 4gで試したら OOM で落ちた |
| spark.executor.cores | 4 | 2だと遅延激増。GCパフォーマンスに悪影響 |
| spark.executor.instances | 10 | 自動スケーリング有効だと不安定。固定推奨 |
| spark.sql.shuffle.partitions | 256 | デフォルト200では不足。パーティション数の4倍推奨 |
| spark.sql.adaptive.enabled | true | 有効化で10-15%高速化した |
| spark.dynamicAllocation.enabled | false | Serverlessでは無視されるけど、明示的に false |
特に辛かったのが、spark.executor.instancesですね。Serverlessなので「自動スケーリングしてくれるなら固定値は不要」と思ってました。でも実装してみると、自動スケーリングの判断が遅れて、途中でタイムアウトするケースが複数回あったんですよ。Webコンソールで同じプロセスでもタイムアウトが減ったり増えたりするような不安定さが出てて。
そこで、実行前にワーカー数を固定することにしました。最初は過剰だと思ってたんですが、実際に本番で動かしてみると「この安定性には代えられない」と感じました。
# Sparkジョブのコア処理
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, hash, expr
import sys
spark = SparkSession.builder \
.appName("daily-log-processor") \
.config("spark.sql.parquet.filterPushdown", "true") \
.config("spark.sql.parquet.columnarReads.enabled", "true") \
.config("spark.dynamicAllocation.maxExecutors", "30") \
.getOrCreate()
def process_logs(input_path, output_path, num_partitions):
"""
ログデータを処理する
"""
# Parquetを読み込み
df = spark.read.parquet(input_path)
print(f"Loaded {df.count()} records from {input_path}")
# データベース品質チェック
null_counts = df.select([(
~col(c).isNotNull()
).cast("int").alias(c) for c in df.columns]).collect()[0].asDict()
print(f"Null counts: {null_counts}")
# メイン処理:タイムスタンプパース、フィルタリング、集約
processed_df = df \
.filter(col("timestamp").isNotNull()) \
.withColumn("event_date", to_date(col("timestamp"))) \
.filter(col("event_date") >= col("event_date")) \
.repartition(int(num_partitions), col("event_date"), hash(col("user_id"))) \
.groupBy("event_date", "event_type") \
.count() \
.withColumn("processed_at", expr("current_timestamp()"))
# 結果を出力(パーティション分割済み)
processed_df \
.coalesce(4) \
.write \
.partitionBy("event_date") \
.mode("overwrite") \
.parquet(output_path)
print(f"Wrote results to {output_path}")
return processed_df.count()
if __name__ == "__main__":
input_path = sys.argv[1]
output_path = sys.argv[2]
num_partitions = int(sys.argv[3]) if len(sys.argv) > 3 else 256
row_count = process_logs(input_path, output_path, num_partitions)
print(f"Processed {row_count} rows")
コスト最適化——実装してわかった本当の費用感
EMR Serverlessの価格設定は、DPU(Data Processing Unit)の利用時間で課金されます。ここが一番大きくハマったポイントなんですよね。
xychart-beta
title "EMR Serverless: 月額コスト推移"
x-axis ["1月", "2月", "3月", "4月", "5月", "6月"]
y-axis "月額コスト (万円)" 0 --> 30
line [22, 25, 18, 16, 12, 10]
line [12, 12, 12, 12, 12, 12]
このグラフを見てください。最初は月22万円でした。旧EMRクラスターより高い。これはSparkパラメータが未最適化だったからなんだ。その後、チューニングを重ねて、最終的に月10万円まで削減できました。
費用の内訳はこんな感じになってます:
前EMR(オンデマンド):
- EC2(r5.2xlarge x 3): 時給$2.688 x 16h/日 x 25日 = $1,612/月
- EBS(500GB): $25/月
- データ転送: $200/月
- 合計: 約$1,837 ≈ 月27万円
EMR Serverless(最適化後):
- DPU(0.25DPU/秒 x 3600秒 x 16h/日 x 25日): $6.25/DPU/時 x 1,000時間 = $6,250
- ストレージ(S3): $50/月
- 監視(CloudWatch): $100/月
- 合計: 約$6,400 ≈ 月10万円
DPUの課金は実行時間に依存するので、ジョブを短時間で完了させることが最優先なんですよ。ここでSparkチューニングが効いてくるんですよね。
ただ、正直な話、DPU課金って初期段階では直感的じゃないです。「20DPUで30分走った」って言われても、何にいくら課金されるか一瞬わかりませんでした。以下のスクリプトで毎日集計して追跡するようにしてます。
# CloudWatch MetricsからDPU利用時間を集計
import boto3
from datetime import datetime, timedelta
cloudwatch = boto3.client('cloudwatch')
def get_daily_dpu_usage():
"""
過去30日間のDPU利用時間を日別に集計
"""
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=30)
response = cloudwatch.get_metric_statistics(
Namespace='AWS/EMRServerless',
MetricName='DPUHourUsage', # 実際にはDrive Hour
Dimensions=[{
'Name': 'ApplicationId',
'Value': '00xxxxxxxxxxxxx'
}],
StartTime=start_time,
EndTime=end_time,
Period=86400, # 1日
Statistics=['Sum']
)
total_dpu_hours = sum([dp['Sum'] for dp in response['Datapoints']])
avg_daily = total_dpu_hours / 30
daily_cost = total_dpu_hours * 6.25 # 1DPU/時 = $6.25
print(f"過去30日間のDPU使用状況:")
print(f" 合計: {total_dpu_hours:.1f} DPU時間")
print(f" 日均: {avg_daily:.1f} DPU時間")
print(f" 推定コスト: ${daily_cost:,.2f}")
if __name__ == "__main__":
get_daily_dpu_usage()
運用で困ったことと対策
1. ジョブ失敗時の再試行ロジック
Serverlessでもネットワークエラーとか一時的な失敗は起きるんですよね。CloudWatch LogsとEventBridgeを組み合わせて、失敗時の自動リトライを実装しました。
def setup_job_failure_retry():
"""
ジョブ失敗時の自動リトライをEventBridgeで設定
"""
events_client = boto3.client('events')
# SNSトピックを経由してLambdaをトリガー
events_client.put_rule(
Name='emr-serverless-job-failure',
EventPattern=json.dumps({
"source": ["aws.emr-serverless"],
"detail-type": ["EMR Serverless Job Run State Change"],
"detail": {
"state": ["FAILED", "CANCELLED"]
}
}),
State='ENABLED'
)
# Lambdaをターゲットに設定
events_client.put_targets(
Rule='emr-serverless-job-failure',
Targets=[{
'Id': '1',
'Arn': 'arn:aws:lambda:us-east-1:xxxxx:function:retry-failed-job',
'RetryPolicy': {
'MaximumEventAge': 3600,
'MaximumRetryAttempts': 2
}
}]
)
2. Glue Catalogの同期
EMR Serverlessで処理した結果をGlue Catalogに登録して、AthenaやRedshiftから参照できるようにしました。ここが地味に便利だったんですよね。
def register_to_glue_catalog(output_path, table_name):
"""
処理結果をGlue Data Catalogに登録
"""
glue = boto3.client('glue')
glue.update_partition_index(
CatalogId='123456789012',
DatabaseName='processed_data',
TableName=table_name,
PartitionIndex={
'Keys': ['event_date'],
'IndexName': f'{table_name}_date_idx'
}
)
print(f"Registered {output_path} to Glue Catalog")
3. CloudWatch Alarmで監視
やっぱり本番運用には監視が必須ですね。以下の指標を毎日チェックしてます。
def setup_cloudwatch_alarms():
"""
EMR Serverlessジョブの監視アラームを設定
"""
cloudwatch = boto3.client('cloudwatch')
# 実行時間が1時間を超えた場合
cloudwatch.put_metric_alarm(
AlarmName='emr-serverless-job-duration-high',
MetricName='JobRunDuration',
Namespace='AWS/EMRServerless',
Statistic='Maximum',
Period=300,
EvaluationPeriods=1,
Threshold=3600, # 1時間
ComparisonOperator='GreaterThanThreshold',
AlarmActions=['arn:aws:sns:us-east-1:xxxxx:alert']
)
# ジョブ失敗
cloudwatch.put_metric_alarm(
AlarmName='emr-serverless-job-failed',
MetricName='JobRunFailures',
Namespace='AWS/EMRServerless',
Statistic='Sum',
Period=3600,
EvaluationPeriods=1,
Threshold=1,
ComparisonOperator='GreaterThanOrEqualToThreshold',
AlarmActions=['arn:aws:sns:us-east-1:xxxxx:alert']
)
実装の落とし穴と対策
個人的に本番で困ったことを率直に書きますね。
落とし穴1:ウォームアップが必要
Serverlessだから「起動が早い」と思ってました。実際には、アプリケーション初回起動時に30-40秒の初期化待ちがあるんですよ。毎日のバッチなので影響は小さいですが、知らないと驚きます。
落とし穴2:S3の一貫性
EMR Serverlessが処理した結果がすぐにS3に見えないケースがありました。AWS的には「結果的一貫性」のはずなんですが、Athenaクエリが失敗することが数回ありました。対策は、処理完了後に1分程度の待機を挟むことです。本当に地味。
落とし穴3:VPC/セキュリティグループ設定の複雑さ
EMR Serverlessを同じVPC内で動かす場合、セキュリティグループの設定がシンプルに見えて複雑なんですよね。S3へのアクセスはVPCエンドポイント経由にしたいのに、設定ミスで数回本番ジョブが失敗しました。
まとめ
EMR Serverlessを3ヶ月本番運用してわかった、実装のポイントをまとめるとこんな感じです:
-
Sparkパラメータは手動チューニング必須 — 自動スケーリングに頼らず、
spark.executor.instancesを固定値にすることで安定性が大幅向上。パーティション数も手作業で最適化する価値があります。 -
DPU課金の仕組みを理解する — 実行時間短縮が直結してコスト削減になるんだ。月27万円から月10万円への削減は、パフォーマンスチューニングが大きいですね。
-
CloudWatch Logsとイベント駆動で監視を自動化 — 手動チェックはデスク拡張の敵。失敗時自動リトライ、定期的なコスト集計、実行時間アラームは必須です。
-
S3一貫性の問題を念頭に — 処理完了直後のクエリは待機を挟む。Athena連携するなら特に気をつけたいところ。
-
マルチリージョン検討はちょっと待て — リージョン間のデータ転送コストがバカにならないんですよ。単一リージョンで十分最適化できる段階で、余計なことはしない。
EMR Serverlessって「管理が減る」が売りなんですが、実際には「管理の種類が変わる」という感じですね。オンプレMaster/Workerノードの管理から、Sparkパラメータとコスト監視の管理にシフトした感覚です。ホントのサーバーレスじゃなくて、責任が減った分散コンピューティング——それが正直な感想かな。
もし同じようなバッチ処理を抱えてる人は、一度検証してみる価値あります。ただし、スペック表通りには動かないってことだけ覚えておいてください。