AI Forward Deployed Engineer 必備技能指南(二):多智慧體系統與框架實戰

前言

多智慧體系統是現代 AI 應用的重要發展方向,能夠處理複雜的企業級任務。作為 AI FDE,掌握多智慧體框架的設計與實作是核心技能之一。本文將深入探討 LangGraph、CrewAI 等主流框架,以及 Model Context Protocol (MCP) 的實際應用。

1. 多智慧體系統核心概念

基礎架構原理

Agent 核心組件:

  • 感知器 (Perception):接收與理解環境資訊
  • 決策器 (Decision Making):基於目標與狀態規劃行動
  • 執行器 (Action):與環境互動執行任務
  • 記憶體 (Memory):儲存狀態、經驗與知識

協作模式:

 1from enum import Enum
 2from dataclasses import dataclass
 3from typing import List, Dict, Any
 4
 5class CoordinationPattern(Enum):
 6    SEQUENTIAL = "sequential"      # 順序執行
 7    PARALLEL = "parallel"         # 並行執行
 8    HIERARCHICAL = "hierarchical" # 階層式管理
 9    COLLABORATIVE = "collaborative" # 協作式決策
10
@dataclass
class AgentTask:
    """One unit of work addressed to a single agent."""

    # Identifier of this task.
    task_id: str
    # Human-readable statement of the work.
    description: str
    # Identifier of the agent the task is assigned to.
    agent_id: str
    # presumably the task_ids that must finish first — confirm with the orchestrator
    dependencies: List[str]
    # NOTE(review): ordering direction (higher vs. lower first) is not defined here
    priority: int
    # Free-form extra data carried with the task.
    metadata: Dict[str, Any]
19
class MultiAgentOrchestrator:
    """Run a list of agent tasks according to a coordination pattern.

    The per-pattern runners ``_execute_sequential`` and ``_execute_parallel``
    are not defined in this listing and must be supplied alongside it.
    """

    def __init__(self, coordination_pattern: CoordinationPattern):
        """Store the coordination pattern and create empty registries."""
        self.pattern = coordination_pattern
        # presumably agent_id -> agent object; nothing in this listing fills it
        self.agents = {}
        # Not read or written anywhere in this listing.
        self.task_queue = []

    def execute_workflow(self, tasks: List[AgentTask]):
        """Dispatch ``tasks`` to the runner that matches ``self.pattern``.

        Args:
            tasks: The tasks to execute.

        Returns:
            Whatever the selected runner returns.

        Raises:
            NotImplementedError: If the configured pattern has no runner yet
                (currently HIERARCHICAL and COLLABORATIVE).
        """
        if self.pattern == CoordinationPattern.SEQUENTIAL:
            return self._execute_sequential(tasks)
        if self.pattern == CoordinationPattern.PARALLEL:
            return self._execute_parallel(tasks)
        # Remaining patterns are still to be implemented. Fail loudly instead
        # of returning None, which callers could mistake for an empty result.
        raise NotImplementedError(
            f"Coordination pattern not implemented: {self.pattern!r}"
        )

2. LangGraph 框架深度實作

狀態圖設計架構

核心概念實作:

 1from typing import TypedDict, Annotated
 2import operator
 3from langgraph.graph import StateGraph, END
 4from langgraph.prebuilt import ToolNode
 5
 6class AgentState(TypedDict):
 7    messages: Annotated[list, operator.add]
 8    current_task: str
 9    completed_tasks: Annotated[list, operator.add]
10    context: dict
11    iteration_count: int
12
def create_research_workflow():
    """Build and compile the research state graph.

    The planner is the entry point. ``should_continue`` routes from the
    planner either to the researcher or to ``END``; the remaining nodes are
    chained researcher -> tools -> analyzer -> synthesizer -> planner.

    Returns:
        The result of ``StateGraph.compile()``.
    """
    graph = StateGraph(AgentState)

    # Register the nodes.
    node_table = (
        ("planner", planning_agent),
        ("researcher", research_agent),
        ("analyzer", analysis_agent),
        ("synthesizer", synthesis_agent),
        ("tools", ToolNode(research_tools)),
    )
    for node_name, node in node_table:
        graph.add_node(node_name, node)

    # Wire the edges and the conditional branch.
    graph.set_entry_point("planner")

    planner_routes = {"continue": "researcher", "end": END}
    graph.add_conditional_edges("planner", should_continue, planner_routes)

    fixed_edges = (
        ("researcher", "tools"),
        ("tools", "analyzer"),
        ("analyzer", "synthesizer"),
        ("synthesizer", "planner"),
    )
    for source, target in fixed_edges:
        graph.add_edge(source, target)

    return graph.compile()
