Building a production-grade LangGraph AI backend is about more than writing code: it requires system-level thinking about scalability, reliability, and traceability. This article walks through how to design a robust LangGraph architecture, evolving from a single-machine application to a distributed system.
## Monolithic Architecture

### The Simplest Setup

```
User request
      ↓
[FastAPI service]
      ↓
[LangGraph workflow]
      ├→ [Agent 1]
      ├→ [Agent 2]
      └→ [Agent 3]
      ↓
[Database]
```
```python
from dataclasses import dataclass, field

from fastapi import FastAPI
from langgraph.graph import StateGraph, START, END

app = FastAPI()

@dataclass
class RequestState:
    user_input: str
    agent_results: dict = field(default_factory=dict)
    final_output: str = ""

def build_simple_workflow():
    """The simplest possible workflow."""
    workflow = StateGraph(RequestState)

    def process_agent(state: RequestState):
        # Nodes return a dict of state updates
        return {
            "agent_results": {
                "analysis": f"analysis: {state.user_input}",
                "summary": f"summary: {state.user_input[:20]}...",
            }
        }

    workflow.add_node("process", process_agent)
    workflow.add_edge(START, "process")
    workflow.add_edge("process", END)
    return workflow.compile()

workflow = build_simple_workflow()

@app.post("/process")
async def process_request(request: dict):
    state = RequestState(user_input=request["input"])
    # invoke() returns the final state as a dict
    result = workflow.invoke(state)
    return {"output": result["agent_results"]}
```
### The Problem: a Single Point of Failure

```
┌─────────────────────────────────────┐
│ FastAPI (single instance)           │
│ ┌────────────────────────────────┐  │
│ │ LangGraph workflow (1 process) │  │
│ └────────────────────────────────┘  │
└─────────────────────────────────────┘
```

Failure modes:
- Service crash → every request fails
- Model loading → long startup times
- Memory ceiling → no way to scale out
## Layered Architecture

### Evolving the Architecture

```
Client layer
      ↓
[API Gateway / Load Balancer]
      ↓
Service layer
      ├→ [FastAPI Service 1]
      ├→ [FastAPI Service 2]
      └→ [FastAPI Service 3]
      ↓
Worker layer
      ├→ [LangGraph Worker 1]
      ├→ [LangGraph Worker 2]
      └→ [LangGraph Worker 3]
      ↓
Data layer
      ├→ [PostgreSQL]
      ├→ [Redis Cache]
      └→ [Vector DB]
```
### Implementing the Layers

```python
# Layer 1: API layer
import json
import uuid
from datetime import datetime

import redis
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

app = FastAPI()
redis_client = redis.Redis(decode_responses=True)

class ProcessRequest(BaseModel):
    input: str
    priority: str = "normal"

@app.post("/tasks")
async def create_task(request: ProcessRequest, bg_tasks: BackgroundTasks):
    """Create an asynchronous task."""
    task_id = str(uuid.uuid4())

    # Return immediately; process in the background
    bg_tasks.add_task(process_in_worker, task_id, request.input)

    return {
        "task_id": task_id,
        "status": "queued",
    }

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Look up a task's status."""
    result = redis_client.get(f"task:{task_id}")
    return json.loads(result) if result else {"status": "not_found"}

# Layer 2: Worker layer
def process_in_worker(task_id: str, user_input: str):
    """Process the task inside a worker."""
    state = RequestState(user_input=user_input)
    result = workflow.invoke(state)

    # Persist the result with a 1-hour TTL
    redis_client.setex(
        f"task:{task_id}",
        3600,
        json.dumps({
            "status": "completed",
            "result": result["agent_results"],
            "timestamp": datetime.now().isoformat(),
        }),
    )
```
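The queued → completed task lifecycle above can be sketched without FastAPI or Redis, using an in-memory dict as a stand-in for the Redis task store (the helper names here are illustrative, not part of any library):

```python
import json
import uuid
from datetime import datetime

# In-memory stand-in for the Redis task store used above
task_store: dict[str, str] = {}

def create_task(user_input: str) -> str:
    """Enqueue a task and return its id immediately."""
    task_id = str(uuid.uuid4())
    task_store[f"task:{task_id}"] = json.dumps({"status": "queued"})
    return task_id

def process_in_worker(task_id: str, user_input: str) -> None:
    """Worker side: do the work, then persist the result."""
    result = {"analysis": f"analysis: {user_input}"}  # placeholder for workflow.invoke
    task_store[f"task:{task_id}"] = json.dumps({
        "status": "completed",
        "result": result,
        "timestamp": datetime.now().isoformat(),
    })

def get_task_status(task_id: str) -> dict:
    """Client side: poll for the task's current state."""
    raw = task_store.get(f"task:{task_id}")
    return json.loads(raw) if raw else {"status": "not_found"}
```

The API layer only ever touches `create_task` and `get_task_status`; only workers write completed results, which is what lets the two layers scale independently.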
## Designing the Agent Topology

### 1. Linear Topology (Simple Chain)

```
User input → [Agent A] → [Agent B] → [Agent C] → output
```

Pros:
- Simple and easy to debug
- Each agent's output is clearly attributable

Cons:
- No parallelism
- A failure upstream blocks everything downstream
```python
workflow = StateGraph(State)
workflow.add_node("agent_a", agent_a_node)
workflow.add_node("agent_b", agent_b_node)
workflow.add_node("agent_c", agent_c_node)

workflow.add_edge(START, "agent_a")
workflow.add_edge("agent_a", "agent_b")
workflow.add_edge("agent_b", "agent_c")
workflow.add_edge("agent_c", END)
```
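Conceptually, a linear chain is just function composition, which also makes the failure mode visible: if an early stage raises, nothing downstream runs. A stdlib-only sketch (the three `agent_*` functions are illustrative stand-ins, not LangGraph APIs):

```python
def agent_a(state: dict) -> dict:
    state["a"] = state["input"].strip()
    return state

def agent_b(state: dict) -> dict:
    if not state["a"]:
        # An upstream failure stops the whole chain
        raise ValueError("agent_a produced empty output")
    state["b"] = state["a"].upper()
    return state

def agent_c(state: dict) -> dict:
    state["c"] = f"done:{state['b']}"
    return state

def run_chain(state: dict) -> dict:
    """Run the agents strictly in order, threading state through."""
    for agent in (agent_a, agent_b, agent_c):
        state = agent(state)
    return state
```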
### 2. Parallel Topology (Fan-out/Fan-in)

```
                     ┌→ [Agent A1] ──┐
Input → [Dispatcher] ├→ [Agent A2] ──┤ → [Merge] → Output
                     └→ [Agent A3] ──┘
```

Pros:
- Makes full use of available resources
- Speeds up processing

Cons:
- Higher complexity
- Requires merge logic
```python
def distribute_work(state: State):
    """Dispatcher: prepare the shared input for the branches."""
    return {}

def merge_results(state: State):
    """Fan-in: combine the branch outputs."""
    return {"final": combine(state.task1, state.task2, state.task3)}

workflow = StateGraph(State)
workflow.add_node("distribute", distribute_work)
workflow.add_node("task1", process_task1)  # each branch writes its own state key,
workflow.add_node("task2", process_task2)  # so no reducer conflicts on fan-in
workflow.add_node("task3", process_task3)
workflow.add_node("merge", merge_results)

workflow.add_edge(START, "distribute")
# Fan-out: three outgoing edges run the branches in parallel
workflow.add_edge("distribute", "task1")
workflow.add_edge("distribute", "task2")
workflow.add_edge("distribute", "task3")
# Fan-in: "merge" runs only after all three branches complete
workflow.add_edge("task1", "merge")
workflow.add_edge("task2", "merge")
workflow.add_edge("task3", "merge")
workflow.add_edge("merge", END)
```
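Outside LangGraph, the same fan-out/fan-in shape can be sketched with the standard library's `concurrent.futures`; the three `process_task*` functions below are illustrative stand-ins:

```python
from concurrent.futures import ThreadPoolExecutor

def process_task1(text: str) -> str:
    return f"summary:{text[:5]}"

def process_task2(text: str) -> str:
    return f"sentiment:{'pos' if 'good' in text else 'neutral'}"

def process_task3(text: str) -> int:
    return len(text)

def fan_out_fan_in(text: str) -> dict:
    """Run the three branch tasks in parallel, then merge."""
    with ThreadPoolExecutor(max_workers=3) as pool:
        f1 = pool.submit(process_task1, text)
        f2 = pool.submit(process_task2, text)
        f3 = pool.submit(process_task3, text)
    # Leaving the context manager waits for all futures (the fan-in)
    return {"task1": f1.result(), "task2": f2.result(), "task3": f3.result()}
```

The merge step is where the complexity lives: it must decide how to combine heterogeneous branch outputs, which is exactly the role of the `merge` node above.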
### 3. Loop Topology (Agentic Loop)

```
           ┌─────────────────────┐
           │                     ↓
Input → [Decision Agent] → [Execution Agent]
           ↑                     │
           └─────────────────────┘
```

The agent itself decides whether to keep looping.
```python
def route_after_decision(state: State) -> str:
    """Decide whether to keep going."""
    return "execute" if should_continue(state) else "end"

workflow = StateGraph(State)
workflow.add_node("decide", decision_agent)   # the decision node itself
workflow.add_node("execute", execute_agent)
workflow.add_edge(START, "decide")

workflow.add_conditional_edges(
    "decide",
    route_after_decision,
    {
        "execute": "execute",
        "end": END,
    },
)

# Close the loop: after executing, return to the decision node
workflow.add_edge("execute", "decide")
```
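The decide → execute cycle can be simulated in plain Python to see the termination behavior; `max_steps` here is an illustrative safety bound, not a LangGraph parameter:

```python
def should_continue(state: dict) -> bool:
    """Continue until the accumulated score reaches the target."""
    return state["score"] < state["target"]

def execute_step(state: dict) -> dict:
    """The 'execution agent': make one unit of progress."""
    state["score"] += 1
    state["steps"] += 1
    return state

def run_agentic_loop(target: int, max_steps: int = 10) -> dict:
    """Loop decide → execute until done, or until the step budget runs out."""
    state = {"score": 0, "target": target, "steps": 0}
    while should_continue(state) and state["steps"] < max_steps:
        state = execute_step(state)
    return state
```

The step budget matters in production: without it, a decision agent that never says "end" loops forever.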
### 4. Tree Topology (Tree Structure)

```
                  [Root Agent]
                       │
       ┌───────────────┼───────────────┐
       ↓               ↓               ↓
[Classify Agent] [Validate Agent]  [Route Agent]
       │               │               │
   ┌───┴───┐       ┌───┴───┐       ┌───┴───┐
   ↓       ↓       ↓       ↓       ↓       ↓
  [A1]    [A2]    [B1]    [B2]    [C1]    [C2]
```
```python
def build_tree_workflow():
    workflow = StateGraph(State)

    # Level 1
    workflow.add_node("root", root_agent)
    workflow.add_edge(START, "root")

    # Level 2
    workflow.add_node("classify", classify_agent)
    workflow.add_node("validate", validate_agent)
    workflow.add_node("route", route_agent)

    # Wire it up
    workflow.add_edge("root", "classify")
    workflow.add_edge("root", "validate")
    workflow.add_edge("root", "route")

    # Level 3 ...
    return workflow.compile()
```
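Stripped of LangGraph, a tree is nested dispatch: each level picks which child handles the next level. A stdlib sketch of one root → classify → leaf path (all handler names are illustrative):

```python
def classify(text: str) -> str:
    """Level-2 classifier: pick a branch for the input."""
    return "question" if text.endswith("?") else "statement"

# Level-3 leaf handlers, keyed by branch
HANDLERS = {
    "question": lambda text: f"answer:{text}",
    "statement": lambda text: f"ack:{text}",
}

def run_tree(text: str) -> str:
    """Root → classify → leaf handler."""
    branch = classify(text)
    return HANDLERS[branch](text)
```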
## Data-Flow Architecture

### State Management Patterns

```python
from dataclasses import dataclass, field

@dataclass
class DistributedState:
    """State designed for distributed processing."""
    # Request info (immutable)
    request_id: str
    user_id: str
    created_at: str

    # Intermediate processing state (mutable)
    current_agent: str
    processing_history: list[dict] = field(default_factory=list)

    # Results (final)
    results: dict = field(default_factory=dict)

    # Metadata
    metadata: dict = field(default_factory=dict)

# State partitioning strategy:
# lets different workers handle different slices of the state
def partition_state(state: DistributedState) -> dict:
    """Split the state to support distributed processing."""
    return {
        "immutable": {
            "request_id": state.request_id,
            "user_id": state.user_id,
        },
        "mutable": {
            "current_agent": state.current_agent,
            "processing_history": state.processing_history,
        },
        "results": state.results,
    }
```
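A quick check of the partitioning, re-declaring a minimal state here so the snippet is self-contained (`MiniState` is an illustrative stand-in for `DistributedState`):

```python
from dataclasses import dataclass, field

@dataclass
class MiniState:
    request_id: str
    user_id: str
    current_agent: str = ""
    processing_history: list = field(default_factory=list)
    results: dict = field(default_factory=dict)

def partition_state(state: MiniState) -> dict:
    """Split state into immutable / mutable / results partitions."""
    return {
        "immutable": {"request_id": state.request_id, "user_id": state.user_id},
        "mutable": {
            "current_agent": state.current_agent,
            "processing_history": state.processing_history,
        },
        "results": state.results,
    }
```

Because the immutable partition never changes, it can be cached or broadcast freely; only the mutable partition needs coordination between workers.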
### Data Persistence Patterns

```python
from dataclasses import asdict
from datetime import datetime

from sqlalchemy import create_engine, Column, String, JSON
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()
# Adjust the connection URL for your environment
engine = create_engine("postgresql://localhost/workflows")
SessionLocal = sessionmaker(bind=engine)

class WorkflowExecution(Base):
    """A workflow execution record."""
    __tablename__ = "workflow_executions"

    execution_id = Column(String, primary_key=True)
    request_id = Column(String)
    current_state = Column(JSON)  # serialized State
    step_history = Column(JSON)   # per-step results
    status = Column(String)       # queued, running, completed, failed
    created_at = Column(String)
    updated_at = Column(String)

# Save progress
def save_checkpoint(execution_id: str, state: DistributedState):
    """Persist a checkpoint."""
    session = SessionLocal()

    execution = session.query(WorkflowExecution).filter(
        WorkflowExecution.execution_id == execution_id
    ).first()

    if execution:
        execution.current_state = asdict(state)
        execution.updated_at = datetime.now().isoformat()
        session.commit()

# Resume progress
def restore_from_checkpoint(execution_id: str) -> DistributedState | None:
    """Restore from the latest checkpoint."""
    session = SessionLocal()
    execution = session.query(WorkflowExecution).filter(
        WorkflowExecution.execution_id == execution_id
    ).first()

    if execution:
        return DistributedState(**execution.current_state)
    return None
```
## Error-Recovery Architecture

### Retry Strategy

```python
from datetime import datetime

from tenacity import retry, stop_after_attempt, wait_exponential

class ResilientAgent:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
    )
    def execute(self, state: State) -> State:
        """Automatic retries with exponential backoff."""
        try:
            return self.process(state)
        except Exception as e:
            state.errors.append({
                "error": str(e),
                "timestamp": datetime.now().isoformat(),
            })
            raise
```
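If tenacity is unavailable, the same retry-with-exponential-backoff policy is a few lines of stdlib Python (delays are shortened to near-zero here so the sketch runs instantly; use seconds in production):

```python
import time

def retry_with_backoff(func, max_attempts: int = 3, base_delay: float = 0.001):
    """Call func(); on failure wait base_delay * 2**attempt, then retry."""
    attempt = 0
    while True:
        try:
            return func()
        except Exception:
            attempt += 1
            if attempt >= max_attempts:
                raise  # budget exhausted: surface the last error
            time.sleep(base_delay * (2 ** (attempt - 1)))
```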
### Degradation Strategy

```python
def fallback_agent(state: State) -> State:
    """Degrade to progressively simpler versions."""
    try:
        return full_featured_agent(state)
    except TimeoutError:
        # Degrade to the fast version
        return fast_fallback_agent(state)
    except Exception:
        # Last resort: fall back to a cached result
        return use_cached_result(state)
```
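A runnable sketch of that degradation ladder, with the three tiers simulated by plain functions (`TimeoutError` stands in for a slow model call; all names are illustrative):

```python
def full_featured_agent(text: str) -> str:
    # Simulate the expensive primary path blowing its latency budget
    raise TimeoutError("model call exceeded budget")

def fast_fallback_agent(text: str) -> str:
    return f"fast:{text[:10]}"

def use_cached_result(text: str) -> str:
    return "cached:stale-but-available"

def answer_with_fallback(text: str) -> str:
    """Try the full agent, degrade to fast mode, then to cache."""
    try:
        return full_featured_agent(text)
    except TimeoutError:
        try:
            return fast_fallback_agent(text)
        except Exception:
            return use_cached_result(text)
```

The key property is that each tier trades quality for availability: the user always gets some answer, annotated (here via the prefix) with how degraded it is.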
## Monitoring and Observability

### Built-in Instrumentation

```python
import logging
from datetime import datetime
from functools import wraps

class ObservableWorkflow:
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def instrument_node(self, node_name: str):
        """Add monitoring around a node."""
        def wrapper(func):
            @wraps(func)
            def inner(state):
                start_time = datetime.now()

                try:
                    result = func(state)
                    duration = (datetime.now() - start_time).total_seconds()

                    self.logger.info(f"Node {node_name} completed", extra={
                        "node": node_name,
                        "duration_ms": duration * 1000,
                        "status": "success",
                    })

                    return result
                except Exception as e:
                    duration = (datetime.now() - start_time).total_seconds()

                    self.logger.error(f"Node {node_name} failed", extra={
                        "node": node_name,
                        "duration_ms": duration * 1000,
                        "error": str(e),
                        "status": "failed",
                    })
                    raise

            return inner
        return wrapper

# Usage
observable = ObservableWorkflow()

@observable.instrument_node("analysis")
def analysis_node(state: State) -> State:
    return state
```
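The same wrapper pattern, collecting timings into an in-memory list instead of a logger so the effect is easy to inspect (`records` is an illustrative sink, not part of any library):

```python
import time
from functools import wraps

records: list[dict] = []

def instrument(node_name: str):
    """Record name, duration, and status for every call of the wrapped node."""
    def decorator(func):
        @wraps(func)
        def inner(state):
            start = time.perf_counter()
            status = "success"
            try:
                return func(state)
            except Exception:
                status = "failed"
                raise
            finally:
                # Runs on both success and failure paths
                records.append({
                    "node": node_name,
                    "duration_ms": (time.perf_counter() - start) * 1000,
                    "status": status,
                })
        return inner
    return decorator

@instrument("analysis")
def analysis_node(state: dict) -> dict:
    state["analysed"] = True
    return state
```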
### Metrics Collection

```python
from datetime import datetime
from functools import wraps

from prometheus_client import Counter, Histogram, Gauge

# Define the metrics
workflow_executions = Counter(
    'workflow_executions_total',
    'Total workflow executions',
    ['status'],
)

workflow_duration = Histogram(
    'workflow_duration_seconds',
    'Workflow duration',
)

active_workflows = Gauge(
    'active_workflows',
    'Active workflows',
)

# Record the metrics
def track_workflow(workflow_func):
    @wraps(workflow_func)
    def wrapper(*args, **kwargs):
        active_workflows.inc()
        start = datetime.now()

        try:
            result = workflow_func(*args, **kwargs)
            workflow_executions.labels(status='success').inc()
            return result
        except Exception:
            workflow_executions.labels(status='failure').inc()
            raise
        finally:
            duration = (datetime.now() - start).total_seconds()
            workflow_duration.observe(duration)
            active_workflows.dec()

    return wrapper
```
## Choosing an Architecture

| Scenario | Recommended architecture | Why |
|---|---|---|
| MVP / prototype | Monolith | Fast validation |
| Low-traffic app | Monolith + cache | Simple and reliable |
| Medium traffic | Layered | Balances complexity and performance |
| High traffic | Microservices | Independent scaling |
| Complex workflows | Tree / loop topology | Matches the business logic |
## Summary

Key principles for designing a LangGraph AI backend:
- Layering: API layer, service layer, worker layer, data layer
- Agent topology: choose the structure that fits the business logic
- State management: clear state definitions and persistence
- Fault tolerance: retries, degradation, circuit breaking
- Observability: complete logs, metrics, and traces

Good architecture lets a system scale smoothly as traffic grows and recover quickly when things fail.
