LangGraph AI Backend Architecture Design Patterns: From Monolith to Distributed

Building a production-grade LangGraph AI backend is more than writing code: it requires system-level thinking about scalability, reliability, and traceability. This article covers how to design a robust LangGraph architecture, evolving from a single-machine application into a distributed system.


Monolithic Architecture

The Simplest Architecture

User request
    ↓
[FastAPI service]
    ↓
[LangGraph workflow]
    ├→ [Agent 1]
    ├→ [Agent 2]
    └→ [Agent 3]
    ↓
[Database]
from dataclasses import dataclass, field

from fastapi import FastAPI
from langgraph.graph import StateGraph, START, END

app = FastAPI()

@dataclass
class RequestState:
    user_input: str
    agent_results: dict = field(default_factory=dict)
    final_output: str = ""

def build_simple_workflow():
    """The simplest possible workflow."""
    workflow = StateGraph(RequestState)

    def process_agent(state: RequestState):
        # Return a partial update; LangGraph merges it into the state
        return {
            "agent_results": {
                "analysis": f"analysis: {state.user_input}",
                "summary": f"summary: {state.user_input[:20]}...",
            }
        }

    workflow.add_node("process", process_agent)
    workflow.add_edge(START, "process")
    workflow.add_edge("process", END)
    return workflow.compile()

workflow = build_simple_workflow()

@app.post("/process")
async def process_request(request: dict):
    # invoke() returns the final state as a dict
    result = workflow.invoke({"user_input": request["input"]})
    return {"output": result["agent_results"]}

The Problem: Single Point of Failure

┌──────────────────────────────────────┐
│        FastAPI (single point)        │
│  ┌────────────────────────────────┐  │
│  │ LangGraph workflow (1 process) │  │
│  └────────────────────────────────┘  │
└──────────────────────────────────────┘

Failure modes:
- Service crash → all requests fail
- Model loading → slow startup
- Memory limits → cannot scale

Layered Architecture

Architecture Evolution

Client layer
    ↓
[API Gateway / Load Balancer]
    ↓
Service layer
    ├→ [FastAPI Service 1]
    ├→ [FastAPI Service 2]
    └→ [FastAPI Service 3]
    ↓
Worker layer
    ├→ [LangGraph Worker 1]
    ├→ [LangGraph Worker 2]
    └→ [LangGraph Worker 3]
    ↓
Data layer
    ├→ [PostgreSQL]
    ├→ [Redis Cache]
    └→ [Vector DB]

Implementing the Layers

# Layer 1: API layer
import json
import uuid
from datetime import datetime

import redis
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

app = FastAPI()
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

class ProcessRequest(BaseModel):
    input: str
    priority: str = "normal"

@app.post("/tasks")
async def create_task(request: ProcessRequest, bg_tasks: BackgroundTasks):
    """Create an asynchronous task."""
    task_id = str(uuid.uuid4())

    # Return immediately and process in the background.
    # (BackgroundTasks runs in-process; a real worker layer would hand
    # this off to a task queue instead.)
    bg_tasks.add_task(process_in_worker, task_id, request.input)

    return {
        "task_id": task_id,
        "status": "queued"
    }

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Query task status."""
    result = redis_client.get(f"task:{task_id}")
    return json.loads(result) if result else {"status": "not_found"}

# Layer 2: Worker layer
def process_in_worker(task_id: str, user_input: str):
    """Process the task in a worker (reuses the workflow defined earlier)."""
    result = workflow.invoke({"user_input": user_input})

    # Persist the result
    redis_client.setex(
        f"task:{task_id}",
        3600,  # expire after 1 hour
        json.dumps({
            "status": "completed",
            "result": result["agent_results"],
            "timestamp": datetime.now().isoformat()
        })
    )
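
The queued → completed lifecycle above can be exercised without Redis. `TaskStore` below is a hypothetical in-memory stand-in for the `redis_client` calls (`setex`, `get`), reproducing the same expiry behaviour:

```python
import json
import time
import uuid


class TaskStore:
    """In-memory stand-in for the Redis result store (hypothetical helper)."""

    def __init__(self):
        self._data = {}  # key -> (expires_at, payload)

    def setex(self, key, ttl_seconds, value):
        self._data[key] = (time.time() + ttl_seconds, value)

    def get(self, key):
        entry = self._data.get(key)
        if entry is None or entry[0] < time.time():
            return None  # missing or expired
        return entry[1]


store = TaskStore()

def create_task(user_input):
    task_id = str(uuid.uuid4())
    store.setex(f"task:{task_id}", 3600, json.dumps({"status": "queued"}))
    return task_id

def complete_task(task_id, result):
    store.setex(f"task:{task_id}", 3600,
                json.dumps({"status": "completed", "result": result}))

def get_status(task_id):
    raw = store.get(f"task:{task_id}")
    return json.loads(raw) if raw else {"status": "not_found"}


task_id = create_task("hello")
complete_task(task_id, {"analysis": "..."})
print(get_status(task_id)["status"])  # completed
```

Swapping `TaskStore` for a real Redis client changes nothing in the callers, which is the point of keeping the store behind this narrow interface.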

Agent Topology Design

1. Linear Topology (Simple Chain)

User input → [Agent A] → [Agent B] → [Agent C] → Output

Pros:
- Simple and easy to debug
- Each agent's output is easy to inspect

Cons:
- No parallelism
- A failure upstream blocks everything downstream
workflow = StateGraph(State)
workflow.add_node("agent_a", agent_a_node)
workflow.add_node("agent_b", agent_b_node)
workflow.add_node("agent_c", agent_c_node)

workflow.add_edge(START, "agent_a")
workflow.add_edge("agent_a", "agent_b")
workflow.add_edge("agent_b", "agent_c")
workflow.add_edge("agent_c", END)

2. Parallel Topology (Fan-out/Fan-in)

                    ┌→ [Agent A1] ──┐
Input → [Dispatch] ─├→ [Agent A2] ──┤ → [Merge] → Output
                    └→ [Agent A3] ──┘

Pros:
- Makes full use of available resources
- Speeds up processing

Cons:
- Higher complexity
- Requires merge logic
from dataclasses import dataclass, field
from operator import add
from typing import Annotated

# A channel written by several parallel branches needs a reducer;
# Annotated[list, operator.add] concatenates the branches' results.
@dataclass
class State:
    user_input: str
    partial_results: Annotated[list, add] = field(default_factory=list)
    final: str = ""

def agent_a1(state: State):
    return {"partial_results": [process_task1(state)]}

def agent_a2(state: State):
    return {"partial_results": [process_task2(state)]}

def agent_a3(state: State):
    return {"partial_results": [process_task3(state)]}

def merge_results(state: State):
    """Fan-in: combine the partial results."""
    return {"final": combine(state.partial_results)}

workflow = StateGraph(State)
workflow.add_node("merge", merge_results)
for name, node in [("agent_a1", agent_a1),
                   ("agent_a2", agent_a2),
                   ("agent_a3", agent_a3)]:
    workflow.add_node(name, node)
    workflow.add_edge(START, name)    # fan-out: branches run in parallel
    workflow.add_edge(name, "merge")  # fan-in: merge waits for all branches
workflow.add_edge("merge", END)
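
For intuition, here is the same fan-out/fan-in shape in plain Python with a thread pool; the three `task*` functions are hypothetical placeholders for the sub-agents:

```python
from concurrent.futures import ThreadPoolExecutor

def task1(text):  # hypothetical sub-agents
    return f"analysis:{text}"

def task2(text):
    return f"summary:{text}"

def task3(text):
    return f"keywords:{text}"

def fan_out_fan_in(text):
    # Fan-out: run the three sub-tasks concurrently
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(f, text) for f in (task1, task2, task3)]
        results = [f.result() for f in futures]  # fan-in: wait for all
    # Merge step
    return " | ".join(results)

print(fan_out_fan_in("hi"))  # analysis:hi | summary:hi | keywords:hi
```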

3. Loop Topology (Agentic Loop)

        ┌──────────────────────────────┐
        │                              ↓
Input → [Decision Agent] → [Execution Agent]
        ↑                              │
        └──────────────────────────────┘

An agent decides whether to continue the loop.
def decision_agent(state: State) -> str:
    """Routing function: decide whether to keep looping."""
    if should_continue(state):
        return "execute"
    return "end"

# decide_node and execute_node are the loop's worker functions
workflow.add_node("decide", decide_node)
workflow.add_node("execute", execute_node)
workflow.add_edge(START, "decide")

workflow.add_conditional_edges(
    "decide",
    decision_agent,
    {
        "execute": "execute",
        "end": END
    }
)

workflow.add_edge("execute", "decide")  # close the loop
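
The control flow this graph encodes — decide, maybe execute, repeat — is equivalent to a plain loop. In this self-contained sketch, `should_continue` is a hypothetical iteration cap:

```python
def should_continue(state):
    # Hypothetical stopping rule: cap the number of iterations
    return state["steps"] < 3

def execute(state):
    state["steps"] += 1
    state["log"].append(f"step {state['steps']}")
    return state

def run_agentic_loop(state):
    # Same semantics as the decide -> execute -> decide cycle above
    while should_continue(state):
        state = execute(state)
    return state

final = run_agentic_loop({"steps": 0, "log": []})
print(final["steps"])  # 3
```

The graph form buys you checkpointing and observability per iteration, which the bare `while` loop does not.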

4. Tree Topology (Tree Structure)

                    [Root Agent]
                         │
         ┌───────────────┼───────────────┐
         ↓               ↓               ↓
  [Classify Agent]  [Validate Agent]  [Route Agent]
         │               │               │
     ┌───┴───┐       ┌───┴───┐       ┌──┴───┐
     ↓       ↓       ↓       ↓       ↓      ↓
   [A1]    [A2]    [B1]    [B2]    [C1]   [C2]
def build_tree_workflow():
    workflow = StateGraph(State)

    # First layer
    workflow.add_node("root", root_agent)
    workflow.add_edge(START, "root")

    # Second layer
    workflow.add_node("classify", classify_agent)
    workflow.add_node("validate", validate_agent)
    workflow.add_node("route", route_agent)

    # Connections (multiple edges out of "root" fan out in parallel)
    workflow.add_edge("root", "classify")
    workflow.add_edge("root", "validate")
    workflow.add_edge("root", "route")

    # Third layer...
    return workflow.compile()

Data Flow Architecture

State Management Patterns

from dataclasses import dataclass, field

@dataclass
class DistributedState:
    """State designed for distributed processing."""
    # Request info (immutable)
    request_id: str
    user_id: str
    created_at: str

    # Intermediate processing state (mutable)
    current_agent: str
    processing_history: list[dict] = field(default_factory=list)

    # Results (final)
    results: dict = field(default_factory=dict)

    # Metadata
    metadata: dict = field(default_factory=dict)

# State partitioning strategy:
# different workers can each handle one slice of the state
def partition_state(state: DistributedState) -> dict:
    """Split the state to support distributed processing."""
    return {
        "immutable": {
            "request_id": state.request_id,
            "user_id": state.user_id,
        },
        "mutable": {
            "current_agent": state.current_agent,
            "processing_history": state.processing_history,
        },
        "results": state.results
    }
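
A sketch of the inverse operation — rebuilding a state from its partitions — under a trimmed copy of the same state definition. `merge_partitions` is a hypothetical helper, not part of the snippet above:

```python
from dataclasses import dataclass, field

@dataclass
class DistributedState:  # trimmed mirror of the definition above
    request_id: str
    user_id: str
    current_agent: str = ""
    processing_history: list = field(default_factory=list)
    results: dict = field(default_factory=dict)

def merge_partitions(parts):
    # Hypothetical inverse of partition_state: rebuild the full state
    return DistributedState(
        **parts["immutable"],
        **parts["mutable"],
        results=parts["results"],
    )

parts = {
    "immutable": {"request_id": "r1", "user_id": "u1"},
    "mutable": {"current_agent": "classify", "processing_history": []},
    "results": {"score": 0.9},
}
state = merge_partitions(parts)
print(state.request_id, state.current_agent)  # r1 classify
```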

Data Persistence Patterns

from dataclasses import asdict
from datetime import datetime

from sqlalchemy import create_engine, Column, String, JSON
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()
engine = create_engine("postgresql://localhost/workflows")  # placeholder URL
SessionLocal = sessionmaker(bind=engine)

class WorkflowExecution(Base):
    """Workflow execution record."""
    __tablename__ = "workflow_executions"

    execution_id = Column(String, primary_key=True)
    request_id = Column(String)
    current_state = Column(JSON)  # serialized State
    step_history = Column(JSON)   # result of each step
    status = Column(String)       # queued, running, completed, failed
    created_at = Column(String)
    updated_at = Column(String)

# Save progress
def save_checkpoint(execution_id: str, state: DistributedState):
    """Save a checkpoint."""
    session = SessionLocal()
    try:
        execution = session.query(WorkflowExecution).filter(
            WorkflowExecution.execution_id == execution_id
        ).first()

        if execution:
            execution.current_state = asdict(state)  # dataclass -> dict
            execution.updated_at = datetime.now().isoformat()
            session.commit()
    finally:
        session.close()

# Restore progress
def restore_from_checkpoint(execution_id: str) -> DistributedState | None:
    """Restore from a checkpoint."""
    session = SessionLocal()
    try:
        execution = session.query(WorkflowExecution).filter(
            WorkflowExecution.execution_id == execution_id
        ).first()

        if execution:
            return DistributedState(**execution.current_state)
        return None
    finally:
        session.close()
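
The checkpoint round-trip can be exercised without a database: `dataclasses.asdict` handles serialization, and an in-memory dict stands in for the `workflow_executions` table (a sketch with a trimmed state definition):

```python
import json
from dataclasses import dataclass, asdict, field

@dataclass
class DistributedState:  # trimmed mirror of the state definition above
    request_id: str
    user_id: str
    results: dict = field(default_factory=dict)

_checkpoints = {}  # in-memory stand-in for the workflow_executions table

def save_checkpoint(execution_id, state):
    # Dataclasses have no .dict(); asdict() produces a JSON-safe dict
    _checkpoints[execution_id] = json.dumps(asdict(state))

def restore_from_checkpoint(execution_id):
    raw = _checkpoints.get(execution_id)
    return DistributedState(**json.loads(raw)) if raw else None

original = DistributedState("r1", "u1", {"step": "done"})
save_checkpoint("exec-1", original)
restored = restore_from_checkpoint("exec-1")
print(restored == original)  # True
```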

Error Recovery Architecture

Retry Strategy

from datetime import datetime

from tenacity import retry, stop_after_attempt, wait_exponential

class ResilientAgent:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    def execute(self, state: State) -> State:
        """Retry automatically with exponential backoff."""
        try:
            return self.process(state)
        except Exception as e:
            state.errors.append({
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            })
            raise

Fallback Strategy

def fallback_agent(state: State) -> State:
    """Degrade to a simpler version."""
    try:
        return full_featured_agent(state)
    except TimeoutError:
        # Fall back to the fast version
        return fast_fallback_agent(state)
    except Exception:
        # Last resort: fall back to a cached result
        return use_cached_result(state)
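
Retry and fallback pair naturally with a circuit breaker, which stops hammering a failing dependency. A minimal sketch — thresholds and naming are assumptions, not a specific library:

```python
import time

class CircuitBreaker:
    """Open the circuit after `threshold` consecutive failures;
    reject calls until `reset_timeout` seconds have passed."""

    def __init__(self, threshold=3, reset_timeout=30.0, clock=time.monotonic):
        self.threshold = threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open")
            self.opened_at = None  # half-open: allow one trial call
            self.failures = 0
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = self.clock()  # trip the breaker
            raise
        self.failures = 0  # success resets the failure count
        return result

breaker = CircuitBreaker(threshold=2, reset_timeout=60)

def failing():
    raise ValueError("boom")

for _ in range(2):  # two failures trip the breaker
    try:
        breaker.call(failing)
    except ValueError:
        pass

try:
    breaker.call(failing)  # rejected without calling the function
except RuntimeError as e:
    print(e)  # circuit open
```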

Monitoring and Observability Architecture

Built-in Instrumentation

import logging
from datetime import datetime
from functools import wraps

class ObservableWorkflow:
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def instrument_node(self, node_name: str):
        """Add instrumentation to a node."""
        def wrapper(func):
            @wraps(func)
            def inner(state):
                start_time = datetime.now()

                try:
                    result = func(state)
                    duration = (datetime.now() - start_time).total_seconds()

                    self.logger.info(f"Node {node_name} completed", extra={
                        "node": node_name,
                        "duration_ms": duration * 1000,
                        "status": "success"
                    })

                    return result
                except Exception as e:
                    duration = (datetime.now() - start_time).total_seconds()

                    self.logger.error(f"Node {node_name} failed", extra={
                        "node": node_name,
                        "duration_ms": duration * 1000,
                        "error": str(e),
                        "status": "failed"
                    })
                    raise

            return inner
        return wrapper

# Usage
observable = ObservableWorkflow()

@observable.instrument_node("analysis")
def analysis_node(state: State) -> State:
    return state

Metrics Collection

from datetime import datetime

from prometheus_client import Counter, Histogram, Gauge

# Define the metrics
workflow_executions = Counter(
    'workflow_executions_total',
    'Total workflow executions',
    ['status']
)

workflow_duration = Histogram(
    'workflow_duration_seconds',
    'Workflow duration'
)

active_workflows = Gauge(
    'active_workflows',
    'Active workflows'
)

# Record metrics around every workflow invocation
def track_workflow(workflow_func):
    def wrapper(*args, **kwargs):
        active_workflows.inc()
        start = datetime.now()

        try:
            result = workflow_func(*args, **kwargs)
            workflow_executions.labels(status='success').inc()
            return result
        except Exception:
            workflow_executions.labels(status='failure').inc()
            raise
        finally:
            duration = (datetime.now() - start).total_seconds()
            workflow_duration.observe(duration)
            active_workflows.dec()

    return wrapper

Architecture Selection Guide

Scenario            Recommended architecture   Why
MVP / prototype     Monolith                   Fast to validate
Low-traffic app     Monolith + cache           Simple and reliable
Medium traffic      Layered                    Balances complexity and performance
High traffic        Microservices              Independent scaling
Complex workflows   Tree / loop topology       Matches the business logic

Summary

Key principles for designing a LangGraph AI backend:

  1. Layering: API layer, service layer, worker layer, data layer
  2. Agent topology: pick the structure that fits the business logic
  3. State management: clear state definitions and persistence
  4. Fault tolerance: retries, fallbacks, circuit breaking
  5. Observability: complete logs, metrics, and traces

Good architecture lets the system scale smoothly as traffic grows and recover quickly when things fail.

Yen