41
def planning_agent(state: AgentState):
    """Planning node: ask the LLM for the next research step.

    Builds a prompt from the latest message, the current task and the
    completed tasks, sends it to the module-level ``llm`` and returns a state
    update carrying the reply and the incremented iteration counter.
    """
    history = state["messages"]
    task_status = state.get("current_task", "")
    latest_content = history[-1]["content"] if history else ""
    finished_tasks = state.get("completed_tasks", [])

    planning_prompt = f"""
    基於以下訊息規劃研究任務:
    {latest_content}
    
    當前任務狀態:{task_status}
    已完成任務:{finished_tasks}
    
    請分析並決定下一步行動:
    1. 如果需要更多研究,返回具體的研究計劃
    2. 如果資訊足夠,返回 "完成研究" 
    """

    # Call the LLM.
    reply = llm.invoke(planning_prompt)
    plan_text = reply.content

    update = {
        "messages": [{"role": "planner", "content": plan_text}],
        "current_task": plan_text,
    }
    update["iteration_count"] = state.get("iteration_count", 0) + 1
    return update
67
def should_continue(state: AgentState) -> str:
    """Return ``"end"`` or ``"continue"`` for the planner's conditional edge.

    The workflow ends once ``iteration_count`` exceeds 10, or when the latest
    message contains "完成研究" or (case-insensitively) "research complete".
    """
    history = state["messages"]

    if state.get("iteration_count", 0) > 10:
        return "end"

    latest = history[-1]["content"] if history else ""
    is_done = "完成研究" in latest or "research complete" in latest.lower()
    return "end" if is_done else "continue"

進階功能實作

檢查點與狀態持久化:

 1from langgraph.checkpoint.sqlite import SqliteSaver
 2
 3# 設定檢查點保存
 4checkpoint_saver = SqliteSaver.from_conn_string(":memory:")
 5
 6# 編譯時加入檢查點功能
 7app = workflow.compile(checkpointer=checkpoint_saver)
 8
 9# 帶狀態恢復的執行
10thread_config = {"configurable": {"thread_id": "research-session-001"}}
11
12# 執行並自動保存狀態
13result = app.invoke(
14    {"messages": [{"role": "user", "content": "研究 AI 在醫療領域的應用"}]},
15    config=thread_config
16)
17
18# 從檢查點恢復並繼續執行
19resumed_result = app.invoke(
20    {"messages": [{"role": "user", "content": "請提供更詳細的分析"}]},
21    config=thread_config
22)

人工干預機制:

 1from langgraph.prebuilt import create_react_agent
 2
 3def create_human_in_loop_agent():
 4    def human_approval_node(state: AgentState):
 5        last_action = state["messages"][-1]
 6        
 7        # 檢查是否需要人工確認
 8        if requires_approval(last_action):
 9            print(f"需要確認以下行動:{last_action['content']}")
10            approval = input("是否繼續? (y/n): ")
11            
12            if approval.lower() != 'y':
13                return {
14                    "messages": [{"role": "system", "content": "行動已被用戶取消"}],
15                    "current_task": "awaiting_user_input"
16                }
17        
18        return state
19    
20    workflow.add_node("human_approval", human_approval_node)
21    workflow.add_edge("analyzer", "human_approval")
22    workflow.add_edge("human_approval", "synthesizer")
23
def requires_approval(action):
    """Return True when the action's content mentions a high-risk operation.

    ``action`` is indexed with ``"content"``; the check is a plain substring
    match against a fixed list of phrases.
    """
    risky_phrases = (
        "發送郵件",
        "修改資料庫",
        "刪除檔案",
        "執行系統指令",
        "進行付款操作",
    )

    for phrase in risky_phrases:
        if phrase in action["content"]:
            return True
    return False

3. CrewAI 框架企業應用

團隊協作架構

