多 Agent Token 優化系列 pt.7:專責化 Agent 協作模式 — 從團隊設計到生產級協調

在前一篇《Agent 專責化實戰指南》中,我們學會了如何設計單一職責的精簡 Agent。然而,擁有一群優秀的專家還不夠——關鍵在於如何讓他們高效協作。本文將深入探討專責化 Agent 的協作模式,從團隊架構設計到生產級的協調機制。


核心挑戰:從個體到團隊

專責化帶來的協調複雜度

專責化前(單體 Agent):
┌───────────────────────────────────────────────────────┐
│                    全能 Agent                          │
│  ┌─────────────────────────────────────────────────┐  │
│  │ 分析 → 設計 → 開發 → 審查 → 文件             │  │
│  │ (全部在同一個 Context 內完成)                   │  │
│  └─────────────────────────────────────────────────┘  │
│                                                        │
│  優點:無協調開銷                                      │
│  缺點:System Prompt 巨大,Token 浪費                 │
└───────────────────────────────────────────────────────┘

專責化後(Agent 團隊):
┌───────────────────────────────────────────────────────┐
│                                                        │
│  ┌──────────────┐                                     │
│  │ Orchestrator │ ←── 協調開銷                        │
│  └──────┬───────┘                                     │
│         │                                              │
│    ┌────┴────┬────────┬────────┬────────┐            │
│    ▼         ▼        ▼        ▼        ▼            │
│  ┌────┐   ┌────┐   ┌────┐   ┌────┐   ┌────┐         │
│  │分析│   │設計│   │開發│   │審查│   │文件│         │
│  └────┘   └────┘   └────┘   └────┘   └────┘         │
│                                                        │
│  優點:各 Agent System Prompt 精簡                    │
│  缺點:需要協調機制,可能增加總呼叫次數               │
└───────────────────────────────────────────────────────┘

關鍵問題:協調開銷是否小於 System Prompt 節省?
答案:設計得當時,節省 >> 開銷

協調的 Token 成本模型

 1def calculate_coordination_overhead(
 2    num_agents: int,
 3    orchestrator_tokens: int,
 4    context_passing_tokens: int,
 5    avg_handoff_tokens: int
 6) -> int:
 7    """
 8    計算協調的額外 Token 開銷
 9
10    組成:
11    1. Orchestrator 呼叫(任務分解 + 結果整合)
12    2. Agent 間 Context 傳遞
13    3. Handoff 訊息
14    """
15    orchestrator_overhead = orchestrator_tokens * 2  # 開始 + 結束
16    context_overhead = context_passing_tokens * (num_agents - 1)
17    handoff_overhead = avg_handoff_tokens * num_agents
18
19    return orchestrator_overhead + context_overhead + handoff_overhead
20
21
22def calculate_system_prompt_savings(
23    general_agent_prompt: int,
24    specialized_prompts: list[int],
25    num_calls: int
26) -> int:
27    """
28    計算 System Prompt 節省
29
30    假設:每個任務只呼叫相關的專責 Agent
31    """
32    # 通用 Agent:每次呼叫都發送完整 prompt
33    general_cost = general_agent_prompt * num_calls
34
35    # 專責 Agent:只發送該 Agent 的精簡 prompt
36    specialized_cost = sum(specialized_prompts)  # 假設每個 Agent 呼叫一次
37
38    return general_cost - specialized_cost
39
40
41# 範例計算
42general_prompt = 16000  # 通用 Agent 的 System Prompt
43specialized_prompts = [2500, 3000, 2800, 2200, 1800]  # 各專責 Agent
44num_agents = len(specialized_prompts)
45
46# 假設完成一個完整任務需要呼叫 5 個功能
47# 通用 Agent:呼叫 5 次,每次 16000 tokens
48# 專責 Agent:每個專責 Agent 呼叫 1 次
49
50savings = calculate_system_prompt_savings(
51    general_prompt,
52    specialized_prompts,
53    num_calls=5
54)
55
56overhead = calculate_coordination_overhead(
57    num_agents=5,
58    orchestrator_tokens=2000,
59    context_passing_tokens=500,
60    avg_handoff_tokens=300
61)
62
63print(f"System Prompt 節省: {savings:,} tokens")
64print(f"協調開銷: {overhead:,} tokens")
65print(f"淨節省: {savings - overhead:,} tokens")
66print(f"節省比例: {(savings - overhead) / (general_prompt * 5) * 100:.1f}%")
67
68# 輸出:
69# System Prompt 節省: 67,700 tokens
70# 協調開銷: 7,500 tokens
71# 淨節省: 60,200 tokens
72# 節省比例: 75.3%

協作架構模式

模式一:Hub-and-Spoke(中心輻射)

架構圖:

                    ┌──────────────────┐
                    │   Orchestrator   │
                    │   (Hub 中心)     │
                    └────────┬─────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
    ┌─────────┐        ┌─────────┐        ┌─────────┐
    │ Agent A │        │ Agent B │        │ Agent C │
    │ (Spoke) │        │ (Spoke) │        │ (Spoke) │
    └─────────┘        └─────────┘        └─────────┘

