前言
多智慧體系統是現代 AI 應用的重要發展方向,能夠處理複雜的企業級任務。作為 AI FDE(Forward Deployed Engineer,前線部署工程師),掌握多智慧體框架的設計與實作是核心技能之一。本文將深入探討 LangGraph、CrewAI 等主流框架,以及 Model Context Protocol (MCP) 的實際應用。
1. 多智慧體系統核心概念
基礎架構原理
Agent 核心組件:
- 感知器 (Perception):接收與理解環境資訊
- 決策器 (Decision Making):基於目標與狀態規劃行動
- 執行器 (Action):與環境互動執行任務
- 記憶體 (Memory):儲存狀態、經驗與知識
協作模式:
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from enum import Enum
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, Iterator, List


class CoordinationPattern(Enum):
    """How an orchestrator schedules a set of agent tasks."""

    SEQUENTIAL = "sequential"        # one task at a time, in dependency order
    PARALLEL = "parallel"            # independent tasks run concurrently
    HIERARCHICAL = "hierarchical"    # a manager agent delegates work
    COLLABORATIVE = "collaborative"  # agents reach a joint decision


@dataclass
class AgentTask:
    """A unit of work assigned to one agent.

    ``dependencies`` lists the ``task_id`` values that must finish first.
    Within one ready batch, a higher ``priority`` runs earlier (convention
    chosen by this orchestrator).
    """

    task_id: str
    description: str
    agent_id: str
    dependencies: List[str] = field(default_factory=list)
    priority: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)


# An agent is any callable that takes the task and returns its result.
AgentCallable = Callable[[AgentTask], Any]


class MultiAgentOrchestrator:
    """Run AgentTasks through registered agents according to a coordination pattern."""

    def __init__(self, coordination_pattern: CoordinationPattern):
        self.pattern = coordination_pattern
        self.agents: Dict[str, AgentCallable] = {}
        self.task_queue: List[AgentTask] = []

    def register_agent(self, agent_id: str, agent: AgentCallable) -> None:
        """Make ``agent`` the executor for tasks whose ``agent_id`` matches."""
        self.agents[agent_id] = agent

    def execute_workflow(self, tasks: List[AgentTask]) -> Dict[str, Any]:
        """Execute ``tasks`` and return a mapping of task_id -> agent result.

        Raises:
            NotImplementedError: for patterns this minimal orchestrator does not
                model (HIERARCHICAL, COLLABORATIVE) -- previously these silently
                returned None.
            ValueError: duplicate task ids, unknown dependencies, or a dependency
                cycle (graphlib.CycleError is a ValueError subclass).
            KeyError: a task references an agent that was never registered.
        """
        if self.pattern == CoordinationPattern.SEQUENTIAL:
            return self._execute_sequential(tasks)
        if self.pattern == CoordinationPattern.PARALLEL:
            return self._execute_parallel(tasks)
        raise NotImplementedError(
            f"Coordination pattern not implemented: {self.pattern.value}"
        )

    def _execute_sequential(self, tasks: List[AgentTask]) -> Dict[str, Any]:
        """Run every task one after another, respecting dependencies and priority."""
        results: Dict[str, Any] = {}
        for batch in self._ready_batches(tasks):
            for task in batch:
                results[task.task_id] = self._run(task)
        return results

    def _execute_parallel(self, tasks: List[AgentTask]) -> Dict[str, Any]:
        """Run each batch of mutually independent tasks concurrently in threads."""
        results: Dict[str, Any] = {}
        with ThreadPoolExecutor() as pool:
            for batch in self._ready_batches(tasks):
                # pool.map preserves input order, so zip pairs tasks with results.
                for task, result in zip(batch, pool.map(self._run, batch)):
                    results[task.task_id] = result
        return results

    def _run(self, task: AgentTask) -> Any:
        """Invoke the agent registered for ``task.agent_id``."""
        try:
            agent = self.agents[task.agent_id]
        except KeyError:
            raise KeyError(f"No agent registered for id: {task.agent_id}") from None
        return agent(task)

    @staticmethod
    def _ready_batches(tasks: List[AgentTask]) -> Iterator[List[AgentTask]]:
        """Yield successive batches of tasks whose dependencies have all completed."""
        by_id = {task.task_id: task for task in tasks}
        if len(by_id) != len(tasks):
            raise ValueError("Duplicate task_id in workflow")
        position = {task.task_id: index for index, task in enumerate(tasks)}

        sorter: TopologicalSorter = TopologicalSorter()
        for task in tasks:
            unknown = [dep for dep in task.dependencies if dep not in by_id]
            if unknown:
                raise ValueError(f"Task {task.task_id} depends on unknown tasks: {unknown}")
            sorter.add(task.task_id, *task.dependencies)
        sorter.prepare()  # raises graphlib.CycleError on circular dependencies

        while sorter.is_active():
            # Highest priority first; input order breaks ties deterministically.
            ready = sorted(
                (by_id[task_id] for task_id in sorter.get_ready()),
                key=lambda task: (-task.priority, position[task.task_id]),
            )
            yield ready
            sorter.done(*(task.task_id for task in ready))
