AIエージェント開発で痛い目を見た話|2026年の実装課題と解決策

ChatGPTとは全く違うAIエージェント開発。LangChain、CrewAI、Anthropic Agentsを実際に試した結果とフレームワーク選定の失敗談を共有します。

先日プロジェクトで、本格的なAIエージェント開発に取り組み始めた

正直なところ、ChatGPTのような単一の応答モデルを使うのとは全く別のチャレンジがあることに気づかされた。エージェントって、一見シンプルに見えるけど、実装してみると「なんでこんなに複雑なんだ?」って思うことばかりだ。

うちのチームでは、LLM駆動のタスク自動化エージェントを構築していて、現在は小規模な顧客向けパイロット運用に入っている。そこで得た知見を、今日は全部さらけ出そうと思う。

Agent Frameworksの選定で、最初に失敗したこと

まず2025年末に着手したとき、「LangChain Agentでいいでしょ」って単純に考えていた。だけど1ヶ月使ってみて、本当にやりたいことに対してはオーバーキルだし、逆に細かい制御ができないことに気づいた。

2026年時点でメジャーなAgent Frameworksを整理するとこんな感じだ:

フレームワークメリットデメリット向いてる用途
LangChain Agentエコシステムが豊富、例が多い重い、細かい制御難しいプロトタイプ・学習
CrewAIマルチエージェント調整が得意、役割分担がシンプルドキュメント少ない、バージョン不安定チーム型自動化タスク
Anthropic AgentsClaude統合が最強、思考過程が透明Claude APIコスト高い複雑な推論が必要な案件
AutoGen(Microsoft)多様なLLM対応、カスタマイズ性高いセットアップ複雑、チューニング手間大企業向けカスタムエージェント
Native LLM API + Custom Logic最高の制御性、コスト最適化可能開発時間かかる、デバッグ辛い本番・スケール運用

うちが最終的に選んだのは、Anthropic AgentsとOpenAI APIを組み合わせた、ハイブリッドアプローチだ。理由は単純明快:

  1. Claudeの「思考」機能(thinking)が、複雑な推論タスクで異常に強い
  2. でもコスト面でGPT-4oの方が安いシーンも多い
  3. だから2つを使い分ける判断ロジックを自分たちで書いた

このアプローチは、実際のエージェント開発では複数LLM操作が当たり前になったってわけだ。

メモリ管理で、本当に困ったこと

エージェントって、会話履歴やツール実行の過去ログを保持し続けないといけない。これが、短期ラン(数分)では問題ないけど、長期運用(数時間のタスク)では爆発する。

うちのシステムでは、顧客の複数タスクを並列でこなす設計にしていたから、メモリ使用量が指数関数的に増加する問題に直面した。

実装した対策はこれだ:

from datetime import datetime, timedelta
import json
from typing import Optional

class AgentMemoryManager:
    def __init__(self, max_tokens: int = 8000, retention_days: int = 7):
        self.max_tokens = max_tokens
        self.retention_days = retention_days
        self.messages = []
        self.archived_messages = []
    
    def add_message(self, role: str, content: str, metadata: dict = None):
        """メッセージ追加時に古いものを自動削除"""
        message = {
            "timestamp": datetime.now().isoformat(),
            "role": role,
            "content": content,
            "metadata": metadata or {},
            "tokens": len(content.split())  # 簡易カウント
        }
        
        self.messages.append(message)
        self._prune_old_messages()
        self._enforce_token_limit()
    
    def _prune_old_messages(self):
        """retention_days以上前のメッセージをアーカイブ"""
        cutoff = datetime.now() - timedelta(days=self.retention_days)
        
        to_archive = [
            msg for msg in self.messages
            if datetime.fromisoformat(msg["timestamp"]) < cutoff
        ]
        
        self.archived_messages.extend(to_archive)
        self.messages = [
            msg for msg in self.messages
            if datetime.fromisoformat(msg["timestamp"]) >= cutoff
        ]
    
    def _enforce_token_limit(self):
        """トークン数の上限を超えたら、古いメッセージを削除"""
        total_tokens = sum(msg["tokens"] for msg in self.messages)
        
        if total_tokens > self.max_tokens:
            # 重要度の低いタイプのメッセージから削除
            priority = {"system": 3, "assistant": 2, "user": 1, "tool": 0}
            self.messages.sort(
                key=lambda x: (
                    priority.get(x["role"], 0),
                    x["timestamp"]
                )
            )
            
            # 古いものから削除
            while sum(msg["tokens"] for msg in self.messages) > self.max_tokens:
                self.messages.pop(0)
    
    def get_context(self, max_messages: int = 50) -> list:
        """エージェント実行用のコンテキストを取得"""
        # 最新のメッセージを優先(前後関係が大切)
        return self.messages[-max_messages:]
    
    def summarize_old_context(self) -> Optional[str]:
        """アーカイブされたメッセージを要約"""
        if not self.archived_messages:
            return None
        
        # 簡易的な要約(実運用ではLLMで要約)
        summary = {
            "total_archived": len(self.archived_messages),
            "date_range": {
                "start": min(msg["timestamp"] for msg in self.archived_messages),
                "end": max(msg["timestamp"] for msg in self.archived_messages)
            },
            "roles_count": {}
        }
        
        for msg in self.archived_messages:
            role = msg["role"]
            summary["roles_count"][role] = summary["roles_count"].get(role, 0) + 1
        
        return json.dumps(summary, indent=2)

