LangGraph AI 後端邏輯設計:狀態流、決策路由和條件轉移

LangGraph 的核心價值在於能表達複雜的決策邏輯動態的工作流。相比線性的 Chain,LangGraph 允許根據中間狀態做出決策,選擇不同的處理路徑。本文深入講解如何設計 LangGraph 的邏輯層。


狀態設計的藝術

狀態vs消息的權衡

消息模式(LangChain Chain):
input → [處理] → output

狀態模式(LangGraph):
state = {data1, data2, ..., flags, counters, history}
state → [Agent A] → state' → [Agent B] → state''

狀態的優勢:
1. 保留歷史信息
2. 支持複雜決策
3. 可追蹤性強
4. 支持重試和恢復

分層狀態設計

 1from dataclasses import dataclass, field
 2from typing import Optional, Annotated
 3from datetime import datetime
 4
 5@dataclass
 6class RequestMetadata:
 7    """請求元數據(不變)"""
 8    request_id: str
 9    user_id: str
10    timestamp: str
11    priority: str  # low, medium, high, critical
12
13@dataclass
14class ProcessingContext:
15    """處理上下文(易變)"""
16    current_step: str  # 當前步驟
17    step_history: list[str] = field(default_factory=list)  # 完成的步驟
18    retry_count: int = 0
19    last_error: Optional[str] = None
20    checkpoint_data: dict = field(default_factory=dict)  # 檢查點
21
22@dataclass
23class IntermediateResults:
24    """中間結果(逐步累積)"""
25    analysis: Optional[str] = None
26    validation_status: Optional[bool] = None
27    routing_decision: Optional[str] = None
28    extracted_data: dict = field(default_factory=dict)
29
30@dataclass
31class FinalOutput:
32    """最終輸出"""
33    response: Optional[str] = None
34    confidence_score: float = 0.0
35    model_version: Optional[str] = None
36
37@dataclass
38class CompleteState:
39    """完整狀態 = 元數據 + 上下文 + 中間結果 + 最終輸出"""
40    metadata: RequestMetadata
41    context: ProcessingContext
42    intermediate: IntermediateResults
43    output: FinalOutput
44
45# 狀態訪問模式
46def get_state_summary(state: CompleteState) -> dict:
47    """獲取狀態摘要用於決策"""
48    return {
49        "is_retrying": state.context.retry_count > 0,
50        "has_analysis": state.intermediate.analysis is not None,
51        "is_high_priority": state.metadata.priority == "critical",
52        "last_step": state.context.step_history[-1] if state.context.step_history else None
53    }

狀態更新的原則

 1# ❌ 反面:直接修改狀態
 2def bad_update(state: CompleteState):
 3    state.intermediate.analysis = result  # 不安全
 4    return state
 5
 6# ✅ 正確:構造新狀態
 7def good_update(state: CompleteState):
 8    return CompleteState(
 9        metadata=state.metadata,  # 保持不變
10        context=ProcessingContext(
11            current_step="analysis",
12            step_history=state.context.step_history + ["analysis"],
13            retry_count=state.context.retry_count
14        ),
15        intermediate=IntermediateResults(
16            analysis=result,
17            validation_status=state.intermediate.validation_status,
18            routing_decision=state.intermediate.routing_decision
19        ),
20        output=state.output
21    )

決策路由設計

1. 二元決策(If-Else)

 1def simple_condition(state: CompleteState) -> str:
 2    """簡單的是/否決策"""
 3    if state.intermediate.analysis:
 4        return "proceed"
 5    else:
 6        return "retry"
 7
 8workflow.add_conditional_edges(
 9    "analysis_node",
10    simple_condition,
11    {
12        "proceed": "validation_node",
13        "retry": "analysis_node"
14    }
15)