2. LangGraph 框架深度實作
狀態圖設計架構
核心概念實作:
import operator
from typing import Annotated, Any, TypedDict

from langgraph.graph import END, StateGraph
from langgraph.prebuilt import ToolNode

# should_continue ends the run once the planner has run more than this many
# times, so a planner that never declares completion cannot loop forever.
MAX_PLANNING_ITERATIONS = 10


class AgentState(TypedDict):
    """Shared graph state passed between nodes."""

    # operator.add: a node's returned list is *appended* to the stored list.
    messages: Annotated[list, operator.add]
    current_task: str
    completed_tasks: Annotated[list, operator.add]
    context: dict
    iteration_count: int


def _message_content(message: Any) -> str:
    """Return a message's text whether it is a plain dict or a message object.

    The planner appends plain dicts, but the ToolNode appends message objects
    that expose ``.content`` and cannot be indexed with ``["content"]``.
    """
    if isinstance(message, dict):
        return str(message.get("content", ""))
    return str(getattr(message, "content", ""))


def create_research_workflow():
    """Build and compile the planner -> researcher -> tools -> analyzer -> synthesizer loop.

    ``research_agent``, ``analysis_agent``, ``synthesis_agent`` and
    ``research_tools`` are not shown in this article; they are assumed to be
    defined before this function is called.
    """
    workflow = StateGraph(AgentState)

    # Nodes
    workflow.add_node("planner", planning_agent)
    workflow.add_node("researcher", research_agent)
    workflow.add_node("analyzer", analysis_agent)
    workflow.add_node("synthesizer", synthesis_agent)
    workflow.add_node("tools", ToolNode(research_tools))

    # Edges: after every planning pass, either start another round or stop.
    workflow.set_entry_point("planner")
    workflow.add_conditional_edges(
        "planner",
        should_continue,
        {
            "continue": "researcher",
            "end": END,
        },
    )
    workflow.add_edge("researcher", "tools")
    workflow.add_edge("tools", "analyzer")
    workflow.add_edge("analyzer", "synthesizer")
    workflow.add_edge("synthesizer", "planner")  # re-plan with the new findings

    return workflow.compile()


def planning_agent(state: AgentState):
    """Planner node: break down the task and decide the next step.

    Returns a partial state update: one planner message (appended by the
    reducer), the new current task, and the incremented iteration counter.
    ``llm`` is assumed to be a chat model defined elsewhere.
    """
    messages = state.get("messages", [])
    current_task = state.get("current_task", "")
    completed = state.get("completed_tasks", [])
    latest = _message_content(messages[-1]) if messages else ""

    planning_prompt = f"""
基於以下訊息規劃研究任務:
{latest}

當前任務狀態:{current_task}
已完成任務:{completed}

請分析並決定下一步行動:
1. 如果需要更多研究,返回具體的研究計劃
2. 如果資訊足夠,返回 "完成研究"
"""

    response = llm.invoke(planning_prompt)

    return {
        "messages": [{"role": "planner", "content": response.content}],
        "current_task": response.content,
        "iteration_count": state.get("iteration_count", 0) + 1,
    }


def should_continue(state: AgentState) -> str:
    """Route after the planner: "end" on iteration cap or completion marker, else "continue"."""
    if state.get("iteration_count", 0) > MAX_PLANNING_ITERATIONS:
        return "end"

    messages = state.get("messages", [])
    last_message = _message_content(messages[-1]) if messages else ""
    if "完成研究" in last_message or "research complete" in last_message.lower():
        return "end"

    return "continue"
進階功能實作
檢查點與狀態持久化(範例中的 `:memory:` 只在程序存活期間保存狀態;若需跨重啟保存,請改用檔案路徑或外部資料庫):
from langgraph.checkpoint.sqlite import SqliteSaver