特點:
- 所有通訊經過 Orchestrator
- 簡單、可控
- Orchestrator 可能成為瓶頸
- 適合:任務明確、Agent 數量適中的場景
  1import anthropic
  2from dataclasses import dataclass, field
  3from typing import Optional, Callable
  4from enum import Enum
  5
  6client = anthropic.Anthropic()
  7
  8
  9class AgentRole(Enum):
 10    ORCHESTRATOR = "orchestrator"
 11    ANALYST = "analyst"
 12    DEVELOPER = "developer"
 13    REVIEWER = "reviewer"
 14    DOC_WRITER = "doc_writer"
 15
 16
 17@dataclass
 18class TaskResult:
 19    """標準化的任務結果"""
 20    agent: AgentRole
 21    task_id: str
 22    status: str  # "success", "partial", "failed"
 23    output: str
 24    summary: str  # 精簡摘要(用於傳遞給下游)
 25    tokens_used: int
 26    metadata: dict = field(default_factory=dict)
 27
 28
 29class HubAndSpokeOrchestrator:
 30    """
 31    Hub-and-Spoke 協作模式
 32
 33    所有 Agent 通訊經過中心 Orchestrator:
 34    1. Orchestrator 接收任務,分解為子任務
 35    2. 依序(或並行)派發給各 Spoke Agent
 36    3. 收集結果,決定下一步或整合最終輸出
 37    """
 38
 39    def __init__(self):
 40        self.agents = self._initialize_agents()
 41        self.task_results: list[TaskResult] = []
 42        self._token_budget = 0
 43        self._tokens_used = 0
 44
 45    def _initialize_agents(self) -> dict:
 46        """初始化各專責 Agent 的配置"""
 47        return {
 48            AgentRole.ORCHESTRATOR: {
 49                "model": "claude-sonnet-4-20250514",
 50                "system": """你是任務協調者。
 51
 52職責:
 531. 分解複雜任務為子任務
 542. 決定執行順序和依賴關係
 553. 整合各 Agent 的輸出
 56
 57輸出格式(JSON):
 58{
 59    "subtasks": [
 60        {"id": "1", "agent": "analyst", "task": "...", "depends_on": []},
 61        {"id": "2", "agent": "developer", "task": "...", "depends_on": ["1"]}
 62    ],
 63    "execution_plan": "sequential|parallel|mixed"
 64}""",
 65                "max_tokens": 2048
 66            },
 67
 68            AgentRole.ANALYST: {
 69                "model": "claude-sonnet-4-20250514",
 70                "system": """你是需求分析師。
 71
 72職責:分析需求,提取關鍵資訊。
 73
 74輸出格式:
 75- 功能需求(列表)
 76- 非功能需求(列表)
 77- 關鍵限制(列表)
 78- 建議(列表)""",
 79                "max_tokens": 2048
 80            },
 81
 82            AgentRole.DEVELOPER: {
 83                "model": "claude-sonnet-4-20250514",
 84                "system": """你是程式開發者。
 85
 86職責:根據需求實作程式碼。
 87
 88輸出格式:
 891. 完整可執行的程式碼
 902. 簡短說明(3-5 行)
 913. 使用方式""",
 92                "max_tokens": 4096
 93            },
 94
 95            AgentRole.REVIEWER: {
 96                "model": "claude-3-5-haiku-20241022",  # 輕量任務用 Haiku
 97                "system": """你是程式碼審查員。
 98
 99職責:審查程式碼品質。
100
101輸出格式(按嚴重度排序):
102🔴 嚴重問題:[...]
103🟡 建議改進:[...]
104🟢 良好實踐:[...]""",
105                "max_tokens": 1024
106            },
107
108            AgentRole.DOC_WRITER: {
109                "model": "claude-3-5-haiku-20241022",
110                "system": """你是技術文件撰寫者。
111
112職責:撰寫清晰的技術文件。
113
114輸出格式(Markdown):
115## 概述
116## 安裝
117## 使用方式
118## API 參考""",
119                "max_tokens": 2048
120            }
121        }
122
123    def _call_agent(
124        self,
125        role: AgentRole,
126        task: str,
127        context: str = ""
128    ) -> TaskResult:
129        """呼叫單個 Agent"""
130        config = self.agents[role]
131
132        # 組合輸入(Context + Task)
133        full_input = f"{context}\n\n任務:{task}" if context else task
134
135        response = client.messages.create(
136            model=config["model"],
137            max_tokens=config["max_tokens"],
138            system=config["system"],
139            messages=[{"role": "user", "content": full_input}]
140        )
141
142        output = response.content[0].text
143        tokens = response.usage.input_tokens + response.usage.output_tokens
144        self._tokens_used += tokens
145
146        # 生成精簡摘要(前 500 字元)
147        summary = output[:500] + "..." if len(output) > 500 else output
148
149        return TaskResult(
150            agent=role,
151            task_id=f"{role.value}_{len(self.task_results)}",
152            status="success",
153            output=output,
154            summary=summary,
155            tokens_used=tokens
156        )
157
158    def _build_context_for_agent(
159        self,
160        target_agent: AgentRole,
161        dependency_results: list[TaskResult]
162    ) -> str:
163        """
164        為目標 Agent 建構最小化 Context
165
166        核心:只傳遞摘要,不傳遞完整輸出
167        """
168        if not dependency_results:
169            return ""
170
171        context_parts = []
172        for result in dependency_results:
173            context_parts.append(
174                f"[{result.agent.value} 的分析結果]\n{result.summary}"
175            )
176
177        return "\n\n---\n\n".join(context_parts)
178
179    def execute(self, task: str, token_budget: int = 50000) -> dict:
180        """
181        執行完整的協作流程
182
183        Args:
184            task: 使用者任務
185            token_budget: Token 預算上限
186        """
187        self._token_budget = token_budget
188        self._tokens_used = 0
189        self.task_results = []
190
191        print(f"\n{'='*60}")
192        print("Hub-and-Spoke 協作模式")
193        print(f"{'='*60}")
194        print(f"任務: {task[:100]}...")
195        print(f"Token 預算: {token_budget:,}")
196
197        # Step 1: Orchestrator 分解任務
198        print("\n[Step 1] Orchestrator 分解任務...")
199        plan_result = self._call_agent(
200            AgentRole.ORCHESTRATOR,
201            f"請分解以下任務並制定執行計畫:\n\n{task}"
202        )
203        self.task_results.append(plan_result)
204        print(f"  ✓ 計畫完成,tokens: {plan_result.tokens_used}")
205
206        # Step 2: 依序執行各 Agent
207        # (簡化版:固定流程,實際可根據計畫動態調整)
208        pipeline = [
209            (AgentRole.ANALYST, "分析需求"),
210            (AgentRole.DEVELOPER, "實作程式碼"),
211            (AgentRole.REVIEWER, "審查程式碼"),
212            (AgentRole.DOC_WRITER, "撰寫文件")
213        ]
214
215        for i, (role, action) in enumerate(pipeline, start=2):
216            print(f"\n[Step {i}] {role.value}: {action}...")
217
218            # 檢查 Token 預算
219            if self._tokens_used >= self._token_budget * 0.9:
220                print(f"  ⚠️ 接近 Token 預算,跳過後續步驟")
221                break
222
223            # 建構 Context(只傳遞前一步的摘要)
224            context = self._build_context_for_agent(
225                role,
226                self.task_results[-2:]  # 只取最近 2 個結果
227            )
228
229            # 執行
230            result = self._call_agent(role, task, context)
231            self.task_results.append(result)
232            print(f"  ✓ 完成,tokens: {result.tokens_used}")
233
234        # Step 3: Orchestrator 整合結果
235        print("\n[Final] Orchestrator 整合結果...")
236        final_context = self._build_context_for_agent(
237            AgentRole.ORCHESTRATOR,
238            self.task_results
239        )
240
241        final_result = self._call_agent(
242            AgentRole.ORCHESTRATOR,
243            "請整合所有專家的輸出,生成最終報告",
244            final_context
245        )
246        self.task_results.append(final_result)
247
248        # 統計
249        print(f"\n{'='*60}")
250        print("執行統計")
251        print(f"{'='*60}")
252        print(f"總 Token 使用: {self._tokens_used:,} / {self._token_budget:,}")
253        print(f"Agent 呼叫次數: {len(self.task_results)}")
254
255        return {
256            "final_output": final_result.output,
257            "all_results": self.task_results,
258            "total_tokens": self._tokens_used,
259            "budget_usage": f"{self._tokens_used / self._token_budget * 100:.1f}%"
260        }

模式二:Pipeline(流水線)

架構圖:

┌────────┐    ┌────────┐    ┌────────┐    ┌────────┐    ┌────────┐
│ Agent A│───▶│ Agent B│───▶│ Agent C│───▶│ Agent D│───▶│ Agent E│
│  分析  │    │  設計  │    │  開發  │    │  審查  │    │  文件  │
└────────┘    └────────┘    └────────┘    └────────┘    └────────┘
     │             │             │             │             │
     ▼             ▼             ▼             ▼             ▼
  需求文件      設計文件      程式碼        審查報告      文件

