從概念到代碼,這篇文章提供完整可運行的 LangGraph AI 後端實現。我們會構建一個真實的系統:接收請求、執行工作流、持久化結果、提供監控。
完整項目結構
ai-backend/
├── main.py # FastAPI 應用入口
├── models/ # 數據模型
│ ├── __init__.py
│ └── schemas.py # Pydantic schemas
├── workflows/ # LangGraph 工作流
│ ├── __init__.py
│ ├── base.py # 基礎工作流類
│ └── ticket_workflow.py
├── agents/ # Agent 實現
│ ├── __init__.py
│ ├── base_agent.py
│ ├── classifier.py
│ ├── analyzer.py
│ └── response_generator.py
├── persistence/ # 數據持久化
│ ├── __init__.py
│ └── database.py
├── monitoring/ # 監控和日誌
│ ├── __init__.py
│ ├── logger.py
│ └── metrics.py
├── config.py # 配置管理
├── requirements.txt
└── docker-compose.yml
數據模型層(models/schemas.py)
1from pydantic import BaseModel, Field
2from typing import Optional, List
3from datetime import datetime
4from enum import Enum
5
class PriorityLevel(str, Enum):
    """Ticket priority levels; str-valued so they serialize directly in JSON."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class TicketStatus(str, Enum):
    """Lifecycle states of a support ticket.

    NOTE(review): main.py writes "completed"/"failed" into the ticket status
    column, neither of which is a member of this enum — confirm the intended
    status vocabulary.
    """
    OPEN = "open"
    CLASSIFYING = "classifying"
    ASSIGNED = "assigned"
    IN_PROGRESS = "in_progress"
    RESOLVED = "resolved"
    CLOSED = "closed"
# Request model
class CreateTicketRequest(BaseModel):
    """Payload for creating a support ticket.

    `attachments` uses Field(default_factory=list) rather than a literal
    `[]` — the idiomatic way to declare a mutable default on a model field.
    """
    user_id: str = Field(..., min_length=1, description="用戶 ID")
    subject: str = Field(..., min_length=5, max_length=500)
    description: str = Field(..., min_length=10, max_length=5000)
    priority: Optional[PriorityLevel] = PriorityLevel.MEDIUM
    attachments: Optional[List[str]] = Field(default_factory=list)
# Response model
class TicketResponse(BaseModel):
    """API representation of a processed ticket.

    NOTE(review): in Pydantic v2 an `Optional[str]` field with no default is
    still *required* — confirm whether these should default to None.
    """
    ticket_id: str
    status: TicketStatus
    category: Optional[str]            # set once classification has run
    priority: PriorityLevel
    assigned_department: Optional[str]
    response: Optional[str]            # AI-generated reply text
    created_at: datetime
    updated_at: datetime
    processing_time_ms: int            # end-to-end workflow duration
# Task status query model
class TaskStatusResponse(BaseModel):
    """Status of a background processing task."""
    task_id: str
    status: str        # queued, processing, completed, failed
    progress: float    # 0-1
    result: Optional[dict] = None
    error: Optional[str] = None
    updated_at: datetime
核心配置(config.py)
1from pydantic_settings import BaseSettings
2from typing import Optional
3
class Settings(BaseSettings):
    """Application configuration, loaded from the environment / .env file."""

    # Basic settings
    APP_NAME: str = "AI Backend"
    DEBUG: bool = False
    LOG_LEVEL: str = "INFO"

    # Model settings
    LLM_MODEL: str = "claude-3-5-sonnet-20241022"
    LLM_TEMPERATURE: float = 0.7
    LLM_MAX_TOKENS: int = 4096

    # API settings
    API_HOST: str = "0.0.0.0"
    API_PORT: int = 8000
    API_WORKERS: int = 4

    # Database settings
    DATABASE_URL: str = "postgresql://user:password@localhost:5432/ai_backend"
    REDIS_URL: str = "redis://localhost:6379/0"

    # Workflow settings
    MAX_WORKFLOW_RETRIES: int = 3
    WORKFLOW_TIMEOUT_SECONDS: int = 300

    # LangGraph settings
    ENABLE_CHECKPOINTING: bool = True
    # NOTE(review): "langraph" looks like a typo for "langgraph" — confirm
    # before renaming, since this is a real on-disk path.
    CHECKPOINT_DIR: str = ".langraph_checkpoints"

    # Monitoring settings
    PROMETHEUS_PORT: int = 8001
    ENABLE_METRICS: bool = True

    class Config:
        # NOTE(review): pydantic-settings v2 prefers
        # model_config = SettingsConfigDict(...); the inner Config class is
        # the legacy (still-working) spelling.
        env_file = ".env"
        case_sensitive = True

# Global settings instance, shared by all modules.
settings = Settings()
數據庫層(persistence/database.py)
1from sqlalchemy import (
2 create_engine, Column, String, Integer, JSON,
3 DateTime, Boolean, Text, Enum as SQLEnum
4)
5from sqlalchemy.ext.declarative import declarative_base
6from sqlalchemy.orm import sessionmaker, Session
7from datetime import datetime
8from typing import Optional
9from config import settings
10
11Base = declarative_base()
12
# ORM models
class TicketModel(Base):
    """Persisted support ticket, including AI output and workflow traces."""
    __tablename__ = "tickets"

    ticket_id = Column(String, primary_key=True, index=True)
    user_id = Column(String, index=True)

    # Ticket content
    subject = Column(String(500))
    description = Column(Text)
    status = Column(String, default="open")
    category = Column(String, nullable=True)
    priority = Column(String, default="medium")

    # Assignment info
    assigned_department = Column(String, nullable=True)
    assigned_agent_id = Column(String, nullable=True)

    # AI-generated content
    ai_response = Column(Text, nullable=True)
    response_quality_score = Column(Integer, nullable=True)

    # Workflow tracing
    workflow_state = Column(JSON)     # full state snapshot
    processing_steps = Column(JSON)   # list of {step, timestamp, result}

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow, index=True)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    completed_at = Column(DateTime, nullable=True)

    # Error tracking
    error_message = Column(Text, nullable=True)
    retry_count = Column(Integer, default=0)
# Workflow execution record
class WorkflowExecutionModel(Base):
    """One execution of the ticket workflow, for auditing and debugging."""
    __tablename__ = "workflow_executions"

    execution_id = Column(String, primary_key=True, index=True)
    ticket_id = Column(String, index=True)

    # Execution status
    status = Column(String)  # queued, running, completed, failed

    # Execution details
    state_snapshot = Column(JSON)     # current state snapshot
    steps_completed = Column(JSON)    # list of finished step names

    # Performance metrics
    start_time = Column(DateTime)
    end_time = Column(DateTime, nullable=True)
    duration_ms = Column(Integer, nullable=True)

    # Error info
    error = Column(Text, nullable=True)
    retry_count = Column(Integer, default=0)
# Database access layer
class Database:
    """Thin wrapper around the SQLAlchemy engine and session factory."""

    def __init__(self):
        # pool_pre_ping transparently replaces stale pooled connections
        self.engine = create_engine(
            settings.DATABASE_URL,
            pool_pre_ping=True,
            echo=settings.DEBUG
        )
        self.SessionLocal = sessionmaker(
            autocommit=False,
            autoflush=False,
            bind=self.engine
        )
        self.create_tables()

    def create_tables(self):
        """Create all tables known to Base.metadata (no-op if they exist)."""
        Base.metadata.create_all(bind=self.engine)

    def get_session(self) -> Session:
        """Return a new session; the caller is responsible for closing it."""
        return self.SessionLocal()

    def save_ticket(self, session: Session, ticket: dict):
        """Insert a new ticket row and return the refreshed ORM object.

        Rolls the session back on commit failure so it stays usable,
        then re-raises.
        """
        db_ticket = TicketModel(**ticket)
        session.add(db_ticket)
        try:
            session.commit()
        except Exception:
            session.rollback()
            raise
        session.refresh(db_ticket)
        return db_ticket

    def update_ticket(self, session: Session, ticket_id: str, updates: dict):
        """Apply `updates` to an existing ticket; returns None if not found."""
        ticket = session.query(TicketModel).filter(
            TicketModel.ticket_id == ticket_id
        ).first()

        if ticket:
            for key, value in updates.items():
                setattr(ticket, key, value)
            ticket.updated_at = datetime.utcnow()
            try:
                session.commit()
            except Exception:
                session.rollback()
                raise

        return ticket

    def get_ticket(self, session: Session, ticket_id: str) -> Optional[TicketModel]:
        """Fetch a ticket by primary key, or None."""
        return session.query(TicketModel).filter(
            TicketModel.ticket_id == ticket_id
        ).first()

# Global database instance (connects and creates tables at import time).
db = Database()
Agent 實現(agents/base_agent.py 和具體 Agent)
1from abc import ABC, abstractmethod
2from typing import Dict, Any, Optional
3from datetime import datetime
4import logging
5from langchain_anthropic import ChatAnthropic
6from langchain_core.prompts import ChatPromptTemplate
7
8logger = logging.getLogger(__name__)
9
class BaseAgent(ABC):
    """Base class for all agents.

    Subclasses supply a prompt template and a response parser; `execute`
    wires them into a LangChain chain and records simple timing stats.
    """

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        # NOTE(review): model settings duplicate config.Settings
        # (LLM_MODEL / LLM_TEMPERATURE / LLM_MAX_TOKENS) — consider reading
        # them from there instead of hard-coding.
        self.model = ChatAnthropic(
            model="claude-3-5-sonnet-20241022",
            temperature=0.7,
            max_tokens=4096
        )
        self.execution_count = 0     # number of successful executions
        self.total_duration_ms = 0   # cumulative duration of successful runs

    @abstractmethod
    def get_prompt_template(self) -> ChatPromptTemplate:
        """Return the agent's prompt template."""

    @abstractmethod
    def parse_response(self, response: str) -> Dict[str, Any]:
        """Parse the raw model output into a result dict."""

    def execute(self, **kwargs) -> Dict[str, Any]:
        """Run the agent and return the parsed result dict.

        Originally declared `async` while still calling the blocking
        `chain.invoke`, and every call site in the workflow invoked it
        WITHOUT `await` — so callers received a coroutine instead of a
        result. Made synchronous to match both the blocking call and the
        existing call sites.
        """
        start_time = datetime.now()

        try:
            prompt = self.get_prompt_template()
            chain = prompt | self.model

            response = chain.invoke(kwargs)
            result = self.parse_response(response.content)

            duration_ms = int((datetime.now() - start_time).total_seconds() * 1000)
            self.execution_count += 1
            self.total_duration_ms += duration_ms

            logger.info(
                f"{self.name} executed successfully",
                extra={
                    "agent": self.name,
                    "duration_ms": duration_ms,
                    "result_keys": list(result.keys())
                }
            )

            return result

        except Exception as e:
            logger.error(
                f"{self.name} execution failed",
                extra={"agent": self.name, "error": str(e)},
                exc_info=True
            )
            raise