# In current langgraph-checkpoint-sqlite releases, from_conn_string() returns a
# context manager that owns the SQLite connection, so it must be entered with
# `with`. ":memory:" keeps checkpoints only while this process runs; pass a
# file path instead when state must survive a restart.
with SqliteSaver.from_conn_string(":memory:") as checkpoint_saver:
    # `workflow` is assumed to be an uncompiled StateGraph built elsewhere
    # (create_research_workflow() compiles without a checkpointer, so its
    # return value cannot be used here).
    app = workflow.compile(checkpointer=checkpoint_saver)

    # Every invocation that shares this thread_id reads and writes the same
    # checkpoint history.
    thread_config = {"configurable": {"thread_id": "research-session-001"}}

    # First run: state is checkpointed after each step.
    result = app.invoke(
        {"messages": [{"role": "user", "content": "研究 AI 在醫療領域的應用"}]},
        config=thread_config,
    )

    # Same thread_id: continues from the saved state; the messages reducer
    # appends the new user message to the stored history.
    resumed_result = app.invoke(
        {"messages": [{"role": "user", "content": "請提供更詳細的分析"}]},
        config=thread_config,
    )
人工干預機制:
1from langgraph.prebuilt import create_react_agent
2
3def create_human_in_loop_agent():
4 def human_approval_node(state: AgentState):
5 last_action = state["messages"][-1]
6
7 # 檢查是否需要人工確認
8 if requires_approval(last_action):
9 print(f"需要確認以下行動:{last_action['content']}")
10 approval = input("是否繼續? (y/n): ")
11
12 if approval.lower() != 'y':
13 return {
14 "messages": [{"role": "system", "content": "行動已被用戶取消"}],
15 "current_task": "awaiting_user_input"
16 }
17
18 return state
19
20 workflow.add_node("human_approval", human_approval_node)
21 workflow.add_edge("analyzer", "human_approval")
22 workflow.add_edge("human_approval", "synthesizer")
23
24def requires_approval(action):
25 """定義需要人工確認的行動類型"""
26 high_risk_actions = [
27 "發送郵件", "修改資料庫", "刪除檔案",
28 "執行系統指令", "進行付款操作"
29 ]
30
31 return any(risk_action in action["content"] for risk_action in high_risk_actions)
3. CrewAI 框架企業應用
團隊協作架構
CrewAI 核心實作:
from crewai import Agent, Crew, Process, Task
from crewai.tools import BaseTool
from langchain_openai import ChatOpenAI


class DataAnalysisTool(BaseTool):
    """Tool that wraps the business-data analysis routine for an agent."""

    name: str = "數據分析工具"
    description: str = "分析業務數據並產生洞察報告"

    def _run(self, data_query: str) -> str:
        # perform_data_analysis is not shown in this article; assumed defined elsewhere.
        analysis_result = perform_data_analysis(data_query)
        return f"數據分析結果:{analysis_result}"


class MarketResearchTool(BaseTool):
    """Tool that fetches market-trend and competitor data for an agent."""

    name: str = "市場研究工具"
    description: str = "收集市場趨勢與競爭分析"

    def _run(self, market_segment: str) -> str:
        # fetch_market_data is not shown in this article; assumed defined elsewhere.
        market_data = fetch_market_data(market_segment)
        return f"市場研究報告:{market_data}"