特點:
- 線性執行,前一個完成後啟動下一個
- 無需複雜的協調邏輯
- 延遲較高(串行)
- 適合:有明確順序依賴的任務流程
  1from dataclasses import dataclass
  2from typing import Callable, Any
  3
  4@dataclass
  5class PipelineStage:
  6    """流水線階段"""
  7    role: AgentRole
  8    system_prompt: str
  9    model: str
 10    max_tokens: int
 11    input_transformer: Optional[Callable[[str, list], str]] = None
 12    output_validator: Optional[Callable[[str], bool]] = None
 13
 14
 15class PipelineOrchestrator:
 16    """
 17    Pipeline 協作模式
 18
 19    特點:
 20    1. 嚴格的線性執行順序
 21    2. 每個階段的輸出是下一階段的輸入
 22    3. 可選的輸入轉換和輸出驗證
 23    """
 24
 25    def __init__(self, stages: list[PipelineStage]):
 26        self.stages = stages
 27        self.execution_log: list[dict] = []
 28
 29    def _transform_input(
 30        self,
 31        stage: PipelineStage,
 32        original_task: str,
 33        previous_outputs: list[str]
 34    ) -> str:
 35        """轉換輸入"""
 36        if stage.input_transformer:
 37            return stage.input_transformer(original_task, previous_outputs)
 38
 39        # 預設:組合原始任務 + 上一階段輸出
 40        if previous_outputs:
 41            last_output = previous_outputs[-1]
 42            # 只取摘要(前 1000 字元)
 43            summary = last_output[:1000] + "..." if len(last_output) > 1000 else last_output
 44            return f"背景:\n{summary}\n\n任務:{original_task}"
 45
 46        return original_task
 47
 48    def _validate_output(
 49        self,
 50        stage: PipelineStage,
 51        output: str
 52    ) -> tuple[bool, str]:
 53        """驗證輸出"""
 54        if stage.output_validator:
 55            is_valid = stage.output_validator(output)
 56            return is_valid, "" if is_valid else "輸出驗證失敗"
 57
 58        # 預設驗證:非空
 59        if not output or len(output.strip()) < 10:
 60            return False, "輸出過短或為空"
 61
 62        return True, ""
 63
 64    def execute(
 65        self,
 66        task: str,
 67        stop_on_failure: bool = True
 68    ) -> dict:
 69        """
 70        執行流水線
 71
 72        Args:
 73            task: 原始任務
 74            stop_on_failure: 驗證失敗時是否停止
 75        """
 76        outputs: list[str] = []
 77        self.execution_log = []
 78        total_tokens = 0
 79
 80        print(f"\n{'='*60}")
 81        print("Pipeline 協作模式")
 82        print(f"{'='*60}")
 83        print(f"階段數: {len(self.stages)}")
 84
 85        for i, stage in enumerate(self.stages):
 86            print(f"\n[{i+1}/{len(self.stages)}] {stage.role.value}")
 87
 88            # 準備輸入
 89            stage_input = self._transform_input(stage, task, outputs)
 90
 91            # 呼叫 Agent
 92            response = client.messages.create(
 93                model=stage.model,
 94                max_tokens=stage.max_tokens,
 95                system=stage.system_prompt,
 96                messages=[{"role": "user", "content": stage_input}]
 97            )
 98
 99            output = response.content[0].text
100            tokens = response.usage.input_tokens + response.usage.output_tokens
101            total_tokens += tokens
102
103            # 驗證輸出
104            is_valid, error = self._validate_output(stage, output)
105
106            log_entry = {
107                "stage": i + 1,
108                "role": stage.role.value,
109                "tokens": tokens,
110                "valid": is_valid,
111                "error": error
112            }
113            self.execution_log.append(log_entry)
114
115            if is_valid:
116                outputs.append(output)
117                print(f"  ✓ 完成,tokens: {tokens}")
118            else:
119                print(f"  ✗ 驗證失敗: {error}")
120                if stop_on_failure:
121                    print("  ⚠️ 流水線中止")
122                    break
123                outputs.append("")  # 空輸出,繼續下一階段
124
125        return {
126            "final_output": outputs[-1] if outputs else "",
127            "all_outputs": outputs,
128            "total_tokens": total_tokens,
129            "stages_completed": len([l for l in self.execution_log if l["valid"]]),
130            "log": self.execution_log
131        }
132
133
134# 建立開發流水線
135def create_development_pipeline() -> PipelineOrchestrator:
136    """創建標準開發流水線"""
137    stages = [
138        PipelineStage(
139            role=AgentRole.ANALYST,
140            system_prompt="你是需求分析師。提取功能需求、非功能需求、限制條件。輸出 JSON。",
141            model="claude-sonnet-4-20250514",
142            max_tokens=2048
143        ),
144        PipelineStage(
145            role=AgentRole.DEVELOPER,
146            system_prompt="你是程式開發者。根據需求實作 Python 程式碼。輸出完整可執行程式碼。",
147            model="claude-sonnet-4-20250514",
148            max_tokens=4096,
149            output_validator=lambda x: "def " in x or "class " in x  # 簡單驗證
150        ),
151        PipelineStage(
152            role=AgentRole.REVIEWER,
153            system_prompt="你是程式碼審查員。審查程式碼的正確性、可讀性、安全性。",
154            model="claude-3-5-haiku-20241022",
155            max_tokens=1024
156        ),
157        PipelineStage(
158            role=AgentRole.DOC_WRITER,
159            system_prompt="你是文件撰寫者。為程式碼撰寫 README 文件。",
160            model="claude-3-5-haiku-20241022",
161            max_tokens=2048
162        ),
163    ]
164
165    return PipelineOrchestrator(stages)

模式三:Parallel with Merge(並行合併)

架構圖:

                    ┌──────────────────┐
                    │   Task Splitter  │
                    │   (任務拆分)      │
                    └────────┬─────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
    ┌─────────┐        ┌─────────┐        ┌─────────┐
    │ Agent A │        │ Agent B │        │ Agent C │
    │ (並行)  │        │ (並行)  │        │ (並行)  │
    └────┬────┘        └────┬────┘        └────┬────┘
         │                   │                   │
         └───────────────────┼───────────────────┘
                             │
                    ┌────────▼─────────┐
                    │   Result Merger  │
                    │   (結果合併)      │
                    └──────────────────┘