CrewAI 核心實作:

  1from crewai import Agent, Task, Crew
  2from crewai.tools import BaseTool
  3from langchain_openai import ChatOpenAI
  4
  5class DataAnalysisTool(BaseTool):
  6    name: str = "數據分析工具"
  7    description: str = "分析業務數據並產生洞察報告"
  8    
  9    def _run(self, data_query: str) -> str:
 10        # 實際數據分析邏輯
 11        analysis_result = perform_data_analysis(data_query)
 12        return f"數據分析結果:{analysis_result}"
 13
 14class MarketResearchTool(BaseTool):
 15    name: str = "市場研究工具"
 16    description: str = "收集市場趨勢與競爭分析"
 17    
 18    def _run(self, market_segment: str) -> str:
 19        market_data = fetch_market_data(market_segment)
 20        return f"市場研究報告:{market_data}"
 21
 22def create_business_intelligence_crew():
 23    # 初始化 LLM
 24    llm = ChatOpenAI(model="gpt-4")
 25    
 26    # 定義專業代理
 27    data_analyst = Agent(
 28        role="資深數據分析師",
 29        goal="分析業務數據並識別關鍵趨勢與機會",
 30        backstory="""
 31        您是一位擁有 10 年經驗的資深數據分析師,
 32        專長於商業智能與數據挖掘,能夠從複雜數據中
 33        提取有價值的商業洞察。
 34        """,
 35        tools=[DataAnalysisTool()],
 36        llm=llm,
 37        verbose=True
 38    )
 39    
 40    market_researcher = Agent(
 41        role="市場研究專家",
 42        goal="提供深度市場分析與競爭情報",
 43        backstory="""
 44        您是市場研究領域的專家,具備敏銳的市場洞察力,
 45        能夠識別新興趨勢並評估競爭威脅與機會。
 46        """,
 47        tools=[MarketResearchTool()],
 48        llm=llm,
 49        verbose=True
 50    )
 51    
 52    strategy_consultant = Agent(
 53        role="策略顧問",
 54        goal="整合分析結果並制定可執行的業務策略",
 55        backstory="""
 56        您是一位經驗豐富的策略顧問,擅長將數據洞察
 57        轉化為具體的業務行動計劃與投資建議。
 58        """,
 59        llm=llm,
 60        verbose=True
 61    )
 62    
 63    # 定義任務流程
 64    data_analysis_task = Task(
 65        description="""
 66        分析過去 12 個月的業務數據:
 67        1. 識別收入趨勢與季節性模式
 68        2. 分析客戶行為變化
 69        3. 評估產品效能表現
 70        4. 提供數據驅動的洞察
 71        """,
 72        expected_output="詳細的數據分析報告,包含關鍵指標與趨勢分析",
 73        agent=data_analyst
 74    )
 75    
 76    market_research_task = Task(
 77        description="""
 78        進行全面市場研究:
 79        1. 分析市場規模與成長潛力
 80        2. 識別主要競爭對手與其策略
 81        3. 評估新興技術趨勢影響
 82        4. 分析客戶需求變化
 83        """,
 84        expected_output="市場研究報告,包含競爭分析與機會評估",
 85        agent=market_researcher,
 86        dependencies=[data_analysis_task]
 87    )
 88    
 89    strategy_formulation_task = Task(
 90        description="""
 91        基於數據分析與市場研究結果制定策略:
 92        1. 整合所有分析結果
 93        2. 識別核心業務機會
 94        3. 制定具體行動計劃
 95        4. 設定關鍵績效指標 (KPI)
 96        """,
 97        expected_output="完整的業務策略文件,包含執行計劃與成功指標",
 98        agent=strategy_consultant,
 99        dependencies=[data_analysis_task, market_research_task]
100    )
101    
102    # 組建團隊
103    crew = Crew(
104        agents=[data_analyst, market_researcher, strategy_consultant],
105        tasks=[data_analysis_task, market_research_task, strategy_formulation_task],
106        verbose=True,
107        process="sequential"  # 或 "hierarchical" 用於階層式管理
108    )
109    
110    return crew
111
# Run the business-intelligence analysis.
def run_business_intelligence_analysis():
    """Create the BI crew, start it with fixed inputs and return the result.

    NOTE(review): the task descriptions in this listing contain no
    ``{placeholder}`` fields, so verify these inputs are actually consumed.
    """
    crew = create_business_intelligence_crew()

    kickoff_inputs = {
        "business_context": "電子商務平台",
        "analysis_period": "2024年度",
        "focus_areas": ["客戶增長", "產品優化", "市場擴張"],
    }
    return crew.kickoff(inputs=kickoff_inputs)