def create_business_intelligence_crew():
    """Build a three-agent crew: data analysis -> market research -> strategy.

    Task descriptions contain {business_context}, {analysis_period} and
    {focus_areas} placeholders; CrewAI fills them from ``crew.kickoff(inputs=...)``.
    Without placeholders, kickoff inputs are never seen by the agents.
    """
    llm = ChatOpenAI(model="gpt-4")

    data_analyst = Agent(
        role="資深數據分析師",
        goal="分析業務數據並識別關鍵趨勢與機會",
        backstory="""
        您是一位擁有 10 年經驗的資深數據分析師,
        專長於商業智能與數據挖掘,能夠從複雜數據中
        提取有價值的商業洞察。
        """,
        tools=[DataAnalysisTool()],
        llm=llm,
        verbose=True,
    )

    market_researcher = Agent(
        role="市場研究專家",
        goal="提供深度市場分析與競爭情報",
        backstory="""
        您是市場研究領域的專家,具備敏銳的市場洞察力,
        能夠識別新興趨勢並評估競爭威脅與機會。
        """,
        tools=[MarketResearchTool()],
        llm=llm,
        verbose=True,
    )

    strategy_consultant = Agent(
        role="策略顧問",
        goal="整合分析結果並制定可執行的業務策略",
        backstory="""
        您是一位經驗豐富的策略顧問,擅長將數據洞察
        轉化為具體的業務行動計劃與投資建議。
        """,
        llm=llm,
        verbose=True,
    )

    data_analysis_task = Task(
        description="""
        業務背景:{business_context}
        分析 {analysis_period} 的業務數據(重點領域:{focus_areas}):
        1. 識別收入趨勢與季節性模式
        2. 分析客戶行為變化
        3. 評估產品效能表現
        4. 提供數據驅動的洞察
        """,
        expected_output="詳細的數據分析報告,包含關鍵指標與趨勢分析",
        agent=data_analyst,
    )

    # `context` (not `dependencies`, which CrewAI's Task does not define) is
    # how a task receives the outputs of earlier tasks.
    market_research_task = Task(
        description="""
        針對 {business_context} 進行全面市場研究:
        1. 分析市場規模與成長潛力
        2. 識別主要競爭對手與其策略
        3. 評估新興技術趨勢影響
        4. 分析客戶需求變化
        """,
        expected_output="市場研究報告,包含競爭分析與機會評估",
        agent=market_researcher,
        context=[data_analysis_task],
    )

    strategy_formulation_task = Task(
        description="""
        基於數據分析與市場研究結果,為 {business_context} 制定策略(重點領域:{focus_areas}):
        1. 整合所有分析結果
        2. 識別核心業務機會
        3. 制定具體行動計劃
        4. 設定關鍵績效指標 (KPI)
        """,
        expected_output="完整的業務策略文件,包含執行計劃與成功指標",
        agent=strategy_consultant,
        context=[data_analysis_task, market_research_task],
    )

    crew = Crew(
        agents=[data_analyst, market_researcher, strategy_consultant],
        tasks=[data_analysis_task, market_research_task, strategy_formulation_task],
        verbose=True,
        process=Process.sequential,  # or Process.hierarchical for a manager-led crew
    )

    return crew


def run_business_intelligence_analysis():
    """Build the crew and run it for an e-commerce scenario; return the crew output."""
    crew = create_business_intelligence_crew()

    # Values are interpolated into the {placeholders} in the task descriptions.
    # focus_areas is pre-joined so it renders the same across CrewAI versions.
    result = crew.kickoff(inputs={
        "business_context": "電子商務平台",
        "analysis_period": "2024年度",
        "focus_areas": "、".join(["客戶增長", "產品優化", "市場擴張"]),
    })

    return result
階層式代理管理
Manager Agent 實作:
from crewai import Agent, Crew, Process


def create_hierarchical_crew():
    """Build a manager-led crew in which the manager delegates to two specialists.

    NOTE(review): ``llm``, ``project_planning_task``, ``implementation_task`` and
    ``quality_review_task`` are not defined in this snippet; they are assumed to
    be created elsewhere (e.g. an LLM built as in create_business_intelligence_crew()).
    """
    # Custom manager. CrewAI's Agent has no delegation-depth setting, so only
    # allow_delegation is set here.
    manager = Agent(
        role="專案經理",
        goal="協調團隊工作並確保專案按時交付",
        backstory="""
        您是一位經驗豐富的專案經理,擅長團隊協調與資源分配,
        能夠識別潛在風險並制定緩解策略。
        """,
        llm=llm,
        allow_delegation=True,
    )

    # Specialists the manager can delegate to.
    technical_lead = Agent(
        role="技術主管",
        goal="負責技術決策與架構設計",
        backstory="您是技術團隊的領導者,負責確保技術方案的可行性。",
        llm=llm,
    )

    quality_assurance = Agent(
        role="品質保證專員",
        goal="確保交付成果符合品質標準",
        backstory="您專注於品質控制與測試驗證。",
        llm=llm,
    )

    # In a hierarchical process the manager is supplied via manager_agent and must
    # not also appear in `agents`. When a custom manager_agent is given, its own
    # llm drives it, so manager_llm is not needed.
    crew = Crew(
        agents=[technical_lead, quality_assurance],
        tasks=[project_planning_task, implementation_task, quality_review_task],
        process=Process.hierarchical,
        manager_agent=manager,
    )

    return crew
4. Model Context Protocol (MCP) 實作
MCP 核心架構
協定實作框架:
import asyncio
import json
from typing import Awaitable, Callable, Dict, Optional, Protocol, runtime_checkable

# Async callable that receives a tool call's "arguments" dict and returns an MCP
# tool result (e.g. {"content": [...]}) or {"error": ...}.
ToolHandler = Callable[[dict], Awaitable[dict]]

# MCP protocol revision this example reports from initialize().
MCP_PROTOCOL_VERSION = "2024-11-05"