特點:
- 獨立子任務並行執行
- 大幅縮短總延遲
- 需要設計好合併邏輯
- 適合:可並行的獨立審查、多角度分析
  1import asyncio
  2from anthropic import AsyncAnthropic
  3
  4async_client = AsyncAnthropic()
  5
  6
  7@dataclass
  8class ParallelTask:
  9    """並行任務定義"""
 10    id: str
 11    role: AgentRole
 12    task: str
 13    system_prompt: str
 14    model: str
 15    max_tokens: int
 16
 17
 18class ParallelMergeOrchestrator:
 19    """
 20    Parallel with Merge 協作模式
 21
 22    適用場景:
 23    1. 多角度分析(安全、效能、可讀性審查)
 24    2. 多專家意見收集
 25    3. 獨立子任務處理
 26    """
 27
 28    def __init__(self):
 29        self.results: dict[str, TaskResult] = {}
 30
 31    async def _execute_single(self, task: ParallelTask) -> TaskResult:
 32        """非同步執行單個任務"""
 33        try:
 34            response = await async_client.messages.create(
 35                model=task.model,
 36                max_tokens=task.max_tokens,
 37                system=task.system_prompt,
 38                messages=[{"role": "user", "content": task.task}]
 39            )
 40
 41            output = response.content[0].text
 42            tokens = response.usage.input_tokens + response.usage.output_tokens
 43
 44            return TaskResult(
 45                agent=task.role,
 46                task_id=task.id,
 47                status="success",
 48                output=output,
 49                summary=output[:500],
 50                tokens_used=tokens
 51            )
 52
 53        except Exception as e:
 54            return TaskResult(
 55                agent=task.role,
 56                task_id=task.id,
 57                status="failed",
 58                output="",
 59                summary="",
 60                tokens_used=0,
 61                metadata={"error": str(e)}
 62            )
 63
 64    async def execute_parallel(
 65        self,
 66        tasks: list[ParallelTask],
 67        timeout: float = 60.0
 68    ) -> list[TaskResult]:
 69        """並行執行所有任務"""
 70        print(f"\n並行執行 {len(tasks)} 個任務...")
 71
 72        # 創建所有協程
 73        coroutines = [self._execute_single(task) for task in tasks]
 74
 75        # 並行執行(帶超時)
 76        try:
 77            results = await asyncio.wait_for(
 78                asyncio.gather(*coroutines, return_exceptions=True),
 79                timeout=timeout
 80            )
 81        except asyncio.TimeoutError:
 82            print("⚠️ 部分任務超時")
 83            results = []
 84
 85        # 過濾結果
 86        valid_results = []
 87        for r in results:
 88            if isinstance(r, TaskResult):
 89                valid_results.append(r)
 90                self.results[r.task_id] = r
 91            elif isinstance(r, Exception):
 92                print(f"  ✗ 任務失敗: {r}")
 93
 94        return valid_results
 95
 96    def merge_results(
 97        self,
 98        results: list[TaskResult],
 99        merge_strategy: str = "concatenate"
100    ) -> str:
101        """
102        合併並行結果
103
104        策略:
105        - concatenate: 串連所有輸出
106        - summarize: 使用 LLM 總結
107        - structured: 結構化整合
108        """
109        if merge_strategy == "concatenate":
110            merged = []
111            for r in results:
112                merged.append(f"## {r.agent.value}\n\n{r.output}")
113            return "\n\n---\n\n".join(merged)
114
115        elif merge_strategy == "structured":
116            # 結構化整合
117            structured = {
118                "results": [],
119                "summary": "",
120                "consensus": []
121            }
122
123            for r in results:
124                structured["results"].append({
125                    "agent": r.agent.value,
126                    "key_points": r.summary
127                })
128
129            return str(structured)
130
131        else:
132            raise ValueError(f"未知的合併策略: {merge_strategy}")
133
134
135# 建立並行審查系統
136def create_parallel_review_system():
137    """創建並行程式碼審查系統"""
138
139    async def review_code(code: str) -> dict:
140        orchestrator = ParallelMergeOrchestrator()
141
142        # 定義並行審查任務
143        tasks = [
144            ParallelTask(
145                id="security_review",
146                role=AgentRole.REVIEWER,
147                task=f"從安全性角度審查以下程式碼:\n\n{code}",
148                system_prompt="你是安全專家。專注於:SQL 注入、XSS、認證漏洞、敏感資料洩露。",
149                model="claude-3-5-haiku-20241022",
150                max_tokens=1024
151            ),
152            ParallelTask(
153                id="performance_review",
154                role=AgentRole.REVIEWER,
155                task=f"從效能角度審查以下程式碼:\n\n{code}",
156                system_prompt="你是效能專家。專注於:時間複雜度、記憶體使用、N+1 查詢、快取策略。",
157                model="claude-3-5-haiku-20241022",
158                max_tokens=1024
159            ),
160            ParallelTask(
161                id="maintainability_review",
162                role=AgentRole.REVIEWER,
163                task=f"從可維護性角度審查以下程式碼:\n\n{code}",
164                system_prompt="你是架構師。專注於:程式碼結構、命名、單一職責、可讀性。",
165                model="claude-3-5-haiku-20241022",
166                max_tokens=1024
167            ),
168        ]
169
170        # 並行執行
171        results = await orchestrator.execute_parallel(tasks)
172
173        # 合併結果
174        merged = orchestrator.merge_results(results, "concatenate")
175
176        total_tokens = sum(r.tokens_used for r in results)
177
178        return {
179            "merged_review": merged,
180            "individual_reviews": {r.task_id: r.output for r in results},
181            "total_tokens": total_tokens,
182            "parallel_speedup": f"{len(tasks)}x (理論值)"
183        }
184
185    return review_code
186
187
188# 使用範例
189async def main():
190    review_code = create_parallel_review_system()
191
192    sample_code = '''
193def get_user(user_id):
194    query = f"SELECT * FROM users WHERE id = {user_id}"
195    result = db.execute(query)
196    return result
197'''
198
199    result = await review_code(sample_code)
200    print(result["merged_review"])
201
202# asyncio.run(main())

動態路由與任務分配

智慧路由器設計

  1from enum import Enum
  2from dataclasses import dataclass
  3from typing import Optional
  4import re
  5
  6
  7class TaskCategory(Enum):
  8    """任務類別"""
  9    CODE_GENERATION = "code_generation"
 10    CODE_REVIEW = "code_review"
 11    DATA_ANALYSIS = "data_analysis"
 12    DOCUMENTATION = "documentation"
 13    ARCHITECTURE = "architecture"
 14    TESTING = "testing"
 15    DEBUGGING = "debugging"
 16    UNKNOWN = "unknown"
 17
 18
 19@dataclass
 20class RoutingDecision:
 21    """路由決策"""
 22    category: TaskCategory
 23    primary_agent: AgentRole
 24    supporting_agents: list[AgentRole]
 25    confidence: float
 26    reasoning: str
 27
 28
 29class IntelligentRouter:
 30    """
 31    智慧任務路由器
 32
 33    功能:
 34    1. 分析任務類型
 35    2. 選擇最適合的 Agent
 36    3. 決定是否需要多 Agent 協作
 37    """
 38
 39    # 關鍵字到類別的映射
 40    KEYWORD_PATTERNS = {
 41        TaskCategory.CODE_GENERATION: [
 42            r"寫[一個]?程式", r"實作", r"開發", r"create", r"implement",
 43            r"build", r"code", r"function", r"class"
 44        ],
 45        TaskCategory.CODE_REVIEW: [
 46            r"審查", r"review", r"檢查", r"check", r"分析.*程式碼",
 47            r"有.*問題", r"bug", r"錯誤"
 48        ],
 49        TaskCategory.DATA_ANALYSIS: [
 50            r"分析.*資料", r"統計", r"趨勢", r"data", r"analysis",
 51            r"chart", r"圖表", r"視覺化"
 52        ],
 53        TaskCategory.DOCUMENTATION: [
 54            r"文件", r"document", r"說明", r"readme", r"api.*doc",
 55            r"寫.*文"
 56        ],
 57        TaskCategory.ARCHITECTURE: [
 58            r"架構", r"設計", r"architecture", r"design", r"系統",
 59            r"微服務", r"database.*設計"
 60        ],
 61        TaskCategory.TESTING: [
 62            r"測試", r"test", r"單元", r"整合", r"unit", r"integration"
 63        ],
 64        TaskCategory.DEBUGGING: [
 65            r"除錯", r"debug", r"修復", r"fix", r"錯誤", r"crash"
 66        ]
 67    }
 68
 69    # 類別到 Agent 的映射
 70    CATEGORY_AGENT_MAP = {
 71        TaskCategory.CODE_GENERATION: {
 72            "primary": AgentRole.DEVELOPER,
 73            "supporting": [AgentRole.ANALYST, AgentRole.REVIEWER]
 74        },
 75        TaskCategory.CODE_REVIEW: {
 76            "primary": AgentRole.REVIEWER,
 77            "supporting": []
 78        },
 79        TaskCategory.DATA_ANALYSIS: {
 80            "primary": AgentRole.ANALYST,
 81            "supporting": []
 82        },
 83        TaskCategory.DOCUMENTATION: {
 84            "primary": AgentRole.DOC_WRITER,
 85            "supporting": [AgentRole.ANALYST]
 86        },
 87        TaskCategory.ARCHITECTURE: {
 88            "primary": AgentRole.ANALYST,  # 架構師(若有的話)
 89            "supporting": [AgentRole.DEVELOPER]
 90        },
 91        TaskCategory.TESTING: {
 92            "primary": AgentRole.DEVELOPER,
 93            "supporting": [AgentRole.REVIEWER]
 94        },
 95        TaskCategory.DEBUGGING: {
 96            "primary": AgentRole.DEVELOPER,
 97            "supporting": [AgentRole.REVIEWER]
 98        }
 99    }