階層式代理管理

Manager Agent 實作:

 1from crewai.process import Process
 2
 3def create_hierarchical_crew():
 4    # 管理者代理
 5    manager = Agent(
 6        role="專案經理",
 7        goal="協調團隊工作並確保專案按時交付",
 8        backstory="""
 9        您是一位經驗豐富的專案經理,擅長團隊協調與資源分配,
10        能夠識別潛在風險並制定緩解策略。
11        """,
12        llm=llm,
13        allow_delegation=True,  # 允許委派任務
14        max_delegation_depth=2  # 最大委派層級
15    )
16    
17    # 專業執行代理
18    technical_lead = Agent(
19        role="技術主管",
20        goal="負責技術決策與架構設計",
21        backstory="您是技術團隊的領導者,負責確保技術方案的可行性。",
22        llm=llm
23    )
24    
25    quality_assurance = Agent(
26        role="品質保證專員",
27        goal="確保交付成果符合品質標準",
28        backstory="您專注於品質控制與測試驗證。",
29        llm=llm
30    )
31    
32    crew = Crew(
33        agents=[manager, technical_lead, quality_assurance],
34        tasks=[project_planning_task, implementation_task, quality_review_task],
35        process=Process.hierarchical,  # 階層式流程
36        manager_llm=llm  # 管理者使用的 LLM
37    )
38    
39    return crew

4. Model Context Protocol (MCP) 實作

MCP 核心架構

協定實作框架:

 1from typing import Protocol, runtime_checkable
 2import asyncio
 3import json
 4
 5@runtime_checkable
 6class MCPServer(Protocol):
 7    async def handle_request(self, request: dict) -> dict:
 8        """處理 MCP 請求"""
 9        pass
10    
11    async def initialize(self, capabilities: dict) -> dict:
12        """初始化服務器能力"""
13        pass
14
class CustomMCPServer:
    """Small in-process MCP-style server with a tool registry.

    Requests are dicts with a ``method`` and optional ``params``. Responses
    are dicts; failures are reported as ``{"error": "..."}`` instead of being
    raised.
    """

    def __init__(self, name: str, version: str):
        """Store the server identity and create empty capability registries."""
        self.name = name
        self.version = version
        self.capabilities = {
            "tools": {},
            "resources": {},
            "prompts": {}
        }
        # Callables for tools that were registered with an explicit handler.
        self._tool_handlers = {}

    async def initialize(self, capabilities: dict) -> dict:
        """Return this server's identity and registered capabilities.

        ``capabilities`` is accepted so the class matches the ``MCPServer``
        protocol; it is not used.
        """
        return {
            "serverInfo": {"name": self.name, "version": self.version},
            "capabilities": self.capabilities,
        }

    async def handle_request(self, request: dict) -> dict:
        """Dispatch ``request`` by its ``method`` field.

        ``resources/read`` and ``prompts/get`` are forwarded to optional
        ``handle_resource_read`` / ``handle_prompt_get`` hooks; when a hook is
        not defined the method is reported as unsupported.
        """
        method = request.get("method")
        params = request.get("params", {})
        unsupported = f"Unsupported method: {method}"

        if method == "tools/call":
            return await self.handle_tool_call(params)
        elif method == "resources/read":
            return await self._call_hook("handle_resource_read", params, unsupported)
        elif method == "prompts/get":
            return await self._call_hook("handle_prompt_get", params, unsupported)
        else:
            return {"error": unsupported}

    async def _call_hook(self, hook_name: str, payload: dict, fallback_error: str) -> dict:
        """Await the optional method ``hook_name``; return an error dict if absent."""
        hook = getattr(self, hook_name, None)
        if hook is None:
            return {"error": fallback_error}
        return await hook(payload)

    async def handle_tool_call(self, params: dict):
        """Invoke the tool named in ``params`` with its ``arguments``.

        Handlers passed to ``register_tool`` take precedence; the built-in
        names ``database_query`` and ``api_request`` are tried next.
        """
        import inspect

        tool_name = params.get("name")
        arguments = params.get("arguments", {})

        if isinstance(tool_name, str) and tool_name in self._tool_handlers:
            outcome = self._tool_handlers[tool_name](arguments)
            # Accept both sync and async handlers.
            if inspect.isawaitable(outcome):
                outcome = await outcome
            return outcome

        if tool_name == "database_query":
            return await self.execute_database_query(arguments)
        elif tool_name == "api_request":
            return await self._call_hook(
                "execute_api_request", arguments, f"Unknown tool: {tool_name}"
            )

        return {"error": f"Unknown tool: {tool_name}"}

    async def execute_database_query(self, arguments: dict):
        """Run the ``query`` argument through ``execute_sql_query``.

        SECURITY: the query text comes straight from the request and is
        executed as-is. Restrict or parameterise it before exposing this tool
        to untrusted callers.
        """
        query = arguments.get("query")
        if not query:
            return {"error": "資料庫查詢失敗:missing 'query' argument"}

        try:
            # execute_sql_query is provided elsewhere.
            results = await execute_sql_query(query)
            return {
                "content": [
                    {
                        "type": "text",
                        "text": f"查詢結果:{json.dumps(results, ensure_ascii=False)}"
                    }
                ]
            }
        except Exception as e:
            return {"error": f"資料庫查詢失敗:{str(e)}"}

    def register_tool(self, name: str, description: str, schema: dict, handler=None):
        """Register a tool's metadata and, optionally, the callable behind it.

        Args:
            name: Tool name used in ``tools/call`` requests.
            description: Human-readable description.
            schema: JSON schema stored under ``inputSchema``.
            handler: Optional sync or async callable taking the arguments
                dict. Without it the tool is only advertised, not callable.
        """
        self.capabilities["tools"][name] = {
            "description": description,
            "inputSchema": schema
        }
        if handler is not None:
            self._tool_handlers[name] = handler

