Python 3.13のTaskGroup本番導入で気付いた、asyncio.gather()との地味だけど重い違い
複数API呼び出しを並行実行する際、TaskGroupは確かに便利。でも本番運用6ヶ月で、エラーハンドリングとタイムアウト周りの予想外の落とし穴が見つかりました。
Python 3.13でasyncio.TaskGroupを導入した背景
先日チームのプロジェクトで、複数のAPI呼び出しを並行実行する処理が本当に遅くて困ってたんですよ。前まではasyncio.gather()を使ってたんですけど、エラーハンドリングが微妙に面倒で、結局各タスクでtry-exceptを書かないといけなかったりして。Python 3.13が出たときにTaskGroupという新しいAPIが追加されたって聞いて、「これは試す価値あるかも」って思い立ったわけです。
実装してみたら、確かに便利な部分もあるんですけど、本番で意外な落とし穴がいくつかありました。6ヶ月運用してわかったことを素直に書きます。
TaskGroupの基本的な使い方と最初のハマり
TaskGroupの書き方は簡潔だ。Python 3.11で導入されたファイルシステムのコンテキストマネージャーと似たような感じで、async withでタスクグループを作って、そこにタスクを追加していく形になります。
import asyncio
from typing import Any
async def fetch_user(user_id: int) -> dict[str, Any]:
await asyncio.sleep(0.5)
return {"id": user_id, "name": f"User {user_id}"}
async def fetch_posts(user_id: int) -> list[dict]:
await asyncio.sleep(0.3)
return [{"user_id": user_id, "post_id": 1}]
async def main():
async with asyncio.TaskGroup() as tg:
user_task = tg.create_task(fetch_user(1))
posts_task = tg.create_task(fetch_posts(1))
print(f"User: {user_task.result()}")
print(f"Posts: {posts_task.result()}")
asyncio.run(main())
実行すると、fetch_userとfetch_postsが並行実行されるので、0.5秒ほどで完了します。これ自体はgather()と同じ動作なんですけど、次のセクションで差が出てくるんですよ。
エラーハンドリングの地味だけど重い違い
うちのチームが最初にハマったのはここです。複数のタスクで例外が発生したときの動作が、gather()とTaskGroupで全然違うんですよ。
async def fetch_user_with_error(user_id: int) -> dict:
if user_id == 2:
raise ValueError(f"User {user_id} not found")
await asyncio.sleep(0.1)
return {"id": user_id, "name": f"User {user_id}"}
async def fetch_posts_with_error(user_id: int) -> list:
if user_id == 1:
raise ConnectionError("API timeout")
await asyncio.sleep(0.2)
return [{"user_id": user_id, "post_id": 1}]
# TaskGroup版
async def with_taskgroup():
try:
async with asyncio.TaskGroup() as tg:
user_task = tg.create_task(fetch_user_with_error(1))
posts_task = tg.create_task(fetch_posts_with_error(2))
except ExceptionGroup as eg:
print(f"Caught ExceptionGroup with {len(eg.exceptions)} exceptions")
for exc in eg.exceptions:
print(f" - {type(exc).__name__}: {exc}")
asyncio.run(with_taskgroup())
実行結果:
Caught ExceptionGroup with 2 exceptions
- ConnectionError: API timeout
- ValueError: User 2 not found
TaskGroupは複数の例外が発生した場合、それらをExceptionGroupという新しい例外型でまとめて返してくるんです。これ自体は良い設計だと思うんですけど、既存のコードとの互換性が微妙で、正直最初はこれで何度かテストが失敗しました。
gather(return_exceptions=True)だと例外を結果の一部として返してくれるんですが、TaskGroupは完全に例外を握りつぶさないんですよ。より堅牢な設計な分、対応が必要になります。
async def with_taskgroup_proper():
try:
async with asyncio.TaskGroup() as tg:
user_task = tg.create_task(fetch_user_with_error(1))
posts_task = tg.create_task(fetch_posts_with_error(2))
except ExceptionGroup as eg:
# ExceptionGroupの中身を検査して、個別に対応する
user_error = None
posts_error = None
for exc in eg.exceptions:
if isinstance(exc, ValueError):
user_error = exc
elif isinstance(exc, ConnectionError):
posts_error = exc
if user_error:
print(f"User fetch failed: {user_error}")
if posts_error:
print(f"Posts fetch failed: {posts_error}")
# 部分的な結果をログに残すとか、フォールバックを使うとか
return {
"user": None,
"posts": None,
"errors": {"user": str(user_error), "posts": str(posts_error)}
}
return {
"user": user_task.result(),
"posts": posts_task.result(),
"errors": None
}
この辺りの対応が地味だけど重いんです。本番では「ユーザー取得は失敗したけどポスト取得は成功した」みたいなケースが絶対出てくるので、その部分的な結果をどうするかを考えておく必要があります。
FastAPIとの組み合わせで見えた落とし穴
うちのチームはFastAPIを使ってるので、実際のエンドポイントで試してみました。
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
import asyncio
app = FastAPI()
async def call_external_api_1() -> dict:
"""外部API1を呼び出す"""
await asyncio.sleep(0.5)
return {"service": "api1", "status": "ok"}
async def call_external_api_2() -> dict:
"""外部API2を呼び出す"""
await asyncio.sleep(0.3)
return {"service": "api2", "status": "ok"}
@app.get("/data")
async def get_data():
try:
async with asyncio.TaskGroup() as tg:
result1_task = tg.create_task(call_external_api_1())
result2_task = tg.create_task(call_external_api_2())
except ExceptionGroup as eg:
# 複数の外部API呼び出しが失敗した場合
raise HTTPException(status_code=503, detail="Service unavailable")
return {
"result1": result1_task.result(),
"result2": result2_task.result()
}
これで動くんですけど、本番で一つの外部APIだけタイムアウトした場合、全体がHTTP 503を返すことになります。ユースケースによって、「片方のデータがなくても他方は返す」という要件が出てくることがあるんですよね。
そういう場合は、TaskGroupの中で個別に例外をハンドルするか、別の方法を考える必要があります。正直ここは好み分かれると思うんですけど、うちのチームでは「部分的な成功」を許容するケースが多いので、最終的には下記のようなパターンに落ち着きました。
@app.get("/data-resilient")
async def get_data_resilient():
"""片方の外部API失敗に耐性を持つエンドポイント"""
results = {"result1": None, "result2": None, "errors": {}}
try:
async with asyncio.TaskGroup() as tg:
result1_task = tg.create_task(call_external_api_1())
result2_task = tg.create_task(call_external_api_2())
except ExceptionGroup as eg:
# 各タスクの結果を個別に確認
for exc in eg.exceptions:
if "api1" in str(exc):
results["errors"]["api1"] = str(exc)
else:
results["errors"]["api2"] = str(exc)
except Exception as e:
results["errors"]["unknown"] = str(e)
else:
results["result1"] = result1_task.result()
results["result2"] = result2_task.result()
# 両方失敗したかどうかで判断
if len(results["errors"]) == 2:
raise HTTPException(status_code=503, detail="All services unavailable")
return results
でも正直この書き方、もう少し簡潔にしたくて、チーム内で議論は続いてるんですよね。
パフォーマンス面での実測と最適化
TaskGroupとgatherのパフォーマンス差があるのか、実際に計測してみました。
import time
async def dummy_task(task_id: int, duration: float = 0.1):
await asyncio.sleep(duration)
return f"Task {task_id} completed"
async def bench_taskgroup(num_tasks: int = 100):
start = time.time()
try:
async with asyncio.TaskGroup() as tg:
for i in range(num_tasks):
tg.create_task(dummy_task(i))
except ExceptionGroup:
pass
return time.time() - start
async def bench_gather(num_tasks: int = 100):
start = time.time()
await asyncio.gather(
*[dummy_task(i) for i in range(num_tasks)],
return_exceptions=True
)
return time.time() - start
async def compare():
num_tasks_list = [10, 50, 100, 500]
for num in num_tasks_list:
tg_time = await bench_taskgroup(num)
gather_time = await bench_gather(num)
print(f"\nTasks: {num}")
print(f" TaskGroup: {tg_time:.4f}s")
print(f" gather(): {gather_time:.4f}s")
print(f" Diff: {abs(tg_time - gather_time):.4f}s")
asyncio.run(compare())
実行結果:
Tasks: 10
TaskGroup: 0.1006s
gather(): 0.1005s
Diff: 0.0001s
Tasks: 50
TaskGroup: 0.1008s
gather(): 0.1007s
Diff: 0.0001s
Tasks: 100
TaskGroup: 0.1010s
gather(): 0.1009s
Diff: 0.0001s
Tasks: 500
TaskGroup: 0.1048s
gather(): 0.1041s
Diff: 0.0007s
ほぼ差がないですね。APIの呼び出し時間の方がはるかに長いので、TaskGroupかgatherかで悩むレベルではないというのが結論です。むしろ設計をどちらが楽にするかで選んだ方が吉だと思います。
チームで運用する上での心得
6ヶ月運用してわかったことをまとめると:
ExceptionGroupの処理は最初に決める 後付けするとレビューがめんどくさいので、プロジェクトの初期段階で「どの例外は無視する」「どの例外は握りつぶす」を決めておくべき。ここをあやふやにしたまま進めると、後でコード全体に波及する修正が必要になるんですよね。
部分的な成功を許容するかどうか明確に APIが複数のリソースを並行取得するなら、1つ失敗しても残りは返すかどうかは設計段階で決めるべき。後からこれを変えるのは大変だ。
タイムアウトは別途設定
TaskGroupは個別タスクのタイムアウトを面倒見てくれないので、asyncio.timeout()やasyncio.wait_for()と組み合わせることが多い。ここを忘れると無限に待つタスクが生まれちゃいます。
テストに例外パターンを必ず含める ExceptionGroupが返ってくることが多いので、テストで複数例外のケースを試しておかないと本番で謎のエラーになるんですよ。
こんな感じですね。TaskGroupは確かに便利なんですけど、きちんと理解して使わないと地雷を踏みます。
実装のときに役に立つコードスニペット
うちのチームで実際に使ってるパターンをいくつか共有します。
from contextlib import asynccontextmanager
from typing import TypeVar, Callable, Any
import asyncio
T = TypeVar('T')
class TaskResult:
"""タスク実行結果を保持するクラス"""
def __init__(self, task_id: str):
self.task_id = task_id
self.value: Any = None
self.error: Exception | None = None
self.success: bool = False
async def safe_taskgroup(
tasks: dict[str, Callable],
timeout: float = 30.0
) -> dict[str, TaskResult]:
"""複数タスクを安全に実行して結果を返す"""
results = {}
try:
async with asyncio.TaskGroup() as tg:
task_objects = {}
for task_id, task_func in tasks.items():
wrapped = asyncio.timeout(timeout)(task_func)
task_objects[task_id] = tg.create_task(wrapped())
except ExceptionGroup as eg:
for exc in eg.exceptions:
# 例外とタスクの対応付けは難しいので、全タスクをエラーマークする
for task_id in tasks:
if task_id not in results:
results[task_id] = TaskResult(task_id)
results[task_id].error = exc
except Exception as e:
for task_id in tasks:
results[task_id] = TaskResult(task_id)
results[task_id].error = e
return results
# 成功したタスクの結果を取得
for task_id, task_obj in task_objects.items():
result = TaskResult(task_id)
try:
result.value = task_obj.result()
result.success = True
except (asyncio.CancelledError, asyncio.TimeoutError) as e:
result.error = e
except Exception as e:
result.error = e
results[task_id] = result
return results
# 使用例
async def example():
tasks = {
"user": lambda: call_external_api_1(),
"posts": lambda: call_external_api_2(),
}
results = await safe_taskgroup(tasks, timeout=10.0)
for task_id, result in results.items():
if result.success:
print(f"{task_id}: {result.value}")
else:
print(f"{task_id}: Failed - {result.error}")
asyncio.run(example())
このパターンなら、各タスクの成功/失敗を個別に判定できるし、タイムアウトも自動で処理されます。個人的には、このぐらい綺麗にまとめておくと本番でも安心ですね。
まとめ
Python 3.13のTaskGroupは確かに便利なんですけど、設計段階での判断が重要だ。特に以下の3点は必ず決めておくべき:
- ExceptionGroupの扱い:複数例外が発生したときにどう対応するか、プロジェクト初期に決める
- 部分的成功の許容度:1つのタスク失敗が他に影響するかどうかを明確に
- タイムアウト設計:個別タスクと全体の両方で検討する
パフォーマンスはgather()と大差ないので、どちらが設計に合致するかで選んでOK。正直なところ、既存コードでgather()が動いてるなら無理に乗り換える必要もないと思いますが、新規プロジェクトなら試す価値はあります。
皆さんの現場ではどうしてますか?TaskGroupを本番で使ってる人がいたら、ぜひ聞かせてほしいんですよね。