100
101    def __init__(self, use_llm_fallback: bool = True):
102        self.use_llm_fallback = use_llm_fallback
103        self._routing_history: list[RoutingDecision] = []
104
105    def _keyword_classify(self, task: str) -> tuple[TaskCategory, float]:
106        """基於關鍵字的快速分類"""
107        task_lower = task.lower()
108        scores = {}
109
110        for category, patterns in self.KEYWORD_PATTERNS.items():
111            score = 0
112            for pattern in patterns:
113                if re.search(pattern, task_lower):
114                    score += 1
115            scores[category] = score
116
117        if not scores or max(scores.values()) == 0:
118            return TaskCategory.UNKNOWN, 0.0
119
120        best_category = max(scores, key=scores.get)
121        # 信心度 = 匹配數 / 總模式數
122        confidence = scores[best_category] / len(self.KEYWORD_PATTERNS[best_category])
123
124        return best_category, min(confidence, 1.0)
125
126    def _llm_classify(self, task: str) -> tuple[TaskCategory, float]:
127        """使用 LLM 進行精確分類(較貴但更準確)"""
128        classification_prompt = f"""分類以下任務。只回答類別名稱。
129
130類別:
131- code_generation: 寫程式碼、實作功能
132- code_review: 審查現有程式碼
133- data_analysis: 分析資料、統計
134- documentation: 撰寫文件
135- architecture: 系統設計、架構規劃
136- testing: 撰寫或執行測試
137- debugging: 除錯、修復問題
138- unknown: 無法分類
139
140任務:{task}
141
142類別:"""
143
144        response = client.messages.create(
145            model="claude-3-5-haiku-20241022",  # 使用便宜的模型做分類
146            max_tokens=20,
147            messages=[{"role": "user", "content": classification_prompt}]
148        )
149
150        result = response.content[0].text.strip().lower()
151
152        # 解析結果
153        for category in TaskCategory:
154            if category.value in result:
155                return category, 0.9  # LLM 分類給予較高信心度
156
157        return TaskCategory.UNKNOWN, 0.5
158
159    def route(self, task: str) -> RoutingDecision:
160        """
161        路由任務到適當的 Agent
162
163        邏輯:
164        1. 先用關鍵字快速分類
165        2. 信心度低時,使用 LLM 分類
166        3. 根據類別選擇 Agent
167        """
168        # 快速分類
169        category, confidence = self._keyword_classify(task)
170
171        # 信心度低時使用 LLM
172        if confidence < 0.5 and self.use_llm_fallback:
173            llm_category, llm_confidence = self._llm_classify(task)
174            if llm_confidence > confidence:
175                category = llm_category
176                confidence = llm_confidence
177
178        # 選擇 Agent
179        agent_config = self.CATEGORY_AGENT_MAP.get(
180            category,
181            {"primary": AgentRole.ORCHESTRATOR, "supporting": []}
182        )
183
184        decision = RoutingDecision(
185            category=category,
186            primary_agent=agent_config["primary"],
187            supporting_agents=agent_config["supporting"],
188            confidence=confidence,
189            reasoning=f"根據任務內容,分類為 {category.value},信心度 {confidence:.2f}"
190        )
191
192        self._routing_history.append(decision)
193        return decision
194
195    def get_routing_stats(self) -> dict:
196        """取得路由統計"""
197        if not self._routing_history:
198            return {"total": 0}
199
200        category_counts = {}
201        for decision in self._routing_history:
202            cat = decision.category.value
203            category_counts[cat] = category_counts.get(cat, 0) + 1
204
205        return {
206            "total": len(self._routing_history),
207            "by_category": category_counts,
208            "avg_confidence": sum(d.confidence for d in self._routing_history) / len(self._routing_history)
209        }
210
211
212class AdaptiveOrchestrator:
213    """
214    自適應協調器
215
216    根據任務特性自動選擇協作模式:
217    - 簡單任務 → 單 Agent
218    - 中等任務 → Pipeline
219    - 複雜任務 → Hub-and-Spoke
220    - 可並行任務 → Parallel
221    """
222
223    def __init__(self):
224        self.router = IntelligentRouter()
225        self.hub_spoke = HubAndSpokeOrchestrator()
226        self.pipeline = create_development_pipeline()
227
228    def _estimate_complexity(self, task: str, routing: RoutingDecision) -> str:
229        """估算任務複雜度"""
230        # 簡單啟發式
231        word_count = len(task.split())
232
233        if routing.confidence > 0.8 and word_count < 30:
234            return "simple"
235        elif len(routing.supporting_agents) == 0:
236            return "simple"
237        elif len(routing.supporting_agents) >= 2:
238            return "complex"
239        else:
240            return "medium"
241
242    def execute(self, task: str) -> dict:
243        """自適應執行任務"""
244        # 路由決策
245        routing = self.router.route(task)
246        complexity = self._estimate_complexity(task, routing)
247
248        print(f"\n路由決策:")
249        print(f"  類別: {routing.category.value}")
250        print(f"  主要 Agent: {routing.primary_agent.value}")
251        print(f"  支援 Agent: {[a.value for a in routing.supporting_agents]}")
252        print(f"  複雜度: {complexity}")
253        print(f"  信心度: {routing.confidence:.2f}")
254
255        # 根據複雜度選擇模式
256        if complexity == "simple":
257            # 單 Agent 執行
258            print("\n使用模式:單 Agent")
259            result = self._execute_single_agent(task, routing.primary_agent)
260
261        elif complexity == "medium":
262            # Pipeline 執行
263            print("\n使用模式:Pipeline")
264            result = self.pipeline.execute(task)
265
266        else:
267            # Hub-and-Spoke 執行
268            print("\n使用模式:Hub-and-Spoke")
269            result = self.hub_spoke.execute(task)
270
271        return {
272            "routing": routing,
273            "mode": complexity,
274            "result": result
275        }
276
277    def _execute_single_agent(self, task: str, role: AgentRole) -> dict:
278        """單 Agent 執行"""
279        config = self.hub_spoke.agents[role]
280
281        response = client.messages.create(
282            model=config["model"],
283            max_tokens=config["max_tokens"],
284            system=config["system"],
285            messages=[{"role": "user", "content": task}]
286        )
287
288        return {
289            "output": response.content[0].text,
290            "tokens": response.usage.input_tokens + response.usage.output_tokens
291        }