2. 多路決策(Switch)

 1def multi_way_router(state: CompleteState) -> str:
 2    """根據類型選擇不同路徑"""
 3    category = state.intermediate.extracted_data.get("category")
 4    
 5    router_map = {
 6        "technical": "technical_handler",
 7        "billing": "billing_handler",
 8        "account": "account_handler",
 9        "other": "default_handler"
10    }
11    
12    return router_map.get(category, "default_handler")
13
14workflow.add_conditional_edges(
15    "classification_node",
16    multi_way_router,
17    {
18        "technical_handler": "technical_handler",
19        "billing_handler": "billing_handler",
20        "account_handler": "account_handler",
21        "default_handler": "default_handler"
22    }
23)

3. 優先級路由

 1def priority_router(state: CompleteState) -> str:
 2    """根據優先級選擇處理路徑"""
 3    priority = state.metadata.priority
 4    
 5    if priority == "critical":
 6        return "fast_track"
 7    elif priority == "high":
 8        return "priority_track"
 9    else:
10        return "normal_track"
11
12# 不同優先級的 SLA
13fast_track_sla = 60      # 1 分鐘
14priority_track_sla = 300 # 5 分鐘
15normal_track_sla = 3600  # 1 小時

4. 複雜決策(AI 輔助)

 1def ai_powered_router(state: CompleteState) -> str:
 2    """使用 AI 做複雜決策"""
 3    from langchain_anthropic import ChatAnthropic
 4    
 5    model = ChatAnthropic()
 6    
 7    prompt = f"""
 8    根據以下信息決定下一步:
 9    
10    分析結果:{state.intermediate.analysis}
11    驗證狀態:{state.intermediate.validation_status}
12    錯誤信息:{state.context.last_error}
13    重試次數:{state.context.retry_count}
14    
15    可選路徑:
16    1. "escalate" - 升級給人工
17    2. "retry" - 重試
18    3. "proceed" - 繼續
19    4. "abort" - 中止
20    
21    選擇最合適的路徑:
22    """
23    
24    response = model.invoke(prompt)
25    decision = response.content.strip().lower()
26    
27    # 確保返回有效的選擇
28    valid_choices = ["escalate", "retry", "proceed", "abort"]
29    return decision if decision in valid_choices else "proceed"
30
31workflow.add_conditional_edges(
32    "decision_node",
33    ai_powered_router,
34    {
35        "escalate": "human_escalation",
36        "retry": "analysis_node",
37        "proceed": "validation_node",
38        "abort": END
39    }
40)

複雜工作流邏輯

1. 重試邏輯

 1@dataclass
 2class RetryableState:
 3    max_retries: int = 3
 4    retry_count: int = 0
 5    last_error: Optional[str] = None
 6
 7def retry_decision(state: RetryableState) -> str:
 8    """決定是否重試"""
 9    if state.retry_count >= state.max_retries:
10        return "max_retries_exceeded"
11    
12    # 某些錯誤不應該重試
13    non_retryable_errors = [
14        "validation_error",
15        "unauthorized",
16        "not_found"
17    ]
18    
19    if any(error in state.last_error for error in non_retryable_errors):
20        return "non_retryable"
21    
22    return "retry"
23
24def retry_with_backoff(state: RetryableState) -> RetryableState:
25    """指數退避重試"""
26    import time
27    
28    backoff = 2 ** state.retry_count  # 1s, 2s, 4s
29    time.sleep(min(backoff, 30))  # 最多等待 30 秒
30    
31    state.retry_count += 1
32    return state

2. 並行執行後的合併邏輯

 1@dataclass
 2class ParallelTasksState:
 3    task_a_result: Optional[dict] = None
 4    task_b_result: Optional[dict] = None
 5    task_c_result: Optional[dict] = None
 6    all_succeeded: bool = False
 7
 8def merge_parallel_results(state: ParallelTasksState) -> ParallelTasksState:
 9    """合併並行任務的結果"""