@runtime_checkable
class MCPServer(Protocol):
    """Structural interface for servers an orchestrator can route to.

    Being runtime_checkable, ``isinstance(obj, MCPServer)`` only checks that
    both methods exist, not their signatures.
    """

    async def handle_request(self, request: dict) -> dict:
        """Handle one MCP request and return its result, or a dict with an "error" key."""
        ...

    async def initialize(self, capabilities: dict) -> dict:
        """Perform the initialize handshake and report what this server supports."""
        ...


class CustomMCPServer:
    """Minimal MCP server that dispatches methods and registered tools.

    Errors are reported as ``{"error": "<message>"}`` so callers can test for
    the "error" key.
    """

    def __init__(self, name: str, version: str):
        self.name = name
        self.version = version
        self.capabilities = {
            "tools": {},
            "resources": {},
            "prompts": {},
        }
        self.client_capabilities: dict = {}
        # Handlers for tools registered with register_tool(..., handler=...).
        self._tool_handlers: Dict[str, ToolHandler] = {}

    async def initialize(self, capabilities: dict) -> dict:
        """Record the client's capabilities and announce this server's features.

        Implementing this method is what makes the class satisfy MCPServer.
        """
        self.client_capabilities = dict(capabilities or {})
        return {
            "protocolVersion": MCP_PROTOCOL_VERSION,
            "capabilities": {feature: {} for feature in self.capabilities},
            "serverInfo": {"name": self.name, "version": self.version},
        }

    async def handle_request(self, request: dict) -> dict:
        """Dispatch a request by its "method" field to the matching handler."""
        method = request.get("method")
        params = request.get("params") or {}

        routes = {
            "tools/list": self.handle_tool_list,
            "tools/call": self.handle_tool_call,
            "resources/read": self.handle_resource_read,
            "prompts/get": self.handle_prompt_get,
        }
        handler = routes.get(method)
        if handler is None:
            return {"error": f"Unsupported method: {method}"}
        return await handler(params)

    async def handle_tool_list(self, params: dict) -> dict:
        """List registered tools with their description and input schema."""
        return {
            "tools": [
                {"name": name, **spec}
                for name, spec in self.capabilities["tools"].items()
            ]
        }

    async def handle_tool_call(self, params: dict) -> dict:
        """Run a tool: registered handlers first, then the built-in tools."""
        tool_name = params.get("name")
        arguments = params.get("arguments") or {}

        handler = self._tool_handlers.get(tool_name)
        if handler is not None:
            return await handler(arguments)
        if tool_name == "database_query":
            return await self.execute_database_query(arguments)
        if tool_name == "api_request":
            return await self.execute_api_request(arguments)

        return {"error": f"Unknown tool: {tool_name}"}

    async def handle_resource_read(self, params: dict) -> dict:
        """Placeholder: this example registers no resources."""
        return {"error": f"Unknown resource: {params.get('uri')}"}

    async def handle_prompt_get(self, params: dict) -> dict:
        """Placeholder: this example registers no prompts."""
        return {"error": f"Unknown prompt: {params.get('name')}"}

    async def execute_api_request(self, arguments: dict) -> dict:
        """Placeholder: outbound HTTP calls are not implemented in this example."""
        return {"error": "api_request is not implemented in this example"}

    async def execute_database_query(self, arguments: dict) -> dict:
        """Run the "database_query" tool and return rows as a JSON text block."""
        query = arguments.get("query")
        if not isinstance(query, str) or not query.strip():
            return {"error": "資料庫查詢失敗:缺少必要參數 query"}

        # SECURITY: `query` comes from the model/client. execute_sql_query (not
        # shown here) should run with read-only credentials and/or only allow
        # vetted statements.
        try:
            results = await execute_sql_query(query)
            return {
                "content": [
                    {
                        "type": "text",
                        # default=str on purpose: DB rows commonly hold datetime/
                        # Decimal values, which json cannot encode natively.
                        "text": f"查詢結果:{json.dumps(results, ensure_ascii=False, default=str)}",
                    }
                ]
            }
        except Exception as e:
            return {"error": f"資料庫查詢失敗:{str(e)}"}

    def register_tool(
        self,
        name: str,
        description: str,
        schema: dict,
        handler: Optional[ToolHandler] = None,
    ):
        """Advertise a tool in tools/list and optionally bind an async handler to it."""
        self.capabilities["tools"][name] = {
            "description": description,
            "inputSchema": schema,
        }
        if handler is not None:
            self._tool_handlers[name] = handler
企業級 MCP 整合
多服務協調架構:
import logging
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)