# Concrete agent implementations
class ClassificationAgent(BaseAgent):
    """Classifies a ticket into a category with confidence and severity."""

    def __init__(self):
        super().__init__(
            name="ClassificationAgent",
            description="分類工單類型"
        )

    def get_prompt_template(self) -> ChatPromptTemplate:
        # {{ }} escapes literal braces in the JSON example for the template engine
        return ChatPromptTemplate.from_template("""
分析以下工單,提取類別和關鍵信息。

主題:{subject}
描述:{description}

返回 JSON:
{{
    "category": "technical|billing|account|feature_request|bug_report|other",
    "confidence": 0.0-1.0,
    "key_issues": ["issue1", "issue2"],
    "severity": "low|medium|high|critical"
}}
""")

    def parse_response(self, response: str) -> Dict[str, Any]:
        """Parse the model's JSON answer, tolerating ```json code fences.

        A bare json.loads fails whenever the LLM wraps its output in
        markdown fences, which is common; strip them before parsing.
        """
        import json
        text = response.strip()
        if text.startswith("```"):
            # drop the opening ```/```json line and any trailing fence
            text = text.split("\n", 1)[1] if "\n" in text else ""
            if text.rstrip().endswith("```"):
                text = text.rstrip()[:-3]
        return json.loads(text)