企業級 MCP 整合

多服務協調架構:

 1class MCPOrchestrator:
 2    def __init__(self):
 3        self.servers = {}
 4        self.routing_rules = {}
 5    
 6    def register_server(self, name: str, server: MCPServer, capabilities: list):
 7        """註冊 MCP 服務器"""
 8        self.servers[name] = server
 9        
10        for capability in capabilities:
11            if capability not in self.routing_rules:
12                self.routing_rules[capability] = []
13            self.routing_rules[capability].append(name)
14    
15    async def route_request(self, capability: str, request: dict):
16        """智能路由請求到適當的服務器"""
17        if capability not in self.routing_rules:
18            return {"error": f"No server available for capability: {capability}"}
19        
20        # 負載均衡與故障轉移
21        available_servers = self.routing_rules[capability]
22        
23        for server_name in available_servers:
24            try:
25                server = self.servers[server_name]
26                result = await server.handle_request(request)
27                
28                if "error" not in result:
29                    return result
30                    
31            except Exception as e:
32                print(f"Server {server_name} failed: {e}")
33                continue
34        
35        return {"error": "All servers failed to handle the request"}
36
# Usage example
async def setup_enterprise_mcp():
    """Create an orchestrator with a database server and an API server.

    Tools are registered as ``database_query`` and ``api_request``, the names
    that ``CustomMCPServer.handle_tool_call`` dispatches on.

    Returns:
        The configured ``MCPOrchestrator``.
    """
    orchestrator = MCPOrchestrator()

    # Register the database server.
    db_server = CustomMCPServer("database-service", "1.0.0")
    db_server.register_tool("database_query", "Execute SQL queries", {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "SQL query to execute"}
        }
    })

    orchestrator.register_server("database", db_server, ["data_access"])

    # Register the API server.
    api_server = CustomMCPServer("api-service", "1.0.0")
    api_server.register_tool("api_request", "Make HTTP requests", {
        "type": "object",
        "properties": {
            "url": {"type": "string"},
            "method": {"type": "string"},
            "headers": {"type": "object"}
        }
    })

    orchestrator.register_server("api", api_server, ["external_integration"])

    return orchestrator

5. 生產級部署與最佳實務

效能監控與可觀測性