狀態管理與錯誤處理

分散式狀態管理

  1from dataclasses import dataclass, field
  2from typing import Any, Optional
  3from enum import Enum
  4import json
  5import time
  6
  7
  8class TaskStatus(Enum):
  9    """任務狀態"""
 10    PENDING = "pending"
 11    IN_PROGRESS = "in_progress"
 12    COMPLETED = "completed"
 13    FAILED = "failed"
 14    BLOCKED = "blocked"
 15
 16
 17@dataclass
 18class AgentState:
 19    """Agent 狀態"""
 20    role: AgentRole
 21    current_task: Optional[str] = None
 22    status: TaskStatus = TaskStatus.PENDING
 23    last_output: Optional[str] = None
 24    error: Optional[str] = None
 25    retries: int = 0
 26    tokens_consumed: int = 0
 27
 28
 29@dataclass
 30class WorkflowState:
 31    """工作流程狀態"""
 32    workflow_id: str
 33    original_task: str
 34    agent_states: dict[str, AgentState] = field(default_factory=dict)
 35    shared_context: dict[str, Any] = field(default_factory=dict)
 36    execution_order: list[str] = field(default_factory=list)
 37    current_step: int = 0
 38    start_time: float = field(default_factory=time.time)
 39    end_time: Optional[float] = None
 40
 41    def to_dict(self) -> dict:
 42        """序列化為字典(用於持久化)"""
 43        return {
 44            "workflow_id": self.workflow_id,
 45            "original_task": self.original_task,
 46            "agent_states": {
 47                k: {
 48                    "role": v.role.value,
 49                    "status": v.status.value,
 50                    "retries": v.retries,
 51                    "tokens_consumed": v.tokens_consumed
 52                }
 53                for k, v in self.agent_states.items()
 54            },
 55            "current_step": self.current_step,
 56            "total_steps": len(self.execution_order),
 57            "elapsed_time": time.time() - self.start_time
 58        }
 59
 60
 61class StatefulOrchestrator:
 62    """
 63    帶狀態管理的協調器
 64
 65    功能:
 66    1. 追蹤每個 Agent 的狀態
 67    2. 支援斷點續傳
 68    3. 錯誤恢復和重試
 69    """
 70
 71    MAX_RETRIES = 3
 72    RETRY_DELAY = 1.0
 73
 74    def __init__(self):
 75        self.current_workflow: Optional[WorkflowState] = None
 76        self.agents = self._initialize_agents()
 77
 78    def _initialize_agents(self) -> dict:
 79        """初始化 Agent 配置"""
 80        return {
 81            AgentRole.ANALYST: {
 82                "model": "claude-sonnet-4-20250514",
 83                "system": "你是需求分析師。分析需求並輸出 JSON。",
 84                "max_tokens": 2048
 85            },
 86            AgentRole.DEVELOPER: {
 87                "model": "claude-sonnet-4-20250514",
 88                "system": "你是程式開發者。根據需求實作程式碼。",
 89                "max_tokens": 4096
 90            },
 91            AgentRole.REVIEWER: {
 92                "model": "claude-3-5-haiku-20241022",
 93                "system": "你是審查員。審查程式碼並列出問題。",
 94                "max_tokens": 1024
 95            }
 96        }
 97
 98    def _create_workflow(self, task: str) -> WorkflowState:
 99        """創建新的工作流程"""
100        workflow_id = f"wf_{int(time.time())}"
101
102        workflow = WorkflowState(
103            workflow_id=workflow_id,
104            original_task=task,
105            execution_order=[
106                AgentRole.ANALYST.value,
107                AgentRole.DEVELOPER.value,
108                AgentRole.REVIEWER.value
109            ]
110        )
111
112        # 初始化各 Agent 狀態
113        for role_name in workflow.execution_order:
114            role = AgentRole(role_name)
115            workflow.agent_states[role_name] = AgentState(role=role)
116
117        return workflow
118
119    def _execute_agent_with_retry(
120        self,
121        role: AgentRole,
122        task: str,
123        context: str = ""
124    ) -> tuple[str, int]:
125        """帶重試的 Agent 執行"""
126        config = self.agents[role]
127        state = self.current_workflow.agent_states[role.value]
128
129        full_input = f"{context}\n\n任務:{task}" if context else task
130
131        for attempt in range(self.MAX_RETRIES):
132            try:
133                state.status = TaskStatus.IN_PROGRESS
134                state.current_task = task
135
136                response = client.messages.create(
137                    model=config["model"],
138                    max_tokens=config["max_tokens"],
139                    system=config["system"],
140                    messages=[{"role": "user", "content": full_input}]
141                )
142
143                output = response.content[0].text
144                tokens = response.usage.input_tokens + response.usage.output_tokens
145
146                state.status = TaskStatus.COMPLETED
147                state.last_output = output
148                state.tokens_consumed += tokens
149                state.error = None
150
151                return output, tokens
152
153            except Exception as e:
154                state.retries += 1
155                state.error = str(e)
156
157                if attempt < self.MAX_RETRIES - 1:
158                    print(f"  ⚠️ 重試 {attempt + 1}/{self.MAX_RETRIES}: {e}")
159                    time.sleep(self.RETRY_DELAY * (attempt + 1))
160                else:
161                    state.status = TaskStatus.FAILED
162                    raise
163
164    def execute(self, task: str, resume_from: Optional[str] = None) -> dict:
165        """
166        執行工作流程
167
168        Args:
169            task: 任務描述
170            resume_from: 可選,從指定工作流程 ID 續傳
171        """
172        # 創建或恢復工作流程
173        if resume_from:
174            # TODO: 從持久化存儲載入
175            print(f"續傳模式:{resume_from}")
176        else:
177            self.current_workflow = self._create_workflow(task)
178
179        print(f"\n{'='*60}")
180        print(f"工作流程: {self.current_workflow.workflow_id}")
181        print(f"{'='*60}")
182
183        results = []
184        total_tokens = 0
185
186        # 執行各步驟
187        for i, role_name in enumerate(self.current_workflow.execution_order):
188            if i < self.current_workflow.current_step:
189                print(f"[{i+1}] {role_name}: 跳過(已完成)")
190                continue
191
192            role = AgentRole(role_name)
193            state = self.current_workflow.agent_states[role_name]
194
195            print(f"\n[{i+1}] {role_name}")
196
197            try:
198                # 構建 Context(前一步的摘要)
199                context = ""
200                if results:
201                    last_result = results[-1]
202                    context = f"[上一步輸出摘要]\n{last_result[:1000]}..."
203
204                # 執行
205                output, tokens = self._execute_agent_with_retry(role, task, context)
206                results.append(output)
207                total_tokens += tokens
208
209                print(f"  ✓ 完成,tokens: {tokens}")
210
211                # 更新進度
212                self.current_workflow.current_step = i + 1
213
214                # 保存 checkpoint(可選:持久化)
215                self._save_checkpoint()
216
217            except Exception as e:
218                print(f"  ✗ 失敗: {e}")
219                return {
220                    "status": "failed",
221                    "failed_at": role_name,
222                    "error": str(e),
223                    "partial_results": results,
224                    "workflow_state": self.current_workflow.to_dict()
225                }
226
227        # 完成
228        self.current_workflow.end_time = time.time()
229
230        return {
231            "status": "completed",
232            "workflow_id": self.current_workflow.workflow_id,
233            "final_output": results[-1] if results else "",
234            "all_outputs": results,
235            "total_tokens": total_tokens,
236            "elapsed_time": self.current_workflow.end_time - self.current_workflow.start_time,
237            "workflow_state": self.current_workflow.to_dict()
238        }
239
240    def _save_checkpoint(self):
241        """保存檢查點(用於斷點續傳)"""
242        # 實際應用中,這裡會寫入資料庫或檔案
243        state = self.current_workflow.to_dict()
244        # print(f"  📍 Checkpoint: step {state['current_step']}/{state['total_steps']}")
245
246
247class ErrorRecoveryOrchestrator(StatefulOrchestrator):
248    """
249    帶錯誤恢復的協調器
250
251    策略:
252    1. 重試(帶退避)
253    2. 降級(使用備用 Agent)
254    3. 跳過(標記為可選步驟)
255    4. 中止(關鍵步驟失敗)
256    """
257
258    def __init__(self):
259        super().__init__()
260
261        # 定義步驟的關鍵性
262        self.step_criticality = {
263            AgentRole.ANALYST.value: "critical",    # 失敗則中止
264            AgentRole.DEVELOPER.value: "critical",
265            AgentRole.REVIEWER.value: "optional",   # 失敗可跳過
266        }
267
268        # 備用 Agent 映射
269        self.fallback_agents = {
270            AgentRole.DEVELOPER: AgentRole.ANALYST,  # 開發者失敗時,嘗試分析師
271        }
272
273    def _handle_failure(
274        self,
275        role: AgentRole,
276        error: Exception
277    ) -> tuple[str, str]:
278        """
279        處理失敗
280
281        Returns:
282            (action, reason): 行動和原因
283        """
284        criticality = self.step_criticality.get(role.value, "critical")
285
286        if criticality == "optional":
287            return "skip", f"{role.value} 為可選步驟,跳過"
288
289        if role in self.fallback_agents:
290            fallback = self.fallback_agents[role]
291            return "fallback", f"嘗試使用備用 Agent: {fallback.value}"
292
293        return "abort", f"關鍵步驟 {role.value} 失敗,中止流程"