# 実装例
manager = AgentMemoryManager(max_tokens=8000, retention_days=7)

# エージェント実行ループ
for step in agent_steps:
    manager.add_message("user", step["input"], {"step": step["id"]})
    result = llm_call(manager.get_context())
    manager.add_message("assistant", result, {"model": "claude-opus"})

この実装で気づいたポイント:

  • トークン数じゃなく、メッセージ数で管理する方が運用しやすい。後で正確なカウントに修正できるから、まずはシンプルに。
  • アーカイブ機能は必須。本当に古い情報は捨てて、要約だけ保持することで、メモリ圧力が大きく改善される。
  • 優先度付け削除が大事。ツール実行ログより、ユーザー指示の方が絶対に重要だからね。

マルチエージェント調整の地味な辛さ

次に直面したのが、複数エージェントが同じリソースにアクセスするときの競合だ。

うちのシステムでは:

  1. 「分析エージェント」がデータベースクエリを実行
  2. 「実行エージェント」がAPIコールして状態を変更
  3. 「報告エージェント」が結果をまとめる

この3つが並列で動くんだけど、分析がまだ走ってるのに実行エージェントが状態を変えちゃう、みたいなことが起きた。地味だけど、ほんっとに困った。

import asyncio
from enum import Enum

class AgentRole(Enum):
    ANALYZER = "analyzer"
    EXECUTOR = "executor"
    REPORTER = "reporter"

class AgentOrchestrator:
    def __init__(self):
        self.agent_locks = {role: asyncio.Lock() for role in AgentRole}
        self.execution_order = [
            AgentRole.ANALYZER,
            AgentRole.EXECUTOR,
            AgentRole.REPORTER
        ]
    
    async def run_sequential_with_dependencies(
        self,
        task: dict
    ) -> dict:
        """依存関係を尊重した順序実行"""
        results = {}
        
        for role in self.execution_order:
            async with self.agent_locks[role]:
                print(f"Running {role.value}...")
                
                # 前のエージェント結果を入力として使用
                agent_input = {
                    "task": task,
                    "previous_results": results
                }
                
                results[role.value] = await self._execute_agent(
                    role,
                    agent_input
                )
        
        return results
    
    async def _execute_agent(self, role: AgentRole, input_data: dict) -> dict:
        """個別エージェント実行(非同期)"""
        # ここで実際のエージェントロジックを呼び出し
        await asyncio.sleep(1)  # シミュレーション
        return {"status": "completed", "role": role.value}

# 実行例
orchestrator = AgentOrchestrator()

task = {
    "customer_id": "cust_123",
    "action": "analyze_and_report"
}

results = asyncio.run(orchestrator.run_sequential_with_dependencies(task))
print(results)

この実装で重要なのは:

  • ロックを使って順序を強制する。並列性は犠牲にするけど、データ整合性が保証される。本番では、この堅実さが何より大事。
  • 依存関係をコードで明示するexecution_orderに書くだけで、将来のメンテも楽になるし、新しい人もすぐ理解できる。
  • 前のエージェント結果を次に渡す。チェーンと違って、各エージェントが全体の状況を知ってる。判断の質が上がる。

実際には、もう少し複雑な依存関係(AとBは並列OK、でもCはBの後)みたいなケースも出てくる。その場合は、イベント駆動アーキテクチャを導入して、イベントベースで通知する方が柔軟だった。

推論コスト最適化で、やってよかったこと

エージェントって、ツール呼び出しのたびにLLM APIを叩くから、すぐにコストが爆発する。うちの場合、初月は推論コストが予算の3倍になってしまった。マジで焦った。

そこで実装した対策を2つ紹介する。

1. プロンプトキャッシング(Claude)の活用