10    
11    # 檢查是否所有任務都成功
12    all_results = [
13        state.task_a_result,
14        state.task_b_result,
15        state.task_c_result
16    ]
17    
18    state.all_succeeded = all(r is not None for r in all_results)
19    
20    if state.all_succeeded:
21        # 合併邏輯
22        merged = {
23            "summary": f"A: {state.task_a_result.get('summary', '')}",
24            "analysis": f"B: {state.task_b_result.get('analysis', '')}",
25            "recommendation": f"C: {state.task_c_result.get('recommendation', '')}"
26        }
27        return state
28    else:
29        # 某些任務失敗,決定是重試還是降級
30        return state
31
32def decide_after_parallel(state: ParallelTasksState) -> str:
33    """並行任務完成後的決策"""
34    if state.all_succeeded:
35        return "proceed"
36    elif state.task_a_result and state.task_b_result:
37        return "partial_proceed"  # 至少 2 個成功
38    else:
39        return "retry_failed_tasks"

3. 分支合並(Join)邏輯

 1@dataclass
 2class BranchJoinState:
 3    branch_path: Optional[str] = None
 4    # 各分支的結果
 5    path_a_result: Optional[dict] = None
 6    path_b_result: Optional[dict] = None
 7    path_c_result: Optional[dict] = None
 8    join_result: Optional[dict] = None
 9
10def join_branches(state: BranchJoinState) -> BranchJoinState:
11    """合併分支結果"""
12    
13    if state.branch_path == "a":
14        result = state.path_a_result
15    elif state.branch_path == "b":
16        result = state.path_b_result
17    elif state.branch_path == "c":
18        result = state.path_c_result
19    else:
20        result = None
21    
22    state.join_result = result
23    return state
24
25# 工作流示例
26workflow = StateGraph(BranchJoinState)
27
28def route_to_branch(state: BranchJoinState) -> str:
29    """路由到不同分支"""
30    priority = state.metadata.priority
31    if priority == "critical":
32        return "path_a"
33    elif priority == "high":
34        return "path_b"
35    else:
36        return "path_c"
37
38workflow.add_conditional_edges(
39    "route",
40    route_to_branch,
41    {
42        "path_a": "branch_a",
43        "path_b": "branch_b",
44        "path_c": "branch_c"
45    }
46)
47
48# 所有分支在 join 點匯聚
49workflow.add_edge("branch_a", "join")
50workflow.add_edge("branch_b", "join")
51workflow.add_edge("branch_c", "join")

狀態轉移圖設計

狀態機模式

 1from enum import Enum
 2
 3class WorkflowStatus(Enum):
 4    """工作流狀態定義"""
 5    PENDING = "pending"           # 等待開始
 6    ANALYZING = "analyzing"       # 分析中
 7    VALIDATING = "validating"     # 驗證中
 8    ROUTING = "routing"           # 路由中
 9    PROCESSING = "processing"     # 處理中
10    SUCCEEDED = "succeeded"       # 成功
11    FAILED = "failed"             # 失敗
12    ESCALATED = "escalated"       # 升級
13
14# 有效的狀態轉移
15VALID_TRANSITIONS = {
16    WorkflowStatus.PENDING: [WorkflowStatus.ANALYZING],
17    WorkflowStatus.ANALYZING: [
18        WorkflowStatus.VALIDATING,
19        WorkflowStatus.FAILED
20    ],
21    WorkflowStatus.VALIDATING: [
22        WorkflowStatus.ROUTING,
23        WorkflowStatus.FAILED
24    ],
25    WorkflowStatus.ROUTING: [
26        WorkflowStatus.PROCESSING,
27        WorkflowStatus.ESCALATED
28    ],
29    WorkflowStatus.PROCESSING: [
30        WorkflowStatus.SUCCEEDED,
31        WorkflowStatus.FAILED
32    ],
33    WorkflowStatus.FAILED: [WorkflowStatus.ANALYZING],  # 重試
34    WorkflowStatus.ESCALATED: [],  # 終態
35    WorkflowStatus.SUCCEEDED: []   # 終態
36}
37
38def validate_transition(current: WorkflowStatus, next_status: WorkflowStatus) -> bool:
39    """驗證狀態轉移是否合法"""
40    return next_status in VALID_TRANSITIONS.get(current, [])
41
42@dataclass
43class StateMachineState:
44    current_status: WorkflowStatus = WorkflowStatus.PENDING
45    
46    def transition_to(self, new_status: WorkflowStatus):
47        if validate_transition(self.current_status, new_status):
48            self.current_status = new_status
49            return True
50        else:
51            raise ValueError(
52                f"Invalid transition from {self.current_status} to {new_status}"
53            )