生產級最佳實踐

Token 預算管理

 1@dataclass
 2class TokenBudget:
 3    """Token 預算配置"""
 4    total_budget: int
 5    per_agent_limits: dict[str, int]
 6    reserve_ratio: float = 0.1  # 保留 10% 給協調開銷
 7
 8    def get_available(self, role: AgentRole) -> int:
 9        """取得可用 Token"""
10        limit = self.per_agent_limits.get(role.value, 2000)
11        return limit
12
13    def can_afford(self, estimated_tokens: int, used: int) -> bool:
14        """檢查是否能負擔"""
15        available = self.total_budget * (1 - self.reserve_ratio) - used
16        return estimated_tokens <= available
17
18
19class BudgetAwareOrchestrator:
20    """Token 預算感知的協調器"""
21
22    def __init__(self, budget: TokenBudget):
23        self.budget = budget
24        self.tokens_used = 0
25
26    def _estimate_tokens(self, task: str, role: AgentRole) -> int:
27        """估算任務所需 Token"""
28        # 簡單估算:System Prompt + 輸入 + 預期輸出
29        base_estimate = {
30            AgentRole.ANALYST: 3000,
31            AgentRole.DEVELOPER: 6000,
32            AgentRole.REVIEWER: 2000,
33            AgentRole.DOC_WRITER: 3000,
34            AgentRole.ORCHESTRATOR: 2500
35        }
36        return base_estimate.get(role, 3000)
37
38    def execute_within_budget(self, task: str) -> dict:
39        """在預算內執行"""
40        steps = [
41            AgentRole.ANALYST,
42            AgentRole.DEVELOPER,
43            AgentRole.REVIEWER
44        ]
45
46        results = []
47        for role in steps:
48            estimated = self._estimate_tokens(task, role)
49
50            if not self.budget.can_afford(estimated, self.tokens_used):
51                print(f"⚠️ 預算不足,跳過 {role.value}")
52                continue
53
54            # 執行...
55            # result, tokens = self._execute(role, task)
56            # self.tokens_used += tokens
57            # results.append(result)
58
59        return {
60            "results": results,
61            "tokens_used": self.tokens_used,
62            "budget_remaining": self.budget.total_budget - self.tokens_used
63        }

監控與可觀測性

 1import time
 2from dataclasses import dataclass, field
 3
 4
 5@dataclass
 6class AgentMetrics:
 7    """Agent 指標"""
 8    role: str
 9    invocations: int = 0
10    total_tokens: int = 0
11    total_latency_ms: float = 0
12    errors: int = 0
13    retries: int = 0
14
15    @property
16    def avg_tokens(self) -> float:
17        return self.total_tokens / self.invocations if self.invocations > 0 else 0
18
19    @property
20    def avg_latency_ms(self) -> float:
21        return self.total_latency_ms / self.invocations if self.invocations > 0 else 0
22
23    @property
24    def error_rate(self) -> float:
25        return self.errors / self.invocations if self.invocations > 0 else 0
26
27
28class MetricsCollector:
29    """指標收集器"""
30
31    def __init__(self):
32        self.agent_metrics: dict[str, AgentMetrics] = {}
33        self.workflow_metrics: list[dict] = []
34
35    def record_agent_call(
36        self,
37        role: AgentRole,
38        tokens: int,
39        latency_ms: float,
40        success: bool
41    ):
42        """記錄 Agent 呼叫"""
43        role_key = role.value
44
45        if role_key not in self.agent_metrics:
46            self.agent_metrics[role_key] = AgentMetrics(role=role_key)
47
48        metrics = self.agent_metrics[role_key]
49        metrics.invocations += 1
50        metrics.total_tokens += tokens
51        metrics.total_latency_ms += latency_ms
52
53        if not success:
54            metrics.errors += 1
55
56    def get_report(self) -> str:
57        """生成報告"""
58        lines = [
59            "="*60,
60            "Agent 協作系統監控報告",
61            "="*60,
62            ""
63        ]
64
65        for role, m in self.agent_metrics.items():
66            lines.extend([
67                f"【{role}】",
68                f"  呼叫次數: {m.invocations}",
69                f"  總 Tokens: {m.total_tokens:,}",
70                f"  平均 Tokens: {m.avg_tokens:.0f}",
71                f"  平均延遲: {m.avg_latency_ms:.0f}ms",
72                f"  錯誤率: {m.error_rate*100:.1f}%",
73                ""
74            ])
75
76        # 總計
77        total_tokens = sum(m.total_tokens for m in self.agent_metrics.values())
78        total_calls = sum(m.invocations for m in self.agent_metrics.values())
79
80        lines.extend([
81            "-"*60,
82            f"總呼叫: {total_calls}",
83            f"總 Tokens: {total_tokens:,}",
84            "="*60
85        ])
86
87        return "\n".join(lines)