class AnalysisAgent(BaseAgent):
    """Performs a deeper analysis of a classified ticket."""

    def __init__(self):
        super().__init__(
            name="AnalysisAgent",
            description="分析工單內容"
        )

    def get_prompt_template(self) -> ChatPromptTemplate:
        # {{ }} escapes literal braces in the JSON example for the template engine
        return ChatPromptTemplate.from_template("""
根據以下信息進行深入分析。

工單:{subject}
類別:{category}

返回 JSON:
{{
    "analysis": "詳細分析...",
    "affected_systems": ["sys1", "sys2"],
    "suggested_solution": "...",
    "confidence": 0.0-1.0
}}
""")

    def parse_response(self, response: str) -> Dict[str, Any]:
        """Parse the model's JSON answer, tolerating ```json code fences.

        Mirrors ClassificationAgent.parse_response: bare json.loads fails
        on markdown-fenced LLM output, so strip the fences first.
        """
        import json
        text = response.strip()
        if text.startswith("```"):
            text = text.split("\n", 1)[1] if "\n" in text else ""
            if text.rstrip().endswith("```"):
                text = text.rstrip()[:-3]
        return json.loads(text)
class ResponseGeneratorAgent(BaseAgent):
    """Generates the customer-facing reply from the ticket analysis."""

    def __init__(self):
        super().__init__(
            name="ResponseGeneratorAgent",
            description="生成客服回應"
        )

    def get_prompt_template(self) -> ChatPromptTemplate:
        # Free-text reply (Chinese, 200-400 characters) — no JSON parsing needed.
        return ChatPromptTemplate.from_template("""
根據以下工單信息生成專業的客服回覆。

主題:{subject}
類別:{category}
分析:{analysis}

生成友好、專業的回覆(中文,200-400 字):
""")

    def parse_response(self, response: str) -> Dict[str, Any]:
        """Wrap the raw reply text; no structured parsing required."""
        return {"response": response}