Anthropicが2025年秋に導入した「プロンプトキャッシング」を使うと、同じシステムプロンプトやドキュメントを何度も送信しなくて済む。うちの場合、システムプロンプント(約2,000トークン)がキャッシュされて、1,500回のAPI呼び出しで90%削減できた。正直、これがなかったら今の運用は成り立ってない。

from anthropic import Anthropic

client = Anthropic()

# システムプロンプトは固定で大量に使用される
system_prompt = """
You are a customer support agent...
[大量のナレッジベース、例、ガイドラインなど]
"""

def call_agent_with_caching(user_message: str) -> str:
    response = client.messages.create(
        model="claude-opus",
        max_tokens=1024,
        system=[
            {
                "type": "text",
                "text": system_prompt,
                "cache_control": {"type": "ephemeral"}  # キャッシュを有効化
            }
        ],
        messages=[{"role": "user", "content": user_message}]
    )
    
    # 使用量の確認
    usage = response.usage
    print(f"Input tokens: {usage.input_tokens}")
    print(f"Cache creation tokens: {getattr(usage, 'cache_creation_input_tokens', 0)}")
    print(f"Cache read tokens: {getattr(usage, 'cache_read_input_tokens', 0)}")
    
    return response.content[0].text

# 複数回呼び出し(2回目以降はキャッシュヒット)
for i in range(5):
    result = call_agent_with_caching(f"Question {i}")

2. 軽い意思決定は小さいモデルで

すべてのタスクにClaude OpusやGPT-4oを使う必要ない。エージェントの中でも段階分けした方が、コストを大きく圧縮できる。

class AdaptiveAgentRouter:
    async def route_to_model(self, task: dict) -> str:
        """
        タスクの複雑さで最適なモデルを選択
        """
        complexity_score = self._estimate_complexity(task)
        
        if complexity_score < 2:  # 簡単
            return "gpt-3.5-turbo"  # 激安
        elif complexity_score < 5:  # 中程度
            return "gpt-4-turbo"  # 中位
        else:  # 複雑
            return "claude-opus"  # 最強
    
    def _estimate_complexity(self, task: dict) -> float:
        """タスクの複雑さを推定"""
        score = 0
        
        # ルール:単語数が多いほど複雑
        score += len(task.get("description", "").split()) * 0.1
        
        # 推論ステップが多いと複雑
        score += len(task.get("steps", [])) * 0.5
        
        # 外部データへのアクセスが多いと複雑
        score += len(task.get("data_sources", [])) * 0.3
        
        return score

router = AdaptiveAgentRouter()
task = {"description": "...", "steps": [...], "data_sources": [...]}
model = asyncio.run(router.route_to_model(task))

このアプローチで、推論コストを当初の40%まで削減できた。正直、最初から意識してなかったから、もったいなかった。早めに対策打つべき案件だと痛感したね。

デバッグの地獄と、そこからの解放

エージェントは、何をやってるのか分からなくなりやすい。特にツール呼び出しが連鎖するとき、本当にカオスになる。

うちが導入したのは、完全なトレース記録だ。これなしでは本番運用は無理だと言い切れる:

import json
from datetime import datetime
from typing import Any

class AgentTracer:
    def __init__(self, session_id: str):
        self.session_id = session_id
        self.traces = []
        self.start_time = datetime.now()
    
    def log_agent_step(self, step_num: int, agent_name: str, action: str, **kwargs):
        """エージェントの各ステップをログ"""
        trace = {
            "timestamp": datetime.now().isoformat(),
            "session_id": self.session_id,
            "step": step_num,
            "agent": agent_name,
            "action": action,
            "details": kwargs
        }
        self.traces.append(trace)
    
    def log_tool_call(self, tool_name: str, input_data: dict, output_data: Any, duration_ms: float):
        """ツール呼び出しをログ"""
        self.traces.append({
            "timestamp": datetime.now().isoformat(),
            "type": "tool_call",
            "tool": tool_name,
            "input": input_data,
            "output": output_data if isinstance(output_data, (dict, list, str, int, float)) else str(output_data),
            "duration_ms": duration_ms
        })
    
    def log_decision(self, decision: str, reasoning: str, confidence: float):
        """エージェントの判断をログ"""
        self.traces.append({
            "timestamp": datetime.now().isoformat(),
            "type": "decision",
            "decision": decision,
            "reasoning": reasoning,
            "confidence": confidence
        })
    
    def save(self, filepath: str):
        """トレースを JSON に保存"""
        output = {
            "session_id": self.session_id,
            "duration_seconds": (datetime.now() - self.start_time).total_seconds(),
            "trace_count": len(self.traces),
            "traces": self.traces
        }
        
        with open(filepath, 'w') as f:
            json.dump(output, f, indent=2)
    
    def get_failure_chain(self):
        """何が失敗したのかを逆順でたどる"""
        for trace in reversed(self.traces):
            if trace.get("type") == "decision" and trace.get("confidence", 1.0) < 0.7:
                return trace
        return None