完整範例:生產級開發團隊

  1"""
  2生產級專責化 Agent 開發團隊
  3
  4整合:
  51. 智慧路由
  62. Hub-and-Spoke 協調
  73. 狀態管理
  84. 錯誤恢復
  95. Token 預算
 106. 監控
 11"""
 12
 13class ProductionAgentTeam:
 14    """生產級 Agent 團隊"""
 15
 16    def __init__(
 17        self,
 18        token_budget: int = 50000,
 19        enable_metrics: bool = True
 20    ):
 21        # 核心組件
 22        self.router = IntelligentRouter()
 23        self.orchestrator = StatefulOrchestrator()
 24
 25        # 預算
 26        self.budget = TokenBudget(
 27            total_budget=token_budget,
 28            per_agent_limits={
 29                "analyst": 3000,
 30                "developer": 8000,
 31                "reviewer": 2000,
 32                "doc_writer": 3000,
 33                "orchestrator": 2500
 34            }
 35        )
 36
 37        # 監控
 38        self.metrics = MetricsCollector() if enable_metrics else None
 39
 40        self.tokens_used = 0
 41
 42    def process_request(self, request: str) -> dict:
 43        """處理使用者請求"""
 44        start_time = time.time()
 45
 46        print(f"\n{'='*60}")
 47        print("生產級 Agent 團隊")
 48        print(f"{'='*60}")
 49        print(f"請求: {request[:100]}...")
 50        print(f"Token 預算: {self.budget.total_budget:,}")
 51
 52        # 1. 路由決策
 53        routing = self.router.route(request)
 54        print(f"\n路由決策: {routing.primary_agent.value} (信心度: {routing.confidence:.2f})")
 55
 56        # 2. 執行工作流程
 57        result = self.orchestrator.execute(request)
 58
 59        # 3. 更新統計
 60        elapsed = (time.time() - start_time) * 1000
 61
 62        if self.metrics:
 63            # 從工作流程狀態提取各 Agent 的指標
 64            for role_name, state in self.orchestrator.current_workflow.agent_states.items():
 65                self.metrics.record_agent_call(
 66                    role=AgentRole(role_name),
 67                    tokens=state.tokens_consumed,
 68                    latency_ms=elapsed / len(self.orchestrator.current_workflow.agent_states),
 69                    success=state.status == TaskStatus.COMPLETED
 70                )
 71
 72        # 4. 返回結果
 73        return {
 74            "status": result["status"],
 75            "output": result.get("final_output", ""),
 76            "total_tokens": result.get("total_tokens", 0),
 77            "elapsed_ms": elapsed,
 78            "routing": {
 79                "category": routing.category.value,
 80                "primary_agent": routing.primary_agent.value,
 81                "confidence": routing.confidence
 82            },
 83            "workflow_id": result.get("workflow_id")
 84        }
 85
 86    def get_metrics_report(self) -> str:
 87        """取得監控報告"""
 88        if self.metrics:
 89            return self.metrics.get_report()
 90        return "監控未啟用"
 91
 92
 93# 使用範例
 94if __name__ == "__main__":
 95    team = ProductionAgentTeam(token_budget=30000)
 96
 97    # 處理請求
 98    result = team.process_request("""
 99    開發一個簡單的 REST API:
100    - 使用者註冊和登入
101    - JWT 認證
102    - 基本的 CRUD 操作
103    """)
104
105    print(f"\n結果:")
106    print(f"  狀態: {result['status']}")
107    print(f"  Tokens: {result['total_tokens']:,}")
108    print(f"  延遲: {result['elapsed_ms']:.0f}ms")
109
110    print(f"\n{team.get_metrics_report()}")

優化效果總結

專責化 + 協作優化的綜合效果:

┌─────────────────────────────────────────────────────────────────────┐
│                    優化前 vs 優化後對比                              │
├────────────────────┬───────────────┬───────────────┬────────────────┤
│ 指標               │ 優化前        │ 優化後        │ 改善           │
│                    │ (通用 Agent)  │ (專責+協作)   │                │
├────────────────────┼───────────────┼───────────────┼────────────────┤
│ System Prompt      │ 16,000 tok    │ 2,500 tok     │ -84%           │
│ 工具定義           │ 5,000 tok     │ 800 tok       │ -84%           │
│ 每任務固定成本     │ 21,000 tok    │ 3,300 tok     │ -84%           │
│ 協調開銷           │ 0             │ +1,500 tok    │ +1,500         │
│ 淨節省             │ -             │ -             │ -80%           │
├────────────────────┼───────────────┼───────────────┼────────────────┤
│ 5 任務總成本       │ 105,000 tok   │ 23,000 tok    │ -78%           │
│ 10 任務總成本      │ 210,000 tok   │ 43,000 tok    │ -80%           │
└────────────────────┴───────────────┴───────────────┴────────────────┘

額外效益:
✅ 模型差異化選擇:簡單任務用 Haiku,再省 50%+
✅ 並行執行:延遲降低 50-70%
✅ 錯誤隔離:單一 Agent 失敗不影響整體
✅ 可觀測性:精確追蹤各環節消耗

最佳實踐清單

專責化 Agent 協作 Checklist:

架構選擇
□ 根據任務特性選擇協作模式(Hub-Spoke / Pipeline / Parallel)
□ 確認協調開銷小於專責化節省
□ 設計清晰的 Agent 邊界和介面

路由設計
□ 實作關鍵字快速分類
□ 可選:LLM 精確分類(作為後備)
□ 持續優化分類準確度

狀態管理
□ 追蹤每個 Agent 的執行狀態
□ 支援斷點續傳
□ 定義步驟關鍵性(critical / optional)

錯誤處理
□ 實作重試機制(帶指數退避)
□ 定義降級策略
□ 適當的錯誤傳播和中止條件

資源管理
□ 設定 Token 預算上限
□ 各 Agent 獨立預算控制
□ 預留協調開銷空間

監控
□ 追蹤各 Agent 的 Token 消耗
□ 追蹤延遲和錯誤率
□ 定期審查和優化

總結

專責化 Agent 的協作設計是發揮其最大價值的關鍵。本文介紹的協作模式涵蓋:

模式適用場景Token 效率延遲
Hub-and-Spoke一般任務
Pipeline順序依賴
Parallel獨立子任務
Adaptive混合場景最高可變

核心原則:

  1. 最小化協調開銷:確保協調成本小於專責化節省
  2. 精準的 Context 傳遞:只傳遞必要的摘要,不傳完整輸出
  3. 智慧路由:根據任務特性自動選擇最佳模式
  4. 穩健的狀態管理:支援斷點續傳和錯誤恢復
  5. 持續監控優化:數據驅動的效能調校

透過合理的協作設計,專責化 Agent 團隊可以在保持 80%+ Token 節省的同時,提供更可靠、更快速的服務。

Yen

Yen

Yen