class MCPOrchestrator:
    """Route requests to MCP servers by capability, with round-robin and failover."""

    def __init__(self):
        # Quoted annotation: MCPServer is defined with the MCP server code.
        self.servers: Dict[str, "MCPServer"] = {}
        self.routing_rules: Dict[str, List[str]] = {}
        # Per capability: index of the server to try first on the next request.
        self._next_index: Dict[str, int] = {}

    def register_server(self, name: str, server: "MCPServer", capabilities: list):
        """Register (or replace) a server and make it eligible for each capability."""
        self.servers[name] = server

        for capability in capabilities:
            names = self.routing_rules.setdefault(capability, [])
            # Re-registering must not give a server a double share of traffic.
            if name not in names:
                names.append(name)

    async def route_request(self, capability: str, request: dict):
        """Send ``request`` to a server offering ``capability``.

        The starting server rotates per request (round-robin load balancing);
        if it raises or returns an "error" result, the remaining servers are
        tried in order (failover).

        Returns:
            The first successful result. If none succeed, a dict with an "error"
            key plus "last_error" describing the last failure.
        """
        names = self.routing_rules.get(capability)
        if not names:
            return {"error": f"No server available for capability: {capability}"}

        start = self._next_index.get(capability, 0) % len(names)
        self._next_index[capability] = (start + 1) % len(names)

        last_error: Optional[str] = None
        for offset in range(len(names)):
            server_name = names[(start + offset) % len(names)]
            try:
                result = await self.servers[server_name].handle_request(request)
            except Exception as e:
                logger.warning("Server %s failed: %s", server_name, e)
                last_error = str(e)
                continue

            if isinstance(result, dict) and "error" not in result:
                return result
            last_error = (
                str(result.get("error")) if isinstance(result, dict)
                else f"Invalid response: {result!r}"
            )

        return {
            "error": "All servers failed to handle the request",
            "last_error": last_error,
        }


async def setup_enterprise_mcp():
    """Example wiring: a database server and an API server behind one orchestrator.

    Tool names match those CustomMCPServer.handle_tool_call dispatches
    ("database_query", "api_request"); other names would be advertised but
    fail with "Unknown tool" when called.
    """
    orchestrator = MCPOrchestrator()

    db_server = CustomMCPServer("database-service", "1.0.0")
    db_server.register_tool("database_query", "Execute SQL queries", {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "SQL query to execute"}
        },
        "required": ["query"],
    })
    orchestrator.register_server("database", db_server, ["data_access"])

    api_server = CustomMCPServer("api-service", "1.0.0")
    api_server.register_tool("api_request", "Make HTTP requests", {
        "type": "object",
        "properties": {
            "url": {"type": "string"},
            "method": {"type": "string"},
            "headers": {"type": "object"},
        },
        "required": ["url"],
    })
    orchestrator.register_server("api", api_server, ["external_integration"])

    return orchestrator
5. 生產級部署與最佳實務
效能監控與可觀測性
代理效能追蹤:
import asyncio
import logging
import time
from functools import wraps
from typing import Any, Dict


class AgentPerformanceMonitor:
    """Collect per-agent latency and success/failure counts for async agent calls."""

    def __init__(self):
        self.metrics = {
            "task_completion_time": {},  # agent_id -> durations (s) of successful calls
            "success_rate": {},          # agent_id -> {"success": n, "failure": n}
            "resource_usage": {},        # reserved; nothing populates it yet
        }
        # Logging is configured by the application, not here: calling
        # basicConfig in a constructor would reconfigure the importer's root logger.
        self.logger = logging.getLogger(__name__)

    def monitor_agent_performance(self, agent_id: str):
        """Decorator for *async* agent functions that records timing and outcome.

        Exceptions from the wrapped function are counted as failures, logged,
        and re-raised unchanged.
        """
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # perf_counter is monotonic, unlike time.time(), so wall-clock
                # adjustments cannot produce negative or skewed durations.
                start_time = time.perf_counter()
                try:
                    result = await func(*args, **kwargs)
                except Exception as e:
                    self._record_failure(agent_id, str(e))
                    self.logger.error("Agent %s failed: %s", agent_id, e)
                    raise

                execution_time = time.perf_counter() - start_time
                self._record_success(agent_id, execution_time)
                self.logger.info(
                    "Agent %s completed task in %.2fs", agent_id, execution_time
                )
                return result

            return wrapper
        return decorator

    def _outcome_counts(self, agent_id: str) -> Dict[str, int]:
        """Return (creating if needed) the success/failure counter for an agent."""
        return self.metrics["success_rate"].setdefault(
            agent_id, {"success": 0, "failure": 0}
        )

    def _record_success(self, agent_id: str, execution_time: float):
        """Record one successful call and its duration."""
        self.metrics["task_completion_time"].setdefault(agent_id, []).append(execution_time)
        self._outcome_counts(agent_id)["success"] += 1

    def _record_failure(self, agent_id: str, error_message: str):
        """Record one failed call (the message is logged by the wrapper, not stored)."""
        self._outcome_counts(agent_id)["failure"] += 1

    def get_performance_report(self) -> Dict[str, Any]:
        """Summarize every agent seen so far, including agents that only failed.

        Per agent: average/min/max completion time over successful calls (None
        if there were none), ``total_tasks`` (successful calls), ``failed_tasks``,
        and ``success_rate`` (successes / attempts, or None with no attempts).
        """
        report = {}
        completion_times = self.metrics["task_completion_time"]
        outcomes = self.metrics["success_rate"]

        for agent_id in sorted(set(completion_times) | set(outcomes)):
            times = completion_times.get(agent_id, [])
            counts = outcomes.get(agent_id, {"success": 0, "failure": 0})
            attempts = counts["success"] + counts["failure"]

            report[agent_id] = {
                "average_completion_time": sum(times) / len(times) if times else None,
                "min_completion_time": min(times) if times else None,
                "max_completion_time": max(times) if times else None,
                "total_tasks": len(times),
                "failed_tasks": counts["failure"],
                "success_rate": counts["success"] / attempts if attempts else None,
            }

        return report


