LangGraph 的核心價值在於能表達複雜的決策邏輯和動態的工作流。相比線性的 Chain,LangGraph 允許根據中間狀態做出決策,選擇不同的處理路徑。本文深入講解如何設計 LangGraph 的邏輯層。
狀態設計的藝術
狀態vs消息的權衡
消息模式(LangChain Chain):
input → [處理] → output
狀態模式(LangGraph):
state = {data1, data2, ..., flags, counters, history}
state → [Agent A] → state' → [Agent B] → state''
狀態的優勢:
1. 保留歷史信息
2. 支持複雜決策
3. 可追蹤性強
4. 支持重試和恢復
分層狀態設計
1from dataclasses import dataclass, field
2from typing import Optional, Annotated
3from datetime import datetime
4
5@dataclass
6class RequestMetadata:
7 """請求元數據(不變)"""
8 request_id: str
9 user_id: str
10 timestamp: str
11 priority: str # low, medium, high, critical
12
13@dataclass
14class ProcessingContext:
15 """處理上下文(易變)"""
16 current_step: str # 當前步驟
17 step_history: list[str] = field(default_factory=list) # 完成的步驟
18 retry_count: int = 0
19 last_error: Optional[str] = None
20 checkpoint_data: dict = field(default_factory=dict) # 檢查點
21
22@dataclass
23class IntermediateResults:
24 """中間結果(逐步累積)"""
25 analysis: Optional[str] = None
26 validation_status: Optional[bool] = None
27 routing_decision: Optional[str] = None
28 extracted_data: dict = field(default_factory=dict)
29
30@dataclass
31class FinalOutput:
32 """最終輸出"""
33 response: Optional[str] = None
34 confidence_score: float = 0.0
35 model_version: Optional[str] = None
36
37@dataclass
38class CompleteState:
39 """完整狀態 = 元數據 + 上下文 + 中間結果 + 最終輸出"""
40 metadata: RequestMetadata
41 context: ProcessingContext
42 intermediate: IntermediateResults
43 output: FinalOutput
44
45# 狀態訪問模式
46def get_state_summary(state: CompleteState) -> dict:
47 """獲取狀態摘要用於決策"""
48 return {
49 "is_retrying": state.context.retry_count > 0,
50 "has_analysis": state.intermediate.analysis is not None,
51 "is_high_priority": state.metadata.priority == "critical",
52 "last_step": state.context.step_history[-1] if state.context.step_history else None
53 }
狀態更新的原則
1# ❌ 反面:直接修改狀態
2def bad_update(state: CompleteState):
3 state.intermediate.analysis = result # 不安全
4 return state
5
6# ✅ 正確:構造新狀態
7def good_update(state: CompleteState):
8 return CompleteState(
9 metadata=state.metadata, # 保持不變
10 context=ProcessingContext(
11 current_step="analysis",
12 step_history=state.context.step_history + ["analysis"],
13 retry_count=state.context.retry_count
14 ),
15 intermediate=IntermediateResults(
16 analysis=result,
17 validation_status=state.intermediate.validation_status,
18 routing_decision=state.intermediate.routing_decision
19 ),
20 output=state.output
21 )
決策路由設計
1. 二元決策(If-Else)
1def simple_condition(state: CompleteState) -> str:
2 """簡單的是/否決策"""
3 if state.intermediate.analysis:
4 return "proceed"
5 else:
6 return "retry"
7
8workflow.add_conditional_edges(
9 "analysis_node",
10 simple_condition,
11 {
12 "proceed": "validation_node",
13 "retry": "analysis_node"
14 }
15)
2. 多路決策(Switch)
1def multi_way_router(state: CompleteState) -> str:
2 """根據類型選擇不同路徑"""
3 category = state.intermediate.extracted_data.get("category")
4
5 router_map = {
6 "technical": "technical_handler",
7 "billing": "billing_handler",
8 "account": "account_handler",
9 "other": "default_handler"
10 }
11
12 return router_map.get(category, "default_handler")
13
14workflow.add_conditional_edges(
15 "classification_node",
16 multi_way_router,
17 {
18 "technical_handler": "technical_handler",
19 "billing_handler": "billing_handler",
20 "account_handler": "account_handler",
21 "default_handler": "default_handler"
22 }
23)
3. 優先級路由
1def priority_router(state: CompleteState) -> str:
2 """根據優先級選擇處理路徑"""
3 priority = state.metadata.priority
4
5 if priority == "critical":
6 return "fast_track"
7 elif priority == "high":
8 return "priority_track"
9 else:
10 return "normal_track"
11
12# 不同優先級的 SLA
13fast_track_sla = 60 # 1 分鐘
14priority_track_sla = 300 # 5 分鐘
15normal_track_sla = 3600 # 1 小時
4. 複雜決策(AI 輔助)
1def ai_powered_router(state: CompleteState) -> str:
2 """使用 AI 做複雜決策"""
3 from langchain_anthropic import ChatAnthropic
4
5 model = ChatAnthropic()
6
7 prompt = f"""
8 根據以下信息決定下一步:
9
10 分析結果:{state.intermediate.analysis}
11 驗證狀態:{state.intermediate.validation_status}
12 錯誤信息:{state.context.last_error}
13 重試次數:{state.context.retry_count}
14
15 可選路徑:
16 1. "escalate" - 升級給人工
17 2. "retry" - 重試
18 3. "proceed" - 繼續
19 4. "abort" - 中止
20
21 選擇最合適的路徑:
22 """
23
24 response = model.invoke(prompt)
25 decision = response.content.strip().lower()
26
27 # 確保返回有效的選擇
28 valid_choices = ["escalate", "retry", "proceed", "abort"]
29 return decision if decision in valid_choices else "proceed"
30
31workflow.add_conditional_edges(
32 "decision_node",
33 ai_powered_router,
34 {
35 "escalate": "human_escalation",
36 "retry": "analysis_node",
37 "proceed": "validation_node",
38 "abort": END
39 }
40)
複雜工作流邏輯
1. 重試邏輯
1@dataclass
2class RetryableState:
3 max_retries: int = 3
4 retry_count: int = 0
5 last_error: Optional[str] = None
6
7def retry_decision(state: RetryableState) -> str:
8 """決定是否重試"""
9 if state.retry_count >= state.max_retries:
10 return "max_retries_exceeded"
11
12 # 某些錯誤不應該重試
13 non_retryable_errors = [
14 "validation_error",
15 "unauthorized",
16 "not_found"
17 ]
18
19 if any(error in state.last_error for error in non_retryable_errors):
20 return "non_retryable"
21
22 return "retry"
23
24def retry_with_backoff(state: RetryableState) -> RetryableState:
25 """指數退避重試"""
26 import time
27
28 backoff = 2 ** state.retry_count # 1s, 2s, 4s
29 time.sleep(min(backoff, 30)) # 最多等待 30 秒
30
31 state.retry_count += 1
32 return state
2. 並行執行後的合併邏輯
1@dataclass
2class ParallelTasksState:
3 task_a_result: Optional[dict] = None
4 task_b_result: Optional[dict] = None
5 task_c_result: Optional[dict] = None
6 all_succeeded: bool = False
7
8def merge_parallel_results(state: ParallelTasksState) -> ParallelTasksState:
9 """合併並行任務的結果"""
10
11 # 檢查是否所有任務都成功
12 all_results = [
13 state.task_a_result,
14 state.task_b_result,
15 state.task_c_result
16 ]
17
18 state.all_succeeded = all(r is not None for r in all_results)
19
20 if state.all_succeeded:
21 # 合併邏輯
22 merged = {
23 "summary": f"A: {state.task_a_result.get('summary', '')}",
24 "analysis": f"B: {state.task_b_result.get('analysis', '')}",
25 "recommendation": f"C: {state.task_c_result.get('recommendation', '')}"
26 }
27 return state
28 else:
29 # 某些任務失敗,決定是重試還是降級
30 return state
31
32def decide_after_parallel(state: ParallelTasksState) -> str:
33 """並行任務完成後的決策"""
34 if state.all_succeeded:
35 return "proceed"
36 elif state.task_a_result and state.task_b_result:
37 return "partial_proceed" # 至少 2 個成功
38 else:
39 return "retry_failed_tasks"
3. 分支合並(Join)邏輯
1@dataclass
2class BranchJoinState:
3 branch_path: Optional[str] = None
4 # 各分支的結果
5 path_a_result: Optional[dict] = None
6 path_b_result: Optional[dict] = None
7 path_c_result: Optional[dict] = None
8 join_result: Optional[dict] = None
9
10def join_branches(state: BranchJoinState) -> BranchJoinState:
11 """合併分支結果"""
12
13 if state.branch_path == "a":
14 result = state.path_a_result
15 elif state.branch_path == "b":
16 result = state.path_b_result
17 elif state.branch_path == "c":
18 result = state.path_c_result
19 else:
20 result = None
21
22 state.join_result = result
23 return state
24
25# 工作流示例
26workflow = StateGraph(BranchJoinState)
27
28def route_to_branch(state: BranchJoinState) -> str:
29 """路由到不同分支"""
30 priority = state.metadata.priority
31 if priority == "critical":
32 return "path_a"
33 elif priority == "high":
34 return "path_b"
35 else:
36 return "path_c"
37
38workflow.add_conditional_edges(
39 "route",
40 route_to_branch,
41 {
42 "path_a": "branch_a",
43 "path_b": "branch_b",
44 "path_c": "branch_c"
45 }
46)
47
48# 所有分支在 join 點匯聚
49workflow.add_edge("branch_a", "join")
50workflow.add_edge("branch_b", "join")
51workflow.add_edge("branch_c", "join")
狀態轉移圖設計
狀態機模式
1from enum import Enum
2
3class WorkflowStatus(Enum):
4 """工作流狀態定義"""
5 PENDING = "pending" # 等待開始
6 ANALYZING = "analyzing" # 分析中
7 VALIDATING = "validating" # 驗證中
8 ROUTING = "routing" # 路由中
9 PROCESSING = "processing" # 處理中
10 SUCCEEDED = "succeeded" # 成功
11 FAILED = "failed" # 失敗
12 ESCALATED = "escalated" # 升級
13
14# 有效的狀態轉移
15VALID_TRANSITIONS = {
16 WorkflowStatus.PENDING: [WorkflowStatus.ANALYZING],
17 WorkflowStatus.ANALYZING: [
18 WorkflowStatus.VALIDATING,
19 WorkflowStatus.FAILED
20 ],
21 WorkflowStatus.VALIDATING: [
22 WorkflowStatus.ROUTING,
23 WorkflowStatus.FAILED
24 ],
25 WorkflowStatus.ROUTING: [
26 WorkflowStatus.PROCESSING,
27 WorkflowStatus.ESCALATED
28 ],
29 WorkflowStatus.PROCESSING: [
30 WorkflowStatus.SUCCEEDED,
31 WorkflowStatus.FAILED
32 ],
33 WorkflowStatus.FAILED: [WorkflowStatus.ANALYZING], # 重試
34 WorkflowStatus.ESCALATED: [], # 終態
35 WorkflowStatus.SUCCEEDED: [] # 終態
36}
37
38def validate_transition(current: WorkflowStatus, next_status: WorkflowStatus) -> bool:
39 """驗證狀態轉移是否合法"""
40 return next_status in VALID_TRANSITIONS.get(current, [])
41
42@dataclass
43class StateMachineState:
44 current_status: WorkflowStatus = WorkflowStatus.PENDING
45
46 def transition_to(self, new_status: WorkflowStatus):
47 if validate_transition(self.current_status, new_status):
48 self.current_status = new_status
49 return True
50 else:
51 raise ValueError(
52 f"Invalid transition from {self.current_status} to {new_status}"
53 )
決策邏輯的可測試性
單元測試決策函數
1import pytest
2
3def test_priority_router():
4 """測試優先級路由"""
5 state = CompleteState(
6 metadata=RequestMetadata(
7 request_id="test",
8 user_id="user1",
9 timestamp="2024-01-01",
10 priority="critical"
11 ),
12 context=ProcessingContext(current_step="init"),
13 intermediate=IntermediateResults(),
14 output=FinalOutput()
15 )
16
17 result = priority_router(state)
18 assert result == "fast_track"
19
20def test_retry_logic():
21 """測試重試邏輯"""
22 state = RetryableState(max_retries=3, retry_count=0)
23 assert retry_decision(state) == "retry"
24
25 state.retry_count = 3
26 assert retry_decision(state) == "max_retries_exceeded"
27
28def test_merge_results():
29 """測試結果合併"""
30 state = ParallelTasksState(
31 task_a_result={"data": "a"},
32 task_b_result={"data": "b"},
33 task_c_result={"data": "c"}
34 )
35
36 result = merge_parallel_results(state)
37 assert result.all_succeeded == True
邏輯設計最佳實踐
1. 清晰的決策點
1# ❌ 不清楚
2def process(state):
3 if state.data and state.status and len(state.data) > 5:
4 return "path_a"
5 else:
6 return "path_b"
7
8# ✅ 清晰
9def should_use_fast_path(state: CompleteState) -> bool:
10 """檢查是否應該使用快速路徑"""
11 has_data = state.intermediate.analysis is not None
12 status_valid = state.intermediate.validation_status is True
13 sufficient_confidence = len(state.intermediate.analysis) > 100
14
15 return has_data and status_valid and sufficient_confidence
16
17def route_based_on_path_decision(state: CompleteState) -> str:
18 if should_use_fast_path(state):
19 return "fast_path"
20 else:
21 return "normal_path"
2. 決策的文檔化
1def complex_router(state: CompleteState) -> str:
2 """
3 複雜路由決策
4
5 決策樹:
6 1. 如果優先級是 critical → fast_track (SLA: 1 min)
7 2. 否則,如果有驗證錯誤 → escalate (SLA: 5 min)
8 3. 否則,如果分析置信度 > 0.9 → proceed (SLA: 30 min)
9 4. 否則 → manual_review (SLA: 1 hour)
10
11 Args:
12 state: 完整狀態
13
14 Returns:
15 下一個節點的名稱
16 """
17 # 實現...
18 pass
3. 降級策略
1def route_with_fallback(state: CompleteState) -> str:
2 """帶降級的路由"""
3 try:
4 # 首選:AI 輔助決策
5 return ai_powered_router(state)
6 except Exception as e:
7 logger.warning(f"AI router failed: {e}")
8 # 降級:規則型決策
9 return rule_based_router(state)
邏輯複雜度分析
| 邏輯類型 | 複雜度 | 可測試性 | 調試難度 |
|---|---|---|---|
| 簡單線性 | O(1) | 高 | 低 |
| 二元決策 | O(1) | 高 | 低 |
| 多路決策 | O(n) | 中 | 低 |
| 優先級決策 | O(n) | 中 | 中 |
| AI 輔助決策 | O(1) | 低 | 高 |
| 複雜並行 | O(n²) | 低 | 高 |
總結
LangGraph 的邏輯設計原則:
- 狀態第一:清晰的狀態定義是基礎
- 顯式決策:決策點應該明確、可追蹤
- 容錯設計:失敗應該導向降級而不是崩潰
- 可測試:邏輯應該能被單獨測試
- 可維護:複雜邏輯應該有文檔和清晰的命名
好的邏輯設計能讓 AI 後端系統既強大又可靠。
