在前一篇《Agent 專責化實戰指南》中,我們學會了如何設計單一職責的精簡 Agent。然而,擁有一群優秀的專家還不夠——關鍵在於如何讓他們高效協作。本文將深入探討專責化 Agent 的協作模式,從團隊架構設計到生產級的協調機制。
核心挑戰:從個體到團隊
專責化帶來的協調複雜度
專責化前(單體 Agent):
┌───────────────────────────────────────────────────────┐
│ 全能 Agent │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 分析 → 設計 → 開發 → 審查 → 文件 │ │
│ │ (全部在同一個 Context 內完成) │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ 優點:無協調開銷 │
│ 缺點:System Prompt 巨大,Token 浪費 │
└───────────────────────────────────────────────────────┘
專責化後(Agent 團隊):
┌───────────────────────────────────────────────────────┐
│ │
│ ┌──────────────┐ │
│ │ Orchestrator │ ←── 協調開銷 │
│ └──────┬───────┘ │
│ │ │
│ ┌────┴────┬────────┬────────┬────────┐ │
│ ▼ ▼ ▼ ▼ ▼ │
│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
│ │分析│ │設計│ │開發│ │審查│ │文件│ │
│ └────┘ └────┘ └────┘ └────┘ └────┘ │
│ │
│ 優點:各 Agent System Prompt 精簡 │
│ 缺點:需要協調機制,可能增加總呼叫次數 │
└───────────────────────────────────────────────────────┘
關鍵問題:協調開銷是否小於 System Prompt 節省?
答案:設計得當時,節省 >> 開銷
協調的 Token 成本模型
1def calculate_coordination_overhead(
2 num_agents: int,
3 orchestrator_tokens: int,
4 context_passing_tokens: int,
5 avg_handoff_tokens: int
6) -> int:
7 """
8 計算協調的額外 Token 開銷
9
10 組成:
11 1. Orchestrator 呼叫(任務分解 + 結果整合)
12 2. Agent 間 Context 傳遞
13 3. Handoff 訊息
14 """
15 orchestrator_overhead = orchestrator_tokens * 2 # 開始 + 結束
16 context_overhead = context_passing_tokens * (num_agents - 1)
17 handoff_overhead = avg_handoff_tokens * num_agents
18
19 return orchestrator_overhead + context_overhead + handoff_overhead
20
21
22def calculate_system_prompt_savings(
23 general_agent_prompt: int,
24 specialized_prompts: list[int],
25 num_calls: int
26) -> int:
27 """
28 計算 System Prompt 節省
29
30 假設:每個任務只呼叫相關的專責 Agent
31 """
32 # 通用 Agent:每次呼叫都發送完整 prompt
33 general_cost = general_agent_prompt * num_calls
34
35 # 專責 Agent:只發送該 Agent 的精簡 prompt
36 specialized_cost = sum(specialized_prompts) # 假設每個 Agent 呼叫一次
37
38 return general_cost - specialized_cost
39
40
41# 範例計算
42general_prompt = 16000 # 通用 Agent 的 System Prompt
43specialized_prompts = [2500, 3000, 2800, 2200, 1800] # 各專責 Agent
44num_agents = len(specialized_prompts)
45
46# 假設完成一個完整任務需要呼叫 5 個功能
47# 通用 Agent:呼叫 5 次,每次 16000 tokens
48# 專責 Agent:每個專責 Agent 呼叫 1 次
49
50savings = calculate_system_prompt_savings(
51 general_prompt,
52 specialized_prompts,
53 num_calls=5
54)
55
56overhead = calculate_coordination_overhead(
57 num_agents=5,
58 orchestrator_tokens=2000,
59 context_passing_tokens=500,
60 avg_handoff_tokens=300
61)
62
63print(f"System Prompt 節省: {savings:,} tokens")
64print(f"協調開銷: {overhead:,} tokens")
65print(f"淨節省: {savings - overhead:,} tokens")
66print(f"節省比例: {(savings - overhead) / (general_prompt * 5) * 100:.1f}%")
67
68# 輸出:
69# System Prompt 節省: 67,700 tokens
70# 協調開銷: 7,500 tokens
71# 淨節省: 60,200 tokens
72# 節省比例: 75.3%
協作架構模式
模式一:Hub-and-Spoke(中心輻射)
架構圖:
┌──────────────────┐
│ Orchestrator │
│ (Hub 中心) │
└────────┬─────────┘
│
┌───────────────────┼───────────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Agent A │ │ Agent B │ │ Agent C │
│ (Spoke) │ │ (Spoke) │ │ (Spoke) │
└─────────┘ └─────────┘ └─────────┘
特點:
- 所有通訊經過 Orchestrator
- 簡單、可控
- Orchestrator 可能成為瓶頸
- 適合:任務明確、Agent 數量適中的場景
1import anthropic
2from dataclasses import dataclass, field
3from typing import Optional, Callable
4from enum import Enum
5
6client = anthropic.Anthropic()
7
8
9class AgentRole(Enum):
10 ORCHESTRATOR = "orchestrator"
11 ANALYST = "analyst"
12 DEVELOPER = "developer"
13 REVIEWER = "reviewer"
14 DOC_WRITER = "doc_writer"
15
16
17@dataclass
18class TaskResult:
19 """標準化的任務結果"""
20 agent: AgentRole
21 task_id: str
22 status: str # "success", "partial", "failed"
23 output: str
24 summary: str # 精簡摘要(用於傳遞給下游)
25 tokens_used: int
26 metadata: dict = field(default_factory=dict)
27
28
29class HubAndSpokeOrchestrator:
30 """
31 Hub-and-Spoke 協作模式
32
33 所有 Agent 通訊經過中心 Orchestrator:
34 1. Orchestrator 接收任務,分解為子任務
35 2. 依序(或並行)派發給各 Spoke Agent
36 3. 收集結果,決定下一步或整合最終輸出
37 """
38
39 def __init__(self):
40 self.agents = self._initialize_agents()
41 self.task_results: list[TaskResult] = []
42 self._token_budget = 0
43 self._tokens_used = 0
44
45 def _initialize_agents(self) -> dict:
46 """初始化各專責 Agent 的配置"""
47 return {
48 AgentRole.ORCHESTRATOR: {
49 "model": "claude-sonnet-4-20250514",
50 "system": """你是任務協調者。
51
52職責:
531. 分解複雜任務為子任務
542. 決定執行順序和依賴關係
553. 整合各 Agent 的輸出
56
57輸出格式(JSON):
58{
59 "subtasks": [
60 {"id": "1", "agent": "analyst", "task": "...", "depends_on": []},
61 {"id": "2", "agent": "developer", "task": "...", "depends_on": ["1"]}
62 ],
63 "execution_plan": "sequential|parallel|mixed"
64}""",
65 "max_tokens": 2048
66 },
67
68 AgentRole.ANALYST: {
69 "model": "claude-sonnet-4-20250514",
70 "system": """你是需求分析師。
71
72職責:分析需求,提取關鍵資訊。
73
74輸出格式:
75- 功能需求(列表)
76- 非功能需求(列表)
77- 關鍵限制(列表)
78- 建議(列表)""",
79 "max_tokens": 2048
80 },
81
82 AgentRole.DEVELOPER: {
83 "model": "claude-sonnet-4-20250514",
84 "system": """你是程式開發者。
85
86職責:根據需求實作程式碼。
87
88輸出格式:
891. 完整可執行的程式碼
902. 簡短說明(3-5 行)
913. 使用方式""",
92 "max_tokens": 4096
93 },
94
95 AgentRole.REVIEWER: {
96 "model": "claude-3-5-haiku-20241022", # 輕量任務用 Haiku
97 "system": """你是程式碼審查員。
98
99職責:審查程式碼品質。
100
101輸出格式(按嚴重度排序):
102🔴 嚴重問題:[...]
103🟡 建議改進:[...]
104🟢 良好實踐:[...]""",
105 "max_tokens": 1024
106 },
107
108 AgentRole.DOC_WRITER: {
109 "model": "claude-3-5-haiku-20241022",
110 "system": """你是技術文件撰寫者。
111
112職責:撰寫清晰的技術文件。
113
114輸出格式(Markdown):
115## 概述
116## 安裝
117## 使用方式
118## API 參考""",
119 "max_tokens": 2048
120 }
121 }
122
123 def _call_agent(
124 self,
125 role: AgentRole,
126 task: str,
127 context: str = ""
128 ) -> TaskResult:
129 """呼叫單個 Agent"""
130 config = self.agents[role]
131
132 # 組合輸入(Context + Task)
133 full_input = f"{context}\n\n任務:{task}" if context else task
134
135 response = client.messages.create(
136 model=config["model"],
137 max_tokens=config["max_tokens"],
138 system=config["system"],
139 messages=[{"role": "user", "content": full_input}]
140 )
141
142 output = response.content[0].text
143 tokens = response.usage.input_tokens + response.usage.output_tokens
144 self._tokens_used += tokens
145
146 # 生成精簡摘要(前 500 字元)
147 summary = output[:500] + "..." if len(output) > 500 else output
148
149 return TaskResult(
150 agent=role,
151 task_id=f"{role.value}_{len(self.task_results)}",
152 status="success",
153 output=output,
154 summary=summary,
155 tokens_used=tokens
156 )
157
158 def _build_context_for_agent(
159 self,
160 target_agent: AgentRole,
161 dependency_results: list[TaskResult]
162 ) -> str:
163 """
164 為目標 Agent 建構最小化 Context
165
166 核心:只傳遞摘要,不傳遞完整輸出
167 """
168 if not dependency_results:
169 return ""
170
171 context_parts = []
172 for result in dependency_results:
173 context_parts.append(
174 f"[{result.agent.value} 的分析結果]\n{result.summary}"
175 )
176
177 return "\n\n---\n\n".join(context_parts)
178
179 def execute(self, task: str, token_budget: int = 50000) -> dict:
180 """
181 執行完整的協作流程
182
183 Args:
184 task: 使用者任務
185 token_budget: Token 預算上限
186 """
187 self._token_budget = token_budget
188 self._tokens_used = 0
189 self.task_results = []
190
191 print(f"\n{'='*60}")
192 print("Hub-and-Spoke 協作模式")
193 print(f"{'='*60}")
194 print(f"任務: {task[:100]}...")
195 print(f"Token 預算: {token_budget:,}")
196
197 # Step 1: Orchestrator 分解任務
198 print("\n[Step 1] Orchestrator 分解任務...")
199 plan_result = self._call_agent(
200 AgentRole.ORCHESTRATOR,
201 f"請分解以下任務並制定執行計畫:\n\n{task}"
202 )
203 self.task_results.append(plan_result)
204 print(f" ✓ 計畫完成,tokens: {plan_result.tokens_used}")
205
206 # Step 2: 依序執行各 Agent
207 # (簡化版:固定流程,實際可根據計畫動態調整)
208 pipeline = [
209 (AgentRole.ANALYST, "分析需求"),
210 (AgentRole.DEVELOPER, "實作程式碼"),
211 (AgentRole.REVIEWER, "審查程式碼"),
212 (AgentRole.DOC_WRITER, "撰寫文件")
213 ]
214
215 for i, (role, action) in enumerate(pipeline, start=2):
216 print(f"\n[Step {i}] {role.value}: {action}...")
217
218 # 檢查 Token 預算
219 if self._tokens_used >= self._token_budget * 0.9:
220 print(f" ⚠️ 接近 Token 預算,跳過後續步驟")
221 break
222
223 # 建構 Context(只傳遞前一步的摘要)
224 context = self._build_context_for_agent(
225 role,
226 self.task_results[-2:] # 只取最近 2 個結果
227 )
228
229 # 執行
230 result = self._call_agent(role, task, context)
231 self.task_results.append(result)
232 print(f" ✓ 完成,tokens: {result.tokens_used}")
233
234 # Step 3: Orchestrator 整合結果
235 print("\n[Final] Orchestrator 整合結果...")
236 final_context = self._build_context_for_agent(
237 AgentRole.ORCHESTRATOR,
238 self.task_results
239 )
240
241 final_result = self._call_agent(
242 AgentRole.ORCHESTRATOR,
243 "請整合所有專家的輸出,生成最終報告",
244 final_context
245 )
246 self.task_results.append(final_result)
247
248 # 統計
249 print(f"\n{'='*60}")
250 print("執行統計")
251 print(f"{'='*60}")
252 print(f"總 Token 使用: {self._tokens_used:,} / {self._token_budget:,}")
253 print(f"Agent 呼叫次數: {len(self.task_results)}")
254
255 return {
256 "final_output": final_result.output,
257 "all_results": self.task_results,
258 "total_tokens": self._tokens_used,
259 "budget_usage": f"{self._tokens_used / self._token_budget * 100:.1f}%"
260 }
模式二:Pipeline(流水線)
架構圖:
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ Agent A│───▶│ Agent B│───▶│ Agent C│───▶│ Agent D│───▶│ Agent E│
│ 分析 │ │ 設計 │ │ 開發 │ │ 審查 │ │ 文件 │
└────────┘ └────────┘ └────────┘ └────────┘ └────────┘
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
需求文件 設計文件 程式碼 審查報告 文件
特點:
- 線性執行,前一個完成後啟動下一個
- 無需複雜的協調邏輯
- 延遲較高(串行)
- 適合:有明確順序依賴的任務流程
1from dataclasses import dataclass
2from typing import Callable, Any
3
4@dataclass
5class PipelineStage:
6 """流水線階段"""
7 role: AgentRole
8 system_prompt: str
9 model: str
10 max_tokens: int
11 input_transformer: Optional[Callable[[str, list], str]] = None
12 output_validator: Optional[Callable[[str], bool]] = None
13
14
15class PipelineOrchestrator:
16 """
17 Pipeline 協作模式
18
19 特點:
20 1. 嚴格的線性執行順序
21 2. 每個階段的輸出是下一階段的輸入
22 3. 可選的輸入轉換和輸出驗證
23 """
24
25 def __init__(self, stages: list[PipelineStage]):
26 self.stages = stages
27 self.execution_log: list[dict] = []
28
29 def _transform_input(
30 self,
31 stage: PipelineStage,
32 original_task: str,
33 previous_outputs: list[str]
34 ) -> str:
35 """轉換輸入"""
36 if stage.input_transformer:
37 return stage.input_transformer(original_task, previous_outputs)
38
39 # 預設:組合原始任務 + 上一階段輸出
40 if previous_outputs:
41 last_output = previous_outputs[-1]
42 # 只取摘要(前 1000 字元)
43 summary = last_output[:1000] + "..." if len(last_output) > 1000 else last_output
44 return f"背景:\n{summary}\n\n任務:{original_task}"
45
46 return original_task
47
48 def _validate_output(
49 self,
50 stage: PipelineStage,
51 output: str
52 ) -> tuple[bool, str]:
53 """驗證輸出"""
54 if stage.output_validator:
55 is_valid = stage.output_validator(output)
56 return is_valid, "" if is_valid else "輸出驗證失敗"
57
58 # 預設驗證:非空
59 if not output or len(output.strip()) < 10:
60 return False, "輸出過短或為空"
61
62 return True, ""
63
64 def execute(
65 self,
66 task: str,
67 stop_on_failure: bool = True
68 ) -> dict:
69 """
70 執行流水線
71
72 Args:
73 task: 原始任務
74 stop_on_failure: 驗證失敗時是否停止
75 """
76 outputs: list[str] = []
77 self.execution_log = []
78 total_tokens = 0
79
80 print(f"\n{'='*60}")
81 print("Pipeline 協作模式")
82 print(f"{'='*60}")
83 print(f"階段數: {len(self.stages)}")
84
85 for i, stage in enumerate(self.stages):
86 print(f"\n[{i+1}/{len(self.stages)}] {stage.role.value}")
87
88 # 準備輸入
89 stage_input = self._transform_input(stage, task, outputs)
90
91 # 呼叫 Agent
92 response = client.messages.create(
93 model=stage.model,
94 max_tokens=stage.max_tokens,
95 system=stage.system_prompt,
96 messages=[{"role": "user", "content": stage_input}]
97 )
98
99 output = response.content[0].text
100 tokens = response.usage.input_tokens + response.usage.output_tokens
101 total_tokens += tokens
102
103 # 驗證輸出
104 is_valid, error = self._validate_output(stage, output)
105
106 log_entry = {
107 "stage": i + 1,
108 "role": stage.role.value,
109 "tokens": tokens,
110 "valid": is_valid,
111 "error": error
112 }
113 self.execution_log.append(log_entry)
114
115 if is_valid:
116 outputs.append(output)
117 print(f" ✓ 完成,tokens: {tokens}")
118 else:
119 print(f" ✗ 驗證失敗: {error}")
120 if stop_on_failure:
121 print(" ⚠️ 流水線中止")
122 break
123 outputs.append("") # 空輸出,繼續下一階段
124
125 return {
126 "final_output": outputs[-1] if outputs else "",
127 "all_outputs": outputs,
128 "total_tokens": total_tokens,
129 "stages_completed": len([l for l in self.execution_log if l["valid"]]),
130 "log": self.execution_log
131 }
132
133
134# 建立開發流水線
135def create_development_pipeline() -> PipelineOrchestrator:
136 """創建標準開發流水線"""
137 stages = [
138 PipelineStage(
139 role=AgentRole.ANALYST,
140 system_prompt="你是需求分析師。提取功能需求、非功能需求、限制條件。輸出 JSON。",
141 model="claude-sonnet-4-20250514",
142 max_tokens=2048
143 ),
144 PipelineStage(
145 role=AgentRole.DEVELOPER,
146 system_prompt="你是程式開發者。根據需求實作 Python 程式碼。輸出完整可執行程式碼。",
147 model="claude-sonnet-4-20250514",
148 max_tokens=4096,
149 output_validator=lambda x: "def " in x or "class " in x # 簡單驗證
150 ),
151 PipelineStage(
152 role=AgentRole.REVIEWER,
153 system_prompt="你是程式碼審查員。審查程式碼的正確性、可讀性、安全性。",
154 model="claude-3-5-haiku-20241022",
155 max_tokens=1024
156 ),
157 PipelineStage(
158 role=AgentRole.DOC_WRITER,
159 system_prompt="你是文件撰寫者。為程式碼撰寫 README 文件。",
160 model="claude-3-5-haiku-20241022",
161 max_tokens=2048
162 ),
163 ]
164
165 return PipelineOrchestrator(stages)
模式三:Parallel with Merge(並行合併)
架構圖:
┌──────────────────┐
│ Task Splitter │
│ (任務拆分) │
└────────┬─────────┘
│
┌───────────────────┼───────────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Agent A │ │ Agent B │ │ Agent C │
│ (並行) │ │ (並行) │ │ (並行) │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└───────────────────┼───────────────────┘
│
┌────────▼─────────┐
│ Result Merger │
│ (結果合併) │
└──────────────────┘
特點:
- 獨立子任務並行執行
- 大幅縮短總延遲
- 需要設計好合併邏輯
- 適合:可並行的獨立審查、多角度分析
1import asyncio
2from anthropic import AsyncAnthropic
3
4async_client = AsyncAnthropic()
5
6
7@dataclass
8class ParallelTask:
9 """並行任務定義"""
10 id: str
11 role: AgentRole
12 task: str
13 system_prompt: str
14 model: str
15 max_tokens: int
16
17
18class ParallelMergeOrchestrator:
19 """
20 Parallel with Merge 協作模式
21
22 適用場景:
23 1. 多角度分析(安全、效能、可讀性審查)
24 2. 多專家意見收集
25 3. 獨立子任務處理
26 """
27
28 def __init__(self):
29 self.results: dict[str, TaskResult] = {}
30
31 async def _execute_single(self, task: ParallelTask) -> TaskResult:
32 """非同步執行單個任務"""
33 try:
34 response = await async_client.messages.create(
35 model=task.model,
36 max_tokens=task.max_tokens,
37 system=task.system_prompt,
38 messages=[{"role": "user", "content": task.task}]
39 )
40
41 output = response.content[0].text
42 tokens = response.usage.input_tokens + response.usage.output_tokens
43
44 return TaskResult(
45 agent=task.role,
46 task_id=task.id,
47 status="success",
48 output=output,
49 summary=output[:500],
50 tokens_used=tokens
51 )
52
53 except Exception as e:
54 return TaskResult(
55 agent=task.role,
56 task_id=task.id,
57 status="failed",
58 output="",
59 summary="",
60 tokens_used=0,
61 metadata={"error": str(e)}
62 )
63
64 async def execute_parallel(
65 self,
66 tasks: list[ParallelTask],
67 timeout: float = 60.0
68 ) -> list[TaskResult]:
69 """並行執行所有任務"""
70 print(f"\n並行執行 {len(tasks)} 個任務...")
71
72 # 創建所有協程
73 coroutines = [self._execute_single(task) for task in tasks]
74
75 # 並行執行(帶超時)
76 try:
77 results = await asyncio.wait_for(
78 asyncio.gather(*coroutines, return_exceptions=True),
79 timeout=timeout
80 )
81 except asyncio.TimeoutError:
82 print("⚠️ 部分任務超時")
83 results = []
84
85 # 過濾結果
86 valid_results = []
87 for r in results:
88 if isinstance(r, TaskResult):
89 valid_results.append(r)
90 self.results[r.task_id] = r
91 elif isinstance(r, Exception):
92 print(f" ✗ 任務失敗: {r}")
93
94 return valid_results
95
96 def merge_results(
97 self,
98 results: list[TaskResult],
99 merge_strategy: str = "concatenate"
100 ) -> str:
101 """
102 合併並行結果
103
104 策略:
105 - concatenate: 串連所有輸出
106 - summarize: 使用 LLM 總結
107 - structured: 結構化整合
108 """
109 if merge_strategy == "concatenate":
110 merged = []
111 for r in results:
112 merged.append(f"## {r.agent.value}\n\n{r.output}")
113 return "\n\n---\n\n".join(merged)
114
115 elif merge_strategy == "structured":
116 # 結構化整合
117 structured = {
118 "results": [],
119 "summary": "",
120 "consensus": []
121 }
122
123 for r in results:
124 structured["results"].append({
125 "agent": r.agent.value,
126 "key_points": r.summary
127 })
128
129 return str(structured)
130
131 else:
132 raise ValueError(f"未知的合併策略: {merge_strategy}")
133
134
135# 建立並行審查系統
136def create_parallel_review_system():
137 """創建並行程式碼審查系統"""
138
139 async def review_code(code: str) -> dict:
140 orchestrator = ParallelMergeOrchestrator()
141
142 # 定義並行審查任務
143 tasks = [
144 ParallelTask(
145 id="security_review",
146 role=AgentRole.REVIEWER,
147 task=f"從安全性角度審查以下程式碼:\n\n{code}",
148 system_prompt="你是安全專家。專注於:SQL 注入、XSS、認證漏洞、敏感資料洩露。",
149 model="claude-3-5-haiku-20241022",
150 max_tokens=1024
151 ),
152 ParallelTask(
153 id="performance_review",
154 role=AgentRole.REVIEWER,
155 task=f"從效能角度審查以下程式碼:\n\n{code}",
156 system_prompt="你是效能專家。專注於:時間複雜度、記憶體使用、N+1 查詢、快取策略。",
157 model="claude-3-5-haiku-20241022",
158 max_tokens=1024
159 ),
160 ParallelTask(
161 id="maintainability_review",
162 role=AgentRole.REVIEWER,
163 task=f"從可維護性角度審查以下程式碼:\n\n{code}",
164 system_prompt="你是架構師。專注於:程式碼結構、命名、單一職責、可讀性。",
165 model="claude-3-5-haiku-20241022",
166 max_tokens=1024
167 ),
168 ]
169
170 # 並行執行
171 results = await orchestrator.execute_parallel(tasks)
172
173 # 合併結果
174 merged = orchestrator.merge_results(results, "concatenate")
175
176 total_tokens = sum(r.tokens_used for r in results)
177
178 return {
179 "merged_review": merged,
180 "individual_reviews": {r.task_id: r.output for r in results},
181 "total_tokens": total_tokens,
182 "parallel_speedup": f"{len(tasks)}x (理論值)"
183 }
184
185 return review_code
186
187
188# 使用範例
189async def main():
190 review_code = create_parallel_review_system()
191
192 sample_code = '''
193def get_user(user_id):
194 query = f"SELECT * FROM users WHERE id = {user_id}"
195 result = db.execute(query)
196 return result
197'''
198
199 result = await review_code(sample_code)
200 print(result["merged_review"])
201
202# asyncio.run(main())
動態路由與任務分配
智慧路由器設計
1from enum import Enum
2from dataclasses import dataclass
3from typing import Optional
4import re
5
6
7class TaskCategory(Enum):
8 """任務類別"""
9 CODE_GENERATION = "code_generation"
10 CODE_REVIEW = "code_review"
11 DATA_ANALYSIS = "data_analysis"
12 DOCUMENTATION = "documentation"
13 ARCHITECTURE = "architecture"
14 TESTING = "testing"
15 DEBUGGING = "debugging"
16 UNKNOWN = "unknown"
17
18
19@dataclass
20class RoutingDecision:
21 """路由決策"""
22 category: TaskCategory
23 primary_agent: AgentRole
24 supporting_agents: list[AgentRole]
25 confidence: float
26 reasoning: str
27
28
29class IntelligentRouter:
30 """
31 智慧任務路由器
32
33 功能:
34 1. 分析任務類型
35 2. 選擇最適合的 Agent
36 3. 決定是否需要多 Agent 協作
37 """
38
39 # 關鍵字到類別的映射
40 KEYWORD_PATTERNS = {
41 TaskCategory.CODE_GENERATION: [
42 r"寫[一個]?程式", r"實作", r"開發", r"create", r"implement",
43 r"build", r"code", r"function", r"class"
44 ],
45 TaskCategory.CODE_REVIEW: [
46 r"審查", r"review", r"檢查", r"check", r"分析.*程式碼",
47 r"有.*問題", r"bug", r"錯誤"
48 ],
49 TaskCategory.DATA_ANALYSIS: [
50 r"分析.*資料", r"統計", r"趨勢", r"data", r"analysis",
51 r"chart", r"圖表", r"視覺化"
52 ],
53 TaskCategory.DOCUMENTATION: [
54 r"文件", r"document", r"說明", r"readme", r"api.*doc",
55 r"寫.*文"
56 ],
57 TaskCategory.ARCHITECTURE: [
58 r"架構", r"設計", r"architecture", r"design", r"系統",
59 r"微服務", r"database.*設計"
60 ],
61 TaskCategory.TESTING: [
62 r"測試", r"test", r"單元", r"整合", r"unit", r"integration"
63 ],
64 TaskCategory.DEBUGGING: [
65 r"除錯", r"debug", r"修復", r"fix", r"錯誤", r"crash"
66 ]
67 }
68
69 # 類別到 Agent 的映射
70 CATEGORY_AGENT_MAP = {
71 TaskCategory.CODE_GENERATION: {
72 "primary": AgentRole.DEVELOPER,
73 "supporting": [AgentRole.ANALYST, AgentRole.REVIEWER]
74 },
75 TaskCategory.CODE_REVIEW: {
76 "primary": AgentRole.REVIEWER,
77 "supporting": []
78 },
79 TaskCategory.DATA_ANALYSIS: {
80 "primary": AgentRole.ANALYST,
81 "supporting": []
82 },
83 TaskCategory.DOCUMENTATION: {
84 "primary": AgentRole.DOC_WRITER,
85 "supporting": [AgentRole.ANALYST]
86 },
87 TaskCategory.ARCHITECTURE: {
88 "primary": AgentRole.ANALYST, # 架構師(若有的話)
89 "supporting": [AgentRole.DEVELOPER]
90 },
91 TaskCategory.TESTING: {
92 "primary": AgentRole.DEVELOPER,
93 "supporting": [AgentRole.REVIEWER]
94 },
95 TaskCategory.DEBUGGING: {
96 "primary": AgentRole.DEVELOPER,
97 "supporting": [AgentRole.REVIEWER]
98 }
99 }
100
101 def __init__(self, use_llm_fallback: bool = True):
102 self.use_llm_fallback = use_llm_fallback
103 self._routing_history: list[RoutingDecision] = []
104
105 def _keyword_classify(self, task: str) -> tuple[TaskCategory, float]:
106 """基於關鍵字的快速分類"""
107 task_lower = task.lower()
108 scores = {}
109
110 for category, patterns in self.KEYWORD_PATTERNS.items():
111 score = 0
112 for pattern in patterns:
113 if re.search(pattern, task_lower):
114 score += 1
115 scores[category] = score
116
117 if not scores or max(scores.values()) == 0:
118 return TaskCategory.UNKNOWN, 0.0
119
120 best_category = max(scores, key=scores.get)
121 # 信心度 = 匹配數 / 總模式數
122 confidence = scores[best_category] / len(self.KEYWORD_PATTERNS[best_category])
123
124 return best_category, min(confidence, 1.0)
125
126 def _llm_classify(self, task: str) -> tuple[TaskCategory, float]:
127 """使用 LLM 進行精確分類(較貴但更準確)"""
128 classification_prompt = f"""分類以下任務。只回答類別名稱。
129
130類別:
131- code_generation: 寫程式碼、實作功能
132- code_review: 審查現有程式碼
133- data_analysis: 分析資料、統計
134- documentation: 撰寫文件
135- architecture: 系統設計、架構規劃
136- testing: 撰寫或執行測試
137- debugging: 除錯、修復問題
138- unknown: 無法分類
139
140任務:{task}
141
142類別:"""
143
144 response = client.messages.create(
145 model="claude-3-5-haiku-20241022", # 使用便宜的模型做分類
146 max_tokens=20,
147 messages=[{"role": "user", "content": classification_prompt}]
148 )
149
150 result = response.content[0].text.strip().lower()
151
152 # 解析結果
153 for category in TaskCategory:
154 if category.value in result:
155 return category, 0.9 # LLM 分類給予較高信心度
156
157 return TaskCategory.UNKNOWN, 0.5
158
159 def route(self, task: str) -> RoutingDecision:
160 """
161 路由任務到適當的 Agent
162
163 邏輯:
164 1. 先用關鍵字快速分類
165 2. 信心度低時,使用 LLM 分類
166 3. 根據類別選擇 Agent
167 """
168 # 快速分類
169 category, confidence = self._keyword_classify(task)
170
171 # 信心度低時使用 LLM
172 if confidence < 0.5 and self.use_llm_fallback:
173 llm_category, llm_confidence = self._llm_classify(task)
174 if llm_confidence > confidence:
175 category = llm_category
176 confidence = llm_confidence
177
178 # 選擇 Agent
179 agent_config = self.CATEGORY_AGENT_MAP.get(
180 category,
181 {"primary": AgentRole.ORCHESTRATOR, "supporting": []}
182 )
183
184 decision = RoutingDecision(
185 category=category,
186 primary_agent=agent_config["primary"],
187 supporting_agents=agent_config["supporting"],
188 confidence=confidence,
189 reasoning=f"根據任務內容,分類為 {category.value},信心度 {confidence:.2f}"
190 )
191
192 self._routing_history.append(decision)
193 return decision
194
195 def get_routing_stats(self) -> dict:
196 """取得路由統計"""
197 if not self._routing_history:
198 return {"total": 0}
199
200 category_counts = {}
201 for decision in self._routing_history:
202 cat = decision.category.value
203 category_counts[cat] = category_counts.get(cat, 0) + 1
204
205 return {
206 "total": len(self._routing_history),
207 "by_category": category_counts,
208 "avg_confidence": sum(d.confidence for d in self._routing_history) / len(self._routing_history)
209 }
210
211
212class AdaptiveOrchestrator:
213 """
214 自適應協調器
215
216 根據任務特性自動選擇協作模式:
217 - 簡單任務 → 單 Agent
218 - 中等任務 → Pipeline
219 - 複雜任務 → Hub-and-Spoke
220 - 可並行任務 → Parallel
221 """
222
223 def __init__(self):
224 self.router = IntelligentRouter()
225 self.hub_spoke = HubAndSpokeOrchestrator()
226 self.pipeline = create_development_pipeline()
227
228 def _estimate_complexity(self, task: str, routing: RoutingDecision) -> str:
229 """估算任務複雜度"""
230 # 簡單啟發式
231 word_count = len(task.split())
232
233 if routing.confidence > 0.8 and word_count < 30:
234 return "simple"
235 elif len(routing.supporting_agents) == 0:
236 return "simple"
237 elif len(routing.supporting_agents) >= 2:
238 return "complex"
239 else:
240 return "medium"
241
242 def execute(self, task: str) -> dict:
243 """自適應執行任務"""
244 # 路由決策
245 routing = self.router.route(task)
246 complexity = self._estimate_complexity(task, routing)
247
248 print(f"\n路由決策:")
249 print(f" 類別: {routing.category.value}")
250 print(f" 主要 Agent: {routing.primary_agent.value}")
251 print(f" 支援 Agent: {[a.value for a in routing.supporting_agents]}")
252 print(f" 複雜度: {complexity}")
253 print(f" 信心度: {routing.confidence:.2f}")
254
255 # 根據複雜度選擇模式
256 if complexity == "simple":
257 # 單 Agent 執行
258 print("\n使用模式:單 Agent")
259 result = self._execute_single_agent(task, routing.primary_agent)
260
261 elif complexity == "medium":
262 # Pipeline 執行
263 print("\n使用模式:Pipeline")
264 result = self.pipeline.execute(task)
265
266 else:
267 # Hub-and-Spoke 執行
268 print("\n使用模式:Hub-and-Spoke")
269 result = self.hub_spoke.execute(task)
270
271 return {
272 "routing": routing,
273 "mode": complexity,
274 "result": result
275 }
276
277 def _execute_single_agent(self, task: str, role: AgentRole) -> dict:
278 """單 Agent 執行"""
279 config = self.hub_spoke.agents[role]
280
281 response = client.messages.create(
282 model=config["model"],
283 max_tokens=config["max_tokens"],
284 system=config["system"],
285 messages=[{"role": "user", "content": task}]
286 )
287
288 return {
289 "output": response.content[0].text,
290 "tokens": response.usage.input_tokens + response.usage.output_tokens
291 }
狀態管理與錯誤處理
分散式狀態管理
1from dataclasses import dataclass, field
2from typing import Any, Optional
3from enum import Enum
4import json
5import time
6
7
8class TaskStatus(Enum):
9 """任務狀態"""
10 PENDING = "pending"
11 IN_PROGRESS = "in_progress"
12 COMPLETED = "completed"
13 FAILED = "failed"
14 BLOCKED = "blocked"
15
16
17@dataclass
18class AgentState:
19 """Agent 狀態"""
20 role: AgentRole
21 current_task: Optional[str] = None
22 status: TaskStatus = TaskStatus.PENDING
23 last_output: Optional[str] = None
24 error: Optional[str] = None
25 retries: int = 0
26 tokens_consumed: int = 0
27
28
29@dataclass
30class WorkflowState:
31 """工作流程狀態"""
32 workflow_id: str
33 original_task: str
34 agent_states: dict[str, AgentState] = field(default_factory=dict)
35 shared_context: dict[str, Any] = field(default_factory=dict)
36 execution_order: list[str] = field(default_factory=list)
37 current_step: int = 0
38 start_time: float = field(default_factory=time.time)
39 end_time: Optional[float] = None
40
41 def to_dict(self) -> dict:
42 """序列化為字典(用於持久化)"""
43 return {
44 "workflow_id": self.workflow_id,
45 "original_task": self.original_task,
46 "agent_states": {
47 k: {
48 "role": v.role.value,
49 "status": v.status.value,
50 "retries": v.retries,
51 "tokens_consumed": v.tokens_consumed
52 }
53 for k, v in self.agent_states.items()
54 },
55 "current_step": self.current_step,
56 "total_steps": len(self.execution_order),
57 "elapsed_time": time.time() - self.start_time
58 }
59
60
61class StatefulOrchestrator:
62 """
63 帶狀態管理的協調器
64
65 功能:
66 1. 追蹤每個 Agent 的狀態
67 2. 支援斷點續傳
68 3. 錯誤恢復和重試
69 """
70
71 MAX_RETRIES = 3
72 RETRY_DELAY = 1.0
73
74 def __init__(self):
75 self.current_workflow: Optional[WorkflowState] = None
76 self.agents = self._initialize_agents()
77
78 def _initialize_agents(self) -> dict:
79 """初始化 Agent 配置"""
80 return {
81 AgentRole.ANALYST: {
82 "model": "claude-sonnet-4-20250514",
83 "system": "你是需求分析師。分析需求並輸出 JSON。",
84 "max_tokens": 2048
85 },
86 AgentRole.DEVELOPER: {
87 "model": "claude-sonnet-4-20250514",
88 "system": "你是程式開發者。根據需求實作程式碼。",
89 "max_tokens": 4096
90 },
91 AgentRole.REVIEWER: {
92 "model": "claude-3-5-haiku-20241022",
93 "system": "你是審查員。審查程式碼並列出問題。",
94 "max_tokens": 1024
95 }
96 }
97
98 def _create_workflow(self, task: str) -> WorkflowState:
99 """創建新的工作流程"""
100 workflow_id = f"wf_{int(time.time())}"
101
102 workflow = WorkflowState(
103 workflow_id=workflow_id,
104 original_task=task,
105 execution_order=[
106 AgentRole.ANALYST.value,
107 AgentRole.DEVELOPER.value,
108 AgentRole.REVIEWER.value
109 ]
110 )
111
112 # 初始化各 Agent 狀態
113 for role_name in workflow.execution_order:
114 role = AgentRole(role_name)
115 workflow.agent_states[role_name] = AgentState(role=role)
116
117 return workflow
118
119 def _execute_agent_with_retry(
120 self,
121 role: AgentRole,
122 task: str,
123 context: str = ""
124 ) -> tuple[str, int]:
125 """帶重試的 Agent 執行"""
126 config = self.agents[role]
127 state = self.current_workflow.agent_states[role.value]
128
129 full_input = f"{context}\n\n任務:{task}" if context else task
130
131 for attempt in range(self.MAX_RETRIES):
132 try:
133 state.status = TaskStatus.IN_PROGRESS
134 state.current_task = task
135
136 response = client.messages.create(
137 model=config["model"],
138 max_tokens=config["max_tokens"],
139 system=config["system"],
140 messages=[{"role": "user", "content": full_input}]
141 )
142
143 output = response.content[0].text
144 tokens = response.usage.input_tokens + response.usage.output_tokens
145
146 state.status = TaskStatus.COMPLETED
147 state.last_output = output
148 state.tokens_consumed += tokens
149 state.error = None
150
151 return output, tokens
152
153 except Exception as e:
154 state.retries += 1
155 state.error = str(e)
156
157 if attempt < self.MAX_RETRIES - 1:
158 print(f" ⚠️ 重試 {attempt + 1}/{self.MAX_RETRIES}: {e}")
159 time.sleep(self.RETRY_DELAY * (attempt + 1))
160 else:
161 state.status = TaskStatus.FAILED
162 raise
163
164 def execute(self, task: str, resume_from: Optional[str] = None) -> dict:
165 """
166 執行工作流程
167
168 Args:
169 task: 任務描述
170 resume_from: 可選,從指定工作流程 ID 續傳
171 """
172 # 創建或恢復工作流程
173 if resume_from:
174 # TODO: 從持久化存儲載入
175 print(f"續傳模式:{resume_from}")
176 else:
177 self.current_workflow = self._create_workflow(task)
178
179 print(f"\n{'='*60}")
180 print(f"工作流程: {self.current_workflow.workflow_id}")
181 print(f"{'='*60}")
182
183 results = []
184 total_tokens = 0
185
186 # 執行各步驟
187 for i, role_name in enumerate(self.current_workflow.execution_order):
188 if i < self.current_workflow.current_step:
189 print(f"[{i+1}] {role_name}: 跳過(已完成)")
190 continue
191
192 role = AgentRole(role_name)
193 state = self.current_workflow.agent_states[role_name]
194
195 print(f"\n[{i+1}] {role_name}")
196
197 try:
198 # 構建 Context(前一步的摘要)
199 context = ""
200 if results:
201 last_result = results[-1]
202 context = f"[上一步輸出摘要]\n{last_result[:1000]}..."
203
204 # 執行
205 output, tokens = self._execute_agent_with_retry(role, task, context)
206 results.append(output)
207 total_tokens += tokens
208
209 print(f" ✓ 完成,tokens: {tokens}")
210
211 # 更新進度
212 self.current_workflow.current_step = i + 1
213
214 # 保存 checkpoint(可選:持久化)
215 self._save_checkpoint()
216
217 except Exception as e:
218 print(f" ✗ 失敗: {e}")
219 return {
220 "status": "failed",
221 "failed_at": role_name,
222 "error": str(e),
223 "partial_results": results,
224 "workflow_state": self.current_workflow.to_dict()
225 }
226
227 # 完成
228 self.current_workflow.end_time = time.time()
229
230 return {
231 "status": "completed",
232 "workflow_id": self.current_workflow.workflow_id,
233 "final_output": results[-1] if results else "",
234 "all_outputs": results,
235 "total_tokens": total_tokens,
236 "elapsed_time": self.current_workflow.end_time - self.current_workflow.start_time,
237 "workflow_state": self.current_workflow.to_dict()
238 }
239
240 def _save_checkpoint(self):
241 """保存檢查點(用於斷點續傳)"""
242 # 實際應用中,這裡會寫入資料庫或檔案
243 state = self.current_workflow.to_dict()
244 # print(f" 📍 Checkpoint: step {state['current_step']}/{state['total_steps']}")
245
246
247class ErrorRecoveryOrchestrator(StatefulOrchestrator):
248 """
249 帶錯誤恢復的協調器
250
251 策略:
252 1. 重試(帶退避)
253 2. 降級(使用備用 Agent)
254 3. 跳過(標記為可選步驟)
255 4. 中止(關鍵步驟失敗)
256 """
257
258 def __init__(self):
259 super().__init__()
260
261 # 定義步驟的關鍵性
262 self.step_criticality = {
263 AgentRole.ANALYST.value: "critical", # 失敗則中止
264 AgentRole.DEVELOPER.value: "critical",
265 AgentRole.REVIEWER.value: "optional", # 失敗可跳過
266 }
267
268 # 備用 Agent 映射
269 self.fallback_agents = {
270 AgentRole.DEVELOPER: AgentRole.ANALYST, # 開發者失敗時,嘗試分析師
271 }
272
273 def _handle_failure(
274 self,
275 role: AgentRole,
276 error: Exception
277 ) -> tuple[str, str]:
278 """
279 處理失敗
280
281 Returns:
282 (action, reason): 行動和原因
283 """
284 criticality = self.step_criticality.get(role.value, "critical")
285
286 if criticality == "optional":
287 return "skip", f"{role.value} 為可選步驟,跳過"
288
289 if role in self.fallback_agents:
290 fallback = self.fallback_agents[role]
291 return "fallback", f"嘗試使用備用 Agent: {fallback.value}"
292
293 return "abort", f"關鍵步驟 {role.value} 失敗,中止流程"
生產級最佳實踐
Token 預算管理
1@dataclass
2class TokenBudget:
3 """Token 預算配置"""
4 total_budget: int
5 per_agent_limits: dict[str, int]
6 reserve_ratio: float = 0.1 # 保留 10% 給協調開銷
7
8 def get_available(self, role: AgentRole) -> int:
9 """取得可用 Token"""
10 limit = self.per_agent_limits.get(role.value, 2000)
11 return limit
12
13 def can_afford(self, estimated_tokens: int, used: int) -> bool:
14 """檢查是否能負擔"""
15 available = self.total_budget * (1 - self.reserve_ratio) - used
16 return estimated_tokens <= available
17
18
19class BudgetAwareOrchestrator:
20 """Token 預算感知的協調器"""
21
22 def __init__(self, budget: TokenBudget):
23 self.budget = budget
24 self.tokens_used = 0
25
26 def _estimate_tokens(self, task: str, role: AgentRole) -> int:
27 """估算任務所需 Token"""
28 # 簡單估算:System Prompt + 輸入 + 預期輸出
29 base_estimate = {
30 AgentRole.ANALYST: 3000,
31 AgentRole.DEVELOPER: 6000,
32 AgentRole.REVIEWER: 2000,
33 AgentRole.DOC_WRITER: 3000,
34 AgentRole.ORCHESTRATOR: 2500
35 }
36 return base_estimate.get(role, 3000)
37
38 def execute_within_budget(self, task: str) -> dict:
39 """在預算內執行"""
40 steps = [
41 AgentRole.ANALYST,
42 AgentRole.DEVELOPER,
43 AgentRole.REVIEWER
44 ]
45
46 results = []
47 for role in steps:
48 estimated = self._estimate_tokens(task, role)
49
50 if not self.budget.can_afford(estimated, self.tokens_used):
51 print(f"⚠️ 預算不足,跳過 {role.value}")
52 continue
53
54 # 執行...
55 # result, tokens = self._execute(role, task)
56 # self.tokens_used += tokens
57 # results.append(result)
58
59 return {
60 "results": results,
61 "tokens_used": self.tokens_used,
62 "budget_remaining": self.budget.total_budget - self.tokens_used
63 }
監控與可觀測性
1import time
2from dataclasses import dataclass, field
3
4
5@dataclass
6class AgentMetrics:
7 """Agent 指標"""
8 role: str
9 invocations: int = 0
10 total_tokens: int = 0
11 total_latency_ms: float = 0
12 errors: int = 0
13 retries: int = 0
14
15 @property
16 def avg_tokens(self) -> float:
17 return self.total_tokens / self.invocations if self.invocations > 0 else 0
18
19 @property
20 def avg_latency_ms(self) -> float:
21 return self.total_latency_ms / self.invocations if self.invocations > 0 else 0
22
23 @property
24 def error_rate(self) -> float:
25 return self.errors / self.invocations if self.invocations > 0 else 0
26
27
28class MetricsCollector:
29 """指標收集器"""
30
31 def __init__(self):
32 self.agent_metrics: dict[str, AgentMetrics] = {}
33 self.workflow_metrics: list[dict] = []
34
35 def record_agent_call(
36 self,
37 role: AgentRole,
38 tokens: int,
39 latency_ms: float,
40 success: bool
41 ):
42 """記錄 Agent 呼叫"""
43 role_key = role.value
44
45 if role_key not in self.agent_metrics:
46 self.agent_metrics[role_key] = AgentMetrics(role=role_key)
47
48 metrics = self.agent_metrics[role_key]
49 metrics.invocations += 1
50 metrics.total_tokens += tokens
51 metrics.total_latency_ms += latency_ms
52
53 if not success:
54 metrics.errors += 1
55
56 def get_report(self) -> str:
57 """生成報告"""
58 lines = [
59 "="*60,
60 "Agent 協作系統監控報告",
61 "="*60,
62 ""
63 ]
64
65 for role, m in self.agent_metrics.items():
66 lines.extend([
67 f"【{role}】",
68 f" 呼叫次數: {m.invocations}",
69 f" 總 Tokens: {m.total_tokens:,}",
70 f" 平均 Tokens: {m.avg_tokens:.0f}",
71 f" 平均延遲: {m.avg_latency_ms:.0f}ms",
72 f" 錯誤率: {m.error_rate*100:.1f}%",
73 ""
74 ])
75
76 # 總計
77 total_tokens = sum(m.total_tokens for m in self.agent_metrics.values())
78 total_calls = sum(m.invocations for m in self.agent_metrics.values())
79
80 lines.extend([
81 "-"*60,
82 f"總呼叫: {total_calls}",
83 f"總 Tokens: {total_tokens:,}",
84 "="*60
85 ])
86
87 return "\n".join(lines)
完整範例:生產級開發團隊
1"""
2生產級專責化 Agent 開發團隊
3
4整合:
51. 智慧路由
62. Hub-and-Spoke 協調
73. 狀態管理
84. 錯誤恢復
95. Token 預算
106. 監控
11"""
12
13class ProductionAgentTeam:
14 """生產級 Agent 團隊"""
15
16 def __init__(
17 self,
18 token_budget: int = 50000,
19 enable_metrics: bool = True
20 ):
21 # 核心組件
22 self.router = IntelligentRouter()
23 self.orchestrator = StatefulOrchestrator()
24
25 # 預算
26 self.budget = TokenBudget(
27 total_budget=token_budget,
28 per_agent_limits={
29 "analyst": 3000,
30 "developer": 8000,
31 "reviewer": 2000,
32 "doc_writer": 3000,
33 "orchestrator": 2500
34 }
35 )
36
37 # 監控
38 self.metrics = MetricsCollector() if enable_metrics else None
39
40 self.tokens_used = 0
41
42 def process_request(self, request: str) -> dict:
43 """處理使用者請求"""
44 start_time = time.time()
45
46 print(f"\n{'='*60}")
47 print("生產級 Agent 團隊")
48 print(f"{'='*60}")
49 print(f"請求: {request[:100]}...")
50 print(f"Token 預算: {self.budget.total_budget:,}")
51
52 # 1. 路由決策
53 routing = self.router.route(request)
54 print(f"\n路由決策: {routing.primary_agent.value} (信心度: {routing.confidence:.2f})")
55
56 # 2. 執行工作流程
57 result = self.orchestrator.execute(request)
58
59 # 3. 更新統計
60 elapsed = (time.time() - start_time) * 1000
61
62 if self.metrics:
63 # 從工作流程狀態提取各 Agent 的指標
64 for role_name, state in self.orchestrator.current_workflow.agent_states.items():
65 self.metrics.record_agent_call(
66 role=AgentRole(role_name),
67 tokens=state.tokens_consumed,
68 latency_ms=elapsed / len(self.orchestrator.current_workflow.agent_states),
69 success=state.status == TaskStatus.COMPLETED
70 )
71
72 # 4. 返回結果
73 return {
74 "status": result["status"],
75 "output": result.get("final_output", ""),
76 "total_tokens": result.get("total_tokens", 0),
77 "elapsed_ms": elapsed,
78 "routing": {
79 "category": routing.category.value,
80 "primary_agent": routing.primary_agent.value,
81 "confidence": routing.confidence
82 },
83 "workflow_id": result.get("workflow_id")
84 }
85
86 def get_metrics_report(self) -> str:
87 """取得監控報告"""
88 if self.metrics:
89 return self.metrics.get_report()
90 return "監控未啟用"
91
92
93# 使用範例
94if __name__ == "__main__":
95 team = ProductionAgentTeam(token_budget=30000)
96
97 # 處理請求
98 result = team.process_request("""
99 開發一個簡單的 REST API:
100 - 使用者註冊和登入
101 - JWT 認證
102 - 基本的 CRUD 操作
103 """)
104
105 print(f"\n結果:")
106 print(f" 狀態: {result['status']}")
107 print(f" Tokens: {result['total_tokens']:,}")
108 print(f" 延遲: {result['elapsed_ms']:.0f}ms")
109
110 print(f"\n{team.get_metrics_report()}")
優化效果總結
專責化 + 協作優化的綜合效果:
┌─────────────────────────────────────────────────────────────────────┐
│ 優化前 vs 優化後對比 │
├────────────────────┬───────────────┬───────────────┬────────────────┤
│ 指標 │ 優化前 │ 優化後 │ 改善 │
│ │ (通用 Agent) │ (專責+協作) │ │
├────────────────────┼───────────────┼───────────────┼────────────────┤
│ System Prompt │ 16,000 tok │ 2,500 tok │ -84% │
│ 工具定義 │ 5,000 tok │ 800 tok │ -84% │
│ 每任務固定成本 │ 21,000 tok │ 3,300 tok │ -84% │
│ 協調開銷 │ 0 │ +1,500 tok │ +1,500 │
│ 淨節省 │ - │ - │ -80% │
├────────────────────┼───────────────┼───────────────┼────────────────┤
│ 5 任務總成本 │ 105,000 tok │ 23,000 tok │ -78% │
│ 10 任務總成本 │ 210,000 tok │ 43,000 tok │ -80% │
└────────────────────┴───────────────┴───────────────┴────────────────┘
額外效益:
✅ 模型差異化選擇:簡單任務用 Haiku,再省 50%+
✅ 並行執行:延遲降低 50-70%
✅ 錯誤隔離:單一 Agent 失敗不影響整體
✅ 可觀測性:精確追蹤各環節消耗
最佳實踐清單
專責化 Agent 協作 Checklist:
架構選擇
□ 根據任務特性選擇協作模式(Hub-Spoke / Pipeline / Parallel)
□ 確認協調開銷小於專責化節省
□ 設計清晰的 Agent 邊界和介面
路由設計
□ 實作關鍵字快速分類
□ 可選:LLM 精確分類(作為後備)
□ 持續優化分類準確度
狀態管理
□ 追蹤每個 Agent 的執行狀態
□ 支援斷點續傳
□ 定義步驟關鍵性(critical / optional)
錯誤處理
□ 實作重試機制(帶指數退避)
□ 定義降級策略
□ 適當的錯誤傳播和中止條件
資源管理
□ 設定 Token 預算上限
□ 各 Agent 獨立預算控制
□ 預留協調開銷空間
監控
□ 追蹤各 Agent 的 Token 消耗
□ 追蹤延遲和錯誤率
□ 定期審查和優化
總結
專責化 Agent 的協作設計是發揮其最大價值的關鍵。本文介紹的協作模式涵蓋:
| 模式 | 適用場景 | Token 效率 | 延遲 |
|---|---|---|---|
| Hub-and-Spoke | 一般任務 | 高 | 中 |
| Pipeline | 順序依賴 | 高 | 高 |
| Parallel | 獨立子任務 | 高 | 低 |
| Adaptive | 混合場景 | 最高 | 可變 |
核心原則:
- 最小化協調開銷:確保協調成本小於專責化節省
- 精準的 Context 傳遞:只傳遞必要的摘要,不傳完整輸出
- 智慧路由:根據任務特性自動選擇最佳模式
- 穩健的狀態管理:支援斷點續傳和錯誤恢復
- 持續監控優化:數據驅動的效能調校
透過合理的協作設計,專責化 Agent 團隊可以在保持 80%+ Token 節省的同時,提供更可靠、更快速的服務。