代理效能追蹤:

 1import time
 2import logging
 3from functools import wraps
 4from typing import Dict, Any
 5
 6class AgentPerformanceMonitor:
 7    def __init__(self):
 8        self.metrics = {
 9            "task_completion_time": {},
10            "success_rate": {},
11            "resource_usage": {}
12        }
13        
14        # 設定日誌
15        logging.basicConfig(level=logging.INFO)
16        self.logger = logging.getLogger(__name__)
17    
18    def monitor_agent_performance(self, agent_id: str):
19        """代理效能監控裝飾器"""
20        def decorator(func):
21            @wraps(func)
22            async def wrapper(*args, **kwargs):
23                start_time = time.time()
24                
25                try:
26                    result = await func(*args, **kwargs)
27                    
28                    # 記錄成功執行
29                    execution_time = time.time() - start_time
30                    self._record_success(agent_id, execution_time)
31                    
32                    self.logger.info(
33                        f"Agent {agent_id} completed task in {execution_time:.2f}s"
34                    )
35                    
36                    return result
37                    
38                except Exception as e:
39                    # 記錄失敗
40                    self._record_failure(agent_id, str(e))
41                    self.logger.error(f"Agent {agent_id} failed: {e}")
42                    raise
43                    
44            return wrapper
45        return decorator
46    
47    def _record_success(self, agent_id: str, execution_time: float):
48        """記錄成功執行指標"""
49        if agent_id not in self.metrics["task_completion_time"]:
50            self.metrics["task_completion_time"][agent_id] = []
51        
52        self.metrics["task_completion_time"][agent_id].append(execution_time)
53        
54    def _record_failure(self, agent_id: str, error_message: str):
55        """記錄失敗案例"""
56        if agent_id not in self.metrics["success_rate"]:
57            self.metrics["success_rate"][agent_id] = {"success": 0, "failure": 0}
58        
59        self.metrics["success_rate"][agent_id]["failure"] += 1
60    
61    def get_performance_report(self) -> Dict[str, Any]:
62        """生成效能報告"""
63        report = {}
64        
65        for agent_id in self.metrics["task_completion_time"]:
66            times = self.metrics["task_completion_time"][agent_id]
67            
68            if times:
69                report[agent_id] = {
70                    "average_completion_time": sum(times) / len(times),
71                    "min_completion_time": min(times),
72                    "max_completion_time": max(times),
73                    "total_tasks": len(times)
74                }
75        
76        return report
77
# Usage example
monitor = AgentPerformanceMonitor()


@monitor.monitor_agent_performance("research-agent")
async def research_agent_task(query: str):
    """Example monitored task: wait one second, then report completion."""
    # Stand-in for the real agent logic; simulates processing time.
    await asyncio.sleep(1)
    return f"Research completed for: {query}"

錯誤處理與恢復機制

彈性化錯誤處理:

 1from retry import retry
 2import asyncio
 3
 4class RobustAgentExecutor:
 5    def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0):
 6        self.max_retries = max_retries
 7        self.backoff_factor = backoff_factor
 8    
 9    @retry(tries=3, delay=1, backoff=2)
10    async def execute_with_retry(self, agent_func, *args, **kwargs):
11        """帶重試機制的代理執行"""
12        try:
13            return await agent_func(*args, **kwargs)
14        except Exception as e:
15            print(f"Agent execution failed: {e}")
16            
17            # 檢查是否為可恢復錯誤
18            if self.is_recoverable_error(e):
19                raise  # 觸發重試
20            else:
21                # 不可恢復錯誤,直接失敗
22                raise Exception(f"Non-recoverable error: {e}")
23    
24    def is_recoverable_error(self, error: Exception) -> bool:
25        """判斷錯誤是否可恢復"""
26        recoverable_errors = [
27            "timeout",
28            "connection",
29            "rate limit",
30            "temporary",
31            "503",
32            "502"
33        ]
34        
35        error_message = str(error).lower()
36        return any(recoverable in error_message for recoverable in recoverable_errors)
37    
38    async def execute_with_fallback(self, primary_agent, fallback_agent, *args, **kwargs):
39        """主要代理失敗時的後備執行"""
40        try:
41            return await self.execute_with_retry(primary_agent, *args, **kwargs)
42        except Exception as e:
43            print(f"Primary agent failed, using fallback: {e}")
44            return await self.execute_with_retry(fallback_agent, *args, **kwargs)

6. 安全性與權限管理

代理安全框架

權限控制系統:

 1from enum import Enum
 2from typing import Set, Dict
 3
 4class Permission(Enum):
 5    READ_DATA = "read_data"
 6    WRITE_DATA = "write_data"
 7    EXECUTE_COMMANDS = "execute_commands"
 8    ACCESS_EXTERNAL_APIs = "access_external_apis"
 9    MANAGE_USERS = "manage_users"