# Usage example (application code configures logging).
logging.basicConfig(level=logging.INFO)
monitor = AgentPerformanceMonitor()


@monitor.monitor_agent_performance("research-agent")
async def research_agent_task(query: str):
    """Example agent: simulate one second of work and return a summary string."""
    await asyncio.sleep(1)
    return f"Research completed for: {query}"
錯誤處理與恢復機制
彈性化錯誤處理:
import asyncio
import logging

logger = logging.getLogger(__name__)


class RobustAgentExecutor:
    """Run async agent callables with exponential-backoff retries and a fallback.

    A synchronous retry decorator cannot be used on an ``async def``: calling
    the function only creates a coroutine and never raises, so no retry would
    ever happen. Retries are therefore implemented here with ``asyncio.sleep``.
    """

    def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0,
                 initial_delay: float = 1.0):
        """
        Args:
            max_retries: total number of attempts (at least one is always made).
            backoff_factor: multiplier applied to the delay after each failed attempt.
            initial_delay: seconds to wait before the second attempt.
        """
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.initial_delay = initial_delay

    async def execute_with_retry(self, agent_func, *args, **kwargs):
        """Await ``agent_func(*args, **kwargs)``, retrying recoverable errors.

        Non-recoverable errors, and the error from the final attempt, are
        re-raised unchanged so callers can still catch specific exception types.
        """
        attempts = max(1, self.max_retries)
        delay = self.initial_delay

        for attempt in range(1, attempts + 1):
            try:
                return await agent_func(*args, **kwargs)
            except Exception as e:
                retryable = self.is_recoverable_error(e) and attempt < attempts
                logger.warning(
                    "Agent execution failed (attempt %d/%d, %s): %s",
                    attempt, attempts, "retrying" if retryable else "giving up", e,
                )
                if not retryable:
                    raise
            await asyncio.sleep(delay)
            delay *= self.backoff_factor

        raise RuntimeError("unreachable: loop always returns or raises")

    def is_recoverable_error(self, error: Exception) -> bool:
        """Return True for transient failures (timeouts, connection issues, 5xx, rate limits)."""
        if isinstance(error, (TimeoutError, ConnectionError, asyncio.TimeoutError)):
            return True

        recoverable_errors = [
            "timeout",
            "connection",
            "rate limit",
            "temporary",
            "503",
            "502",
        ]
        error_message = str(error).lower()
        return any(recoverable in error_message for recoverable in recoverable_errors)

    async def execute_with_fallback(self, primary_agent, fallback_agent, *args, **kwargs):
        """Try ``primary_agent`` (with retries); on final failure run ``fallback_agent``."""
        try:
            return await self.execute_with_retry(primary_agent, *args, **kwargs)
        except Exception as e:
            logger.warning("Primary agent failed, using fallback: %s", e)
            return await self.execute_with_retry(fallback_agent, *args, **kwargs)
6. 安全性與權限管理
代理安全框架
權限控制系統:
import re
import secrets
from enum import Enum
from functools import wraps
from typing import Dict, Set