決策邏輯的可測試性

單元測試決策函數

 1import pytest
 2
 3def test_priority_router():
 4    """測試優先級路由"""
 5    state = CompleteState(
 6        metadata=RequestMetadata(
 7            request_id="test",
 8            user_id="user1",
 9            timestamp="2024-01-01",
10            priority="critical"
11        ),
12        context=ProcessingContext(current_step="init"),
13        intermediate=IntermediateResults(),
14        output=FinalOutput()
15    )
16    
17    result = priority_router(state)
18    assert result == "fast_track"
19
20def test_retry_logic():
21    """測試重試邏輯"""
22    state = RetryableState(max_retries=3, retry_count=0)
23    assert retry_decision(state) == "retry"
24    
25    state.retry_count = 3
26    assert retry_decision(state) == "max_retries_exceeded"
27
28def test_merge_results():
29    """測試結果合併"""
30    state = ParallelTasksState(
31        task_a_result={"data": "a"},
32        task_b_result={"data": "b"},
33        task_c_result={"data": "c"}
34    )
35    
36    result = merge_parallel_results(state)
37    assert result.all_succeeded == True

邏輯設計最佳實踐

1. 清晰的決策點

 1# ❌ 不清楚
 2def process(state):
 3    if state.data and state.status and len(state.data) > 5:
 4        return "path_a"
 5    else:
 6        return "path_b"
 7
 8# ✅ 清晰
 9def should_use_fast_path(state: CompleteState) -> bool:
10    """檢查是否應該使用快速路徑"""
11    has_data = state.intermediate.analysis is not None
12    status_valid = state.intermediate.validation_status is True
13    sufficient_confidence = len(state.intermediate.analysis) > 100
14    
15    return has_data and status_valid and sufficient_confidence
16
17def route_based_on_path_decision(state: CompleteState) -> str:
18    if should_use_fast_path(state):
19        return "fast_path"
20    else:
21        return "normal_path"

2. 決策的文檔化

 1def complex_router(state: CompleteState) -> str:
 2    """
 3    複雜路由決策
 4    
 5    決策樹:
 6    1. 如果優先級是 critical → fast_track (SLA: 1 min)
 7    2. 否則,如果有驗證錯誤 → escalate (SLA: 5 min)
 8    3. 否則,如果分析置信度 > 0.9 → proceed (SLA: 30 min)
 9    4. 否則 → manual_review (SLA: 1 hour)
10    
11    Args:
12        state: 完整狀態
13    
14    Returns:
15        下一個節點的名稱
16    """
17    # 實現...
18    pass

3. 降級策略

1def route_with_fallback(state: CompleteState) -> str:
2    """帶降級的路由"""
3    try:
4        # 首選:AI 輔助決策
5        return ai_powered_router(state)
6    except Exception as e:
7        logger.warning(f"AI router failed: {e}")
8        # 降級:規則型決策
9        return rule_based_router(state)

邏輯複雜度分析

邏輯類型複雜度可測試性調試難度
簡單線性O(1)
二元決策O(1)
多路決策O(n)
優先級決策O(n)
AI 輔助決策O(1)
複雜並行O(n²)

總結

LangGraph 的邏輯設計原則:

  1. 狀態第一:清晰的狀態定義是基礎
  2. 顯式決策:決策點應該明確、可追蹤
  3. 容錯設計:失敗應該導向降級而不是崩潰
  4. 可測試:邏輯應該能被單獨測試
  5. 可維護:複雜邏輯應該有文檔和清晰的命名

好的邏輯設計能讓 AI 後端系統既強大又可靠。

Yen

Yen

Yen