10
class SecurityContext:
    """An agent's identity, its granted permissions and a session token."""

    def __init__(self, agent_id: str, permissions: Set[Permission]):
        """Store the identity and permissions and create a fresh token."""
        self.agent_id = agent_id
        # The caller's set is stored as-is, not copied.
        self.permissions = permissions
        self.session_token = self._generate_session_token()

    def has_permission(self, permission: Permission) -> bool:
        """Return True when ``permission`` is among the granted permissions."""
        granted = self.permissions
        return permission in granted

    def _generate_session_token(self) -> str:
        """Return a URL-safe random token built from 32 random bytes."""
        from secrets import token_urlsafe

        return token_urlsafe(32)
23
class SecureAgentDecorator:
    """Factory for decorators that enforce permissions from a security context."""

    def __init__(self, security_context: SecurityContext):
        self.security_context = security_context

    def require_permission(self, permission: Permission):
        """Return a decorator that guards an async function with ``permission``.

        The check runs on every call of the wrapped function. A missing
        permission raises ``PermissionError``; otherwise the wrapped
        coroutine is awaited and its result returned.
        """
        def guard(func):
            @wraps(func)
            async def checked(*args, **kwargs):
                context = self.security_context
                if context.has_permission(permission):
                    return await func(*args, **kwargs)

                raise PermissionError(
                    f"Agent {self.security_context.agent_id} "
                    f"lacks permission: {permission.value}"
                )
            return checked
        return guard
42
# Example secure agent implementation
class SecurityError(Exception):
    """Raised when a command is rejected by the safety filter."""


class SecureDataAgent:
    """Agent whose public operations are guarded by its own security context."""

    def __init__(self, security_context: SecurityContext):
        self.security_context = security_context
        self.secure_decorator = SecureAgentDecorator(security_context)

    async def read_sensitive_data(self, data_id: str):
        """Read sensitive data; requires ``Permission.READ_DATA``.

        Raises:
            PermissionError: If the agent lacks the permission.
        """
        # The guard is built per call from this instance's decorator.
        # `require_permission` is an instance method, so it cannot be applied
        # through the class at class-definition time.
        guarded = self.secure_decorator.require_permission(Permission.READ_DATA)(
            self._read_sensitive_data
        )
        return await guarded(data_id)

    async def execute_system_command(self, command: str):
        """Execute a system command; requires ``Permission.EXECUTE_COMMANDS``.

        Raises:
            PermissionError: If the agent lacks the permission.
            SecurityError: If the command is rejected by ``is_safe_command``.
        """
        guarded = self.secure_decorator.require_permission(Permission.EXECUTE_COMMANDS)(
            self._execute_system_command
        )
        return await guarded(command)

    async def _read_sensitive_data(self, data_id: str):
        """Unguarded implementation of ``read_sensitive_data``."""
        print(f"Reading data {data_id} with agent {self.security_context.agent_id}")
        return f"Sensitive data: {data_id}"

    async def _execute_system_command(self, command: str):
        """Unguarded implementation of ``execute_system_command``."""
        if self.is_safe_command(command):
            print(f"Executing command: {command}")
            return f"Command executed: {command}"
        else:
            raise SecurityError("Potentially dangerous command blocked")

    def is_safe_command(self, command: str) -> bool:
        """Return False when the command contains a known dangerous substring.

        SECURITY: this is a substring deny-list. It blocks harmless text such
        as "model" (contains "del") and is easy to bypass, so it must not be
        the only safeguard in front of real command execution.
        """
        dangerous_commands = ["rm -rf", "del", "format", "shutdown"]
        return not any(dangerous in command.lower() for dangerous in dangerous_commands)

總結

本文深入介紹了多智慧體系統的核心概念與實作:

  1. 架構設計:Agent 組件、協作模式與工作流程管理
  2. LangGraph 框架:狀態圖設計、檢查點機制與人工干預
  3. CrewAI 企業應用:團隊協作、階層管理與業務智能
  4. MCP 協定:服務器實作、企業整合與多服務協調
  5. 生產部署:效能監控、錯誤處理與恢復機制
  6. 安全管理:權限控制、安全框架與風險緩解

下一篇將探討企業級 AI 整合與部署策略,包含雲端平台部署、安全性管理與數據管道架構。

Yen