class Permission(Enum):
    """Capabilities an agent can be granted."""

    READ_DATA = "read_data"
    WRITE_DATA = "write_data"
    EXECUTE_COMMANDS = "execute_commands"
    ACCESS_EXTERNAL_APIs = "access_external_apis"
    # Upper-case alias (same value, so Enum makes it an alias of the member above).
    ACCESS_EXTERNAL_APIS = "access_external_apis"
    MANAGE_USERS = "manage_users"


class SecurityError(PermissionError):
    """Raised when a safety check refuses an action the agent is otherwise permitted to do."""


class SecurityContext:
    """Identity, granted permissions and a random session token for one agent."""

    def __init__(self, agent_id: str, permissions: Set[Permission]):
        self.agent_id = agent_id
        # Copy so later changes to the caller's set cannot silently grant permissions.
        self.permissions = set(permissions)
        self.session_token = self._generate_session_token()

    def has_permission(self, permission: Permission) -> bool:
        return permission in self.permissions

    def _generate_session_token(self) -> str:
        # secrets (not random) because the token is security-sensitive.
        return secrets.token_urlsafe(32)


def _check_permission(context: SecurityContext, permission: Permission) -> None:
    """Raise PermissionError if ``context`` lacks ``permission``."""
    if not context.has_permission(permission):
        raise PermissionError(
            f"Agent {context.agent_id} "
            f"lacks permission: {permission.value}"
        )


def require_permission(permission: Permission):
    """Decorator for async *methods* whose instance has a ``security_context`` attribute.

    The check runs at call time against that instance's context, which is why
    it can be used in a class body (unlike SecureAgentDecorator.require_permission,
    which needs an instance bound to one context).
    """
    def decorator(method):
        @wraps(method)
        async def wrapper(self, *args, **kwargs):
            _check_permission(self.security_context, permission)
            return await method(self, *args, **kwargs)
        return wrapper
    return decorator


class SecureAgentDecorator:
    """Permission decorator bound to one SecurityContext, for free async functions."""

    def __init__(self, security_context: SecurityContext):
        self.security_context = security_context

    def require_permission(self, permission: Permission):
        """Return a decorator that checks ``permission`` before awaiting the function."""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                _check_permission(self.security_context, permission)
                return await func(*args, **kwargs)
            return wrapper
        return decorator


class SecureDataAgent:
    """Example agent whose operations are gated by its SecurityContext."""

    # Illustrative denylist only; production systems should allowlist commands.
    # Word boundaries avoid false positives such as "model" (del) or
    # "information" (format); the rm pattern covers -rf, -fr, -rvf, ...
    _DANGEROUS_COMMAND_PATTERNS = (
        re.compile(r"\brm\s+-[a-z]*(?:r[a-z]*f|f[a-z]*r)"),
        re.compile(r"\b(?:del|format|shutdown)\b"),
    )

    def __init__(self, security_context: SecurityContext):
        self.security_context = security_context
        self.secure_decorator = SecureAgentDecorator(security_context)

    @require_permission(Permission.READ_DATA)
    async def read_sensitive_data(self, data_id: str):
        """Read sensitive data (requires READ_DATA)."""
        print(f"Reading data {data_id} with agent {self.security_context.agent_id}")
        return f"Sensitive data: {data_id}"

    @require_permission(Permission.EXECUTE_COMMANDS)
    async def execute_system_command(self, command: str):
        """Execute a system command (requires EXECUTE_COMMANDS and a passing safety check).

        Raises:
            PermissionError: the agent lacks EXECUTE_COMMANDS.
            SecurityError: the command matches a dangerous pattern.
        """
        if self.is_safe_command(command):
            print(f"Executing command: {command}")
            return f"Command executed: {command}"
        raise SecurityError("Potentially dangerous command blocked")

    def is_safe_command(self, command: str) -> bool:
        """Return False if the command matches any known-dangerous pattern."""
        lowered = command.lower()
        return not any(p.search(lowered) for p in self._DANGEROUS_COMMAND_PATTERNS)
總結
本文深入介紹了多智慧體系統的核心概念與實作:
- 架構設計:Agent 組件、協作模式與工作流程管理
- LangGraph 框架:狀態圖設計、檢查點機制與人工干預
- CrewAI 企業應用:團隊協作、階層管理與業務智能
- MCP 協定:服務器實作、企業整合與多服務協調
- 生產部署:效能監控、錯誤處理與恢復機制
- 安全管理:權限控制、安全框架與風險緩解
下一篇將探討企業級 AI 整合與部署策略,包含雲端平台部署、安全性管理與數據管道架構。