LangGraph 工作流(workflows/ticket_workflow.py)
1from dataclasses import dataclass, field
2from typing import Optional
3from langgraph.graph import StateGraph, START, END
4from datetime import datetime
5import logging
6
7from agents.base_agent import (
8 ClassificationAgent,
9 AnalysisAgent,
10 ResponseGeneratorAgent
11)
12
13logger = logging.getLogger(__name__)
14
@dataclass
class TicketWorkflowState:
    """Mutable state threaded through the ticket workflow graph."""
    ticket_id: str
    user_id: str
    subject: str
    description: str

    # Processing results, filled in step by step
    category: Optional[str] = None
    analysis: Optional[str] = None
    response: Optional[str] = None

    # Progress tracking
    current_step: str = "init"
    steps_completed: list = field(default_factory=list)  # step names, in order
    errors: list = field(default_factory=list)           # human-readable error strings
    retry_count: int = 0                                 # workflow restart attempts
class TicketWorkflow:
    """Three-step ticket pipeline: classify -> analyze -> generate response.

    On failure the whole pipeline is re-run from "classify", up to
    MAX_RETRIES retry passes.
    """

    # Mirrors settings.MAX_WORKFLOW_RETRIES; kept local so this module has
    # no import dependency on the config module.
    MAX_RETRIES = 3

    def __init__(self):
        self.classification_agent = ClassificationAgent()
        self.analysis_agent = AnalysisAgent()
        self.response_agent = ResponseGeneratorAgent()

    def classify_step(self, state: TicketWorkflowState) -> TicketWorkflowState:
        """Step 1: classification. Also the retry re-entry point.

        When re-entered after a failed pass it counts the retry and clears
        the previous pass's errors, so a successful retry finishes cleanly.
        (The original never cleared `errors`, so one failure forced retries
        until the budget was exhausted even if a retry succeeded.)
        """
        if state.errors:
            state.retry_count += 1
            state.errors = []

        try:
            # NOTE(review): BaseAgent.execute is declared `async` in
            # agents/base_agent.py; called without await, these steps would
            # receive a coroutine, not a dict. execute must be synchronous
            # (or these nodes made async) — confirm with the agent module.
            result = self.classification_agent.execute(
                subject=state.subject,
                description=state.description
            )

            state.category = result["category"]
            state.steps_completed.append("classify")
            state.current_step = "classify"

        except Exception as e:
            state.errors.append(f"Classification failed: {str(e)}")
            logger.error(f"Classification error: {e}")

        return state

    def analyze_step(self, state: TicketWorkflowState) -> TicketWorkflowState:
        """Step 2: deeper analysis of the classified ticket."""
        try:
            result = self.analysis_agent.execute(
                subject=state.subject,
                category=state.category
            )

            state.analysis = result["analysis"]
            state.steps_completed.append("analyze")
            state.current_step = "analyze"

        except Exception as e:
            state.errors.append(f"Analysis failed: {str(e)}")

        return state

    def generate_response_step(self, state: TicketWorkflowState) -> TicketWorkflowState:
        """Step 3: generate the customer-facing reply."""
        try:
            result = self.response_agent.execute(
                subject=state.subject,
                category=state.category,
                analysis=state.analysis
            )

            state.response = result["response"]
            state.steps_completed.append("generate_response")
            state.current_step = "generate_response"

        except Exception as e:
            state.errors.append(f"Response generation failed: {str(e)}")

        return state

    def should_continue(self, state: TicketWorkflowState) -> str:
        """Route after response generation: retry the pipeline or finish.

        Must be side-effect free: LangGraph does not write back state
        mutations made inside a conditional-edge function, so the retry
        counter is maintained in classify_step (a node). The original
        incremented retry_count here, which never persisted and left the
        retry loop effectively unbounded.
        """
        if state.errors and state.retry_count < self.MAX_RETRIES:
            return "classify"  # restart the pipeline from the top
        return "end"

    def build(self) -> StateGraph:
        """Build and compile the workflow graph."""
        workflow = StateGraph(TicketWorkflowState)

        # Nodes
        workflow.add_node("classify", self.classify_step)
        workflow.add_node("analyze", self.analyze_step)
        workflow.add_node("generate_response", self.generate_response_step)

        # Edges
        workflow.add_edge(START, "classify")
        workflow.add_edge("classify", "analyze")
        workflow.add_edge("analyze", "generate_response")
        workflow.add_conditional_edges(
            "generate_response",
            self.should_continue,
            {"classify": "classify", "end": END}
        )

        return workflow.compile()