# 実装例
tracer = AgentTracer("sess_12345")

tracer.log_agent_step(1, "analyzer", "querying_database", table="customers")
tracer.log_tool_call("db_query", {"query": "SELECT ...", "table": "customers"},  {"rows": 100}, 234.5)

tracer.log_decision(
    decision="route_to_human",
    reasoning="Confidence too low for automated response",
    confidence=0.45
)

tracer.save("/tmp/agent_trace_sess_12345.json")

# 失敗原因を特定
failure = tracer.get_failure_chain()
if failure:
    print(f"Failed at: {failure['decision']}")
    print(f"Reasoning: {failure['reasoning']}")

このトレースを見ると、どのステップで何が起きたのか、なぜ失敗したのか、一目瞭然になる。本番運用では、このトレースを自動でCloudWatch Logsに送ってる。「なんでダメなのかわかんない」状態から解放されたときの爽快感は、マジに言葉にならないレベル。

実運用で気づいた、細かいけど大事なこと

ここからは、マジで細かいけど本番では引っかかる話だ。

ツール呼び出しのタイムアウト設定を甘く見てはいけない。エージェントが「これはAPIを呼ぶ必要がある」と判断したのに、そのAPIが遅いと、ユーザーは永遠に待つことになる。地味に怖い。

import asyncio
from functools import wraps

def with_timeout(seconds: int):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=seconds
                )
            except asyncio.TimeoutError:
                return {"error": "timeout", "details": f"Operation exceeded {seconds}s"}
        return wrapper
    return decorator

# ツール定義
@with_timeout(5)  # 5秒でタイムアウト
async def call_external_api(endpoint: str, data: dict):
    # 実装
    pass

エージェントの「考えすぎ」対策も必要なんです。たまにLLMが無限ループに入ったようになる。特にClaudeのthinkingトークンを使うときは、上限を設定しないと、とんでもないコストになる。

response = client.messages.create(
    model="claude-opus",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 5000  # 思考に費やすトークンの上限
    },
    system="...",
    messages=[...]
)

次のステップと、これからの課題

今のところ、パイロット段階で月50万円程度のAPI費用がかかってる。スケールするには、もう少し最適化が必要だ。

やりたいことリスト:

  1. エージェント間の学習共有 - 一つのエージェントが改善したら、他も自動で学ぶ仕組み
  2. ローカルLLMの統合 - Ollama + llama.cpp で、小さいタスクは完全オンプレミス化
  3. 監視と自動フェイルオーバー - エージェントが機能不全に陥ったら、別のアプローチに自動切り替え

1番目は、「フィードバックループ」の発展形だと思ってる。2番目については、既に試験中だ。3番目は、インシデント対応のベストプラクティスと組み合わせる必要があって、ここはまだ検証中。

まとめ

2026年のAIエージェント開発で実装して学んだことを3つの柱にまとめると:

1. Framework選びより、メモリ・コスト管理が先。LangChainでいいかCrewAIでいいか、ってよりも、どうやって長期運用するかをいかに設計するかが本当に大事。うちはハイブリッド構成で複数LLM使い分けることで、推論コスト40%削減できた。

2. マルチエージェント調整は順序が命。並列処理は魅力的だけど、本番では依存関係を明示的にコード化して、ロックで順序を守る方が安全。後々複雑な並列化が必要になったら、イベント駆動に移行する戦略で対応する。

3. デバッグは、完全なトレース記録がないと地獄。何が起きたのか分からない状態が続くと、チーム全体のモチベーション下がる。トレース機能は、最初は過剰に見えるけど、月が経つほど価値が出てくる。

いま開発してるエージェント、正直まだ本番レディではない。でも、ここまでの知見があれば、次のプロジェクトではずっと早く進められそうな手応えがある。皆さんが同じように開発してるなら、この先の「あの問題どうやって解決したの?」って質問、ぜひ聞いてくれ。

U

Untanbaby

ソフトウェアエンジニア|AWS / クラウドアーキテクチャ / DevOps

10年以上のIT実務経験をもとに、現場で使える技術情報を発信しています。 記事の誤りや改善点があればお問い合わせからお気軽にご連絡ください。

関連記事