# Global compiled workflow instance
ticket_workflow = TicketWorkflow().build()
FastAPI 應用(main.py)
1from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends
2from fastapi.responses import JSONResponse
3from sqlalchemy.orm import Session
4import uuid
5from datetime import datetime
6import asyncio
7import logging
8
9from config import settings
10from models.schemas import (
11 CreateTicketRequest,
12 TicketResponse,
13 TaskStatusResponse
14)
15from persistence.database import db, TicketModel
16from workflows.ticket_workflow import ticket_workflow, TicketWorkflowState
17
# Logging setup
logging.basicConfig(
    level=settings.LOG_LEVEL,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Create the FastAPI application
app = FastAPI(
    title=settings.APP_NAME,
    debug=settings.DEBUG
)

# Task queue (simple in-memory store; production should use Celery/Redis).
# NOTE(review): entries are never evicted, so this dict grows for the
# lifetime of the process.
task_storage = {}
33
def get_db() -> Session:
    """Yield a database session, guaranteeing it is closed afterwards."""
    session = db.get_session()
    try:
        yield session
    finally:
        session.close()
async def process_ticket_async(
    ticket_id: str,
    request: CreateTicketRequest,
    db_session: Session
):
    """Process a ticket through the LangGraph workflow in the background.

    `db_session` is the request-scoped session the endpoint received from
    Depends(get_db). Since FastAPI 0.106, yield-dependencies are torn down
    *before* background tasks run, so that session is already closed by the
    time this executes — a fresh session is opened here instead (the
    parameter is kept for signature compatibility with existing callers).
    """
    start_time = datetime.utcnow()
    session = db.get_session()  # own session; see docstring

    try:
        # Build the initial workflow state
        workflow_state = TicketWorkflowState(
            ticket_id=ticket_id,
            user_id=request.user_id,
            subject=request.subject,
            description=request.description
        )

        # The compiled graph call is synchronous/blocking; run it in a
        # worker thread so the event loop stays responsive.
        result = await asyncio.to_thread(ticket_workflow.invoke, workflow_state)

        # LangGraph returns the final state as a dict (even when the state
        # schema is a dataclass), so access results by key, not attribute.
        duration_ms = int((datetime.utcnow() - start_time).total_seconds() * 1000)

        # Persist the outcome.
        # NOTE(review): "completed"/"failed" are not members of TicketStatus;
        # kept as-is for compatibility, but the vocabularies should be unified.
        db.update_ticket(session, ticket_id, {
            "category": result["category"],
            "ai_response": result["response"],
            "status": "completed",
            "workflow_state": result,
            "processing_steps": result["steps_completed"],
            "completed_at": datetime.utcnow()
        })

        # Record the task result for GET /tasks/{ticket_id}
        task_storage[ticket_id] = {
            "status": "completed",
            "result": {
                "ticket_id": ticket_id,
                "category": result["category"],
                "response": result["response"],
                "steps": result["steps_completed"]
            },
            "duration_ms": duration_ms
        }

        logger.info(
            f"Ticket {ticket_id} processed successfully",
            extra={"duration_ms": duration_ms}
        )

    except Exception as e:
        logger.error(
            f"Ticket {ticket_id} processing failed",
            extra={"error": str(e)},
            exc_info=True
        )

        task_storage[ticket_id] = {
            "status": "failed",
            "error": str(e)
        }

        db.update_ticket(session, ticket_id, {
            "status": "failed",
            "error_message": str(e)
        })
    finally:
        session.close()
@app.post("/tickets", response_model=dict)
async def create_ticket(
    request: CreateTicketRequest,
    background_tasks: BackgroundTasks,
    db_session: Session = Depends(get_db)
):
    """Create a new support ticket and queue it for background processing."""

    ticket_id = f"TKT-{uuid.uuid4().hex[:8].upper()}"

    try:
        # Persist the initial ticket record.
        db.save_ticket(db_session, {
            "ticket_id": ticket_id,
            "user_id": request.user_id,
            "subject": request.subject,
            "description": request.description,
            "priority": request.priority,
            "status": "open",
        })

        # Hand off to background processing.
        # NOTE(review): db_session is request-scoped; on FastAPI >= 0.106 it
        # is closed before background tasks run — confirm the task opens its
        # own session rather than relying on this one.
        background_tasks.add_task(process_ticket_async, ticket_id, request, db_session)

        # Seed the in-memory task tracker.
        task_storage[ticket_id] = {
            "status": "queued",
            "created_at": datetime.utcnow().isoformat(),
        }
    except Exception as e:
        logger.error(f"Failed to create ticket: {e}")
        raise HTTPException(status_code=500, detail=str(e))

    return {
        "ticket_id": ticket_id,
        "status": "accepted",
        "message": "工單已接收,正在處理中"
    }
@app.get("/tickets/{ticket_id}")
async def get_ticket(ticket_id: str, db_session: Session = Depends(get_db)):
    """Return the public view of a single ticket, or 404 if unknown."""

    record = db.get_ticket(db_session, ticket_id)

    if record is None:
        raise HTTPException(status_code=404, detail="工單不存在")

    # Field order matches the documented response shape.
    return {
        key: value
        for key, value in (
            ("ticket_id", record.ticket_id),
            ("status", record.status),
            ("category", record.category),
            ("priority", record.priority),
            ("response", record.ai_response),
            ("created_at", record.created_at),
            ("updated_at", record.updated_at),
        )
    }
@app.get("/tasks/{ticket_id}")
async def get_task_status(ticket_id: str):
    """Return the background-processing status recorded for a ticket."""

    entry = task_storage.get(ticket_id)
    if entry is None:
        raise HTTPException(status_code=404, detail="任務不存在")

    payload = {"ticket_id": ticket_id, "status": entry["status"]}
    # Optional fields are present only once the task finished or failed.
    for optional_key in ("result", "error", "duration_ms"):
        payload[optional_key] = entry.get(optional_key)
    return payload
@app.get("/health")
async def health_check():
    """Liveness probe endpoint."""
    payload = {"status": "healthy"}
    return payload
if __name__ == "__main__":
    import uvicorn
    # Pass the app as an import string: uvicorn can only spawn multiple
    # worker processes (workers > 1) when given "module:attr", not a live
    # app object — with an object the workers setting is ignored.
    uvicorn.run(
        "main:app",
        host=settings.API_HOST,
        port=settings.API_PORT,
        workers=settings.API_WORKERS
    )
監控層(monitoring/logger.py 和 metrics.py)
1# monitoring/logger.py
2import logging
3import json
4from datetime import datetime
5
class JSONFormatter(logging.Formatter):
    """Format log records as single-line JSON objects.

    Custom fields passed via `extra=` are included; standard LogRecord
    attributes are filtered out. The original merged the entire
    `record.__dict__`, which leaked internals such as `args` and `exc_info`
    and could crash json.dumps on non-serializable values.
    """

    # Attributes every LogRecord carries; anything else came from `extra=`.
    _STANDARD_ATTRS = frozenset({
        "name", "msg", "args", "levelname", "levelno", "pathname",
        "filename", "module", "exc_info", "exc_text", "stack_info",
        "lineno", "funcName", "created", "msecs", "relativeCreated",
        "thread", "threadName", "processName", "process", "taskName",
        "message", "asctime",
    })

    def format(self, record):
        log_obj = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        # Merge only caller-supplied extra fields, without clobbering the
        # computed keys above.
        for key, value in record.__dict__.items():
            if key not in self._STANDARD_ATTRS and key not in log_obj:
                log_obj[key] = value

        # default=str keeps one odd value from breaking the whole log line.
        return json.dumps(log_obj, ensure_ascii=False, default=str)
def setup_logging():
    """Configure the root logger with a JSON console handler.

    Idempotent: calling it more than once does not stack duplicate
    handlers (the original added a new handler on every call, producing
    repeated log lines).
    """
    logger = logging.getLogger()

    already_installed = any(
        isinstance(handler.formatter, JSONFormatter)
        for handler in logger.handlers
    )
    if not already_installed:
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(JSONFormatter())
        logger.addHandler(console_handler)

    logger.setLevel(logging.INFO)

    return logger
# monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge

# Metric definitions (module level so each registers exactly once per process)
tickets_created = Counter(
    'tickets_created_total',
    'Total tickets created',
    ['priority']
)

workflow_duration = Histogram(
    'workflow_duration_seconds',
    'Workflow execution duration',
    ['status']
)

active_workflows = Gauge(
    'active_workflows',
    'Number of active workflows'
)

agent_execution_time = Histogram(
    'agent_execution_seconds',
    'Agent execution time',
    ['agent_name']
)

def record_ticket_created(priority: str):
    """Count one created ticket, labelled by priority."""
    tickets_created.labels(priority=priority).inc()

def record_workflow_duration(duration_ms: float, status: str):
    """Observe one workflow run (input in milliseconds, stored in seconds)."""
    workflow_duration.labels(status=status).observe(duration_ms / 1000)
requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
sqlalchemy==2.0.23
psycopg2-binary==2.9.9
pydantic==2.5.0
pydantic-settings==2.1.0
langchain==0.1.0
langchain-anthropic==0.1.0
langgraph>=0.1.0
redis==5.0.1
prometheus-client==0.19.0
python-dotenv==1.0.0
tenacity==8.2.3
Docker 部署(docker-compose.yml)
# NOTE(review): the top-level `version` key is obsolete in Compose V2 and ignored.
version: '3.8'

services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: ai_user
      POSTGRES_PASSWORD: ai_password
      POSTGRES_DB: ai_backend
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      # In-network hostnames (postgres/redis) resolve via the compose network.
      DATABASE_URL: postgresql://ai_user:ai_password@postgres:5432/ai_backend
      REDIS_URL: redis://redis:6379/0
      ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}  # supplied via host env / .env
    depends_on:
      - postgres
      - redis
    command: uvicorn main:app --host 0.0.0.0 --port 8000

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'

volumes:
  postgres_data:
  redis_data:
總結
這個完整的實現涵蓋了生產級 LangGraph AI 後端的所有關鍵組件:
✅ 清晰的架構分層 ✅ 完整的 ORM 數據持久化 ✅ 可重用的 Agent 基類 ✅ 模塊化的工作流設計 ✅ RESTful API 接口 ✅ 非同步處理和隊列管理 ✅ 完整的監控和日誌 ✅ Docker 容器化部署
現在你可以直接基於這個範本快速構建自己的 AI 應用!
