LangGraph AI 後端核心代碼實現:生產級代碼範本

從概念到代碼,這篇文章提供完整可運行的 LangGraph AI 後端實現。我們會構建一個真實的系統:接收請求、執行工作流、持久化結果、提供監控。


完整項目結構

ai-backend/
├── main.py              # FastAPI 應用入口
├── models/              # 數據模型
│   ├── __init__.py
│   └── schemas.py       # Pydantic schemas
├── workflows/           # LangGraph 工作流
│   ├── __init__.py
│   ├── base.py          # 基礎工作流類
│   └── ticket_workflow.py
├── agents/              # Agent 實現
│   ├── __init__.py
│   ├── base_agent.py
│   ├── classifier.py
│   ├── analyzer.py
│   └── response_generator.py
├── persistence/         # 數據持久化
│   ├── __init__.py
│   └── database.py
├── monitoring/          # 監控和日誌
│   ├── __init__.py
│   ├── logger.py
│   └── metrics.py
├── config.py            # 配置管理
├── requirements.txt
└── docker-compose.yml

數據模型層(models/schemas.py)

 1from pydantic import BaseModel, Field
 2from typing import Optional, List
 3from datetime import datetime
 4from enum import Enum
 5
 6class PriorityLevel(str, Enum):
 7    LOW = "low"
 8    MEDIUM = "medium"
 9    HIGH = "high"
10    CRITICAL = "critical"
11
12class TicketStatus(str, Enum):
13    OPEN = "open"
14    CLASSIFYING = "classifying"
15    ASSIGNED = "assigned"
16    IN_PROGRESS = "in_progress"
17    RESOLVED = "resolved"
18    CLOSED = "closed"
19
20# 請求模型
21class CreateTicketRequest(BaseModel):
22    user_id: str = Field(..., min_length=1, description="用戶 ID")
23    subject: str = Field(..., min_length=5, max_length=500)
24    description: str = Field(..., min_length=10, max_length=5000)
25    priority: Optional[PriorityLevel] = PriorityLevel.MEDIUM
26    attachments: Optional[List[str]] = []
27
28# 回應模型
29class TicketResponse(BaseModel):
30    ticket_id: str
31    status: TicketStatus
32    category: Optional[str]
33    priority: PriorityLevel
34    assigned_department: Optional[str]
35    response: Optional[str]
36    created_at: datetime
37    updated_at: datetime
38    processing_time_ms: int
39
40# 任務查詢模型
41class TaskStatusResponse(BaseModel):
42    task_id: str
43    status: str  # queued, processing, completed, failed
44    progress: float  # 0-1
45    result: Optional[dict] = None
46    error: Optional[str] = None
47    updated_at: datetime

核心配置(config.py)

 1from pydantic_settings import BaseSettings
 2from typing import Optional
 3
 4class Settings(BaseSettings):
 5    """應用配置"""
 6    
 7    # 基礎配置
 8    APP_NAME: str = "AI Backend"
 9    DEBUG: bool = False
10    LOG_LEVEL: str = "INFO"
11    
12    # 模型配置
13    LLM_MODEL: str = "claude-3-5-sonnet-20241022"
14    LLM_TEMPERATURE: float = 0.7
15    LLM_MAX_TOKENS: int = 4096
16    
17    # API 配置
18    API_HOST: str = "0.0.0.0"
19    API_PORT: int = 8000
20    API_WORKERS: int = 4
21    
22    # 數據庫配置
23    DATABASE_URL: str = "postgresql://user:password@localhost:5432/ai_backend"
24    REDIS_URL: str = "redis://localhost:6379/0"
25    
26    # 工作流配置
27    MAX_WORKFLOW_RETRIES: int = 3
28    WORKFLOW_TIMEOUT_SECONDS: int = 300
29    
30    # LangGraph 配置
31    ENABLE_CHECKPOINTING: bool = True
32    CHECKPOINT_DIR: str = ".langraph_checkpoints"
33    
34    # 監控配置
35    PROMETHEUS_PORT: int = 8001
36    ENABLE_METRICS: bool = True
37    
38    class Config:
39        env_file = ".env"
40        case_sensitive = True
41
42# 全局配置實例
43settings = Settings()

數據庫層(persistence/database.py)

  1from sqlalchemy import (
  2    create_engine, Column, String, Integer, JSON, 
  3    DateTime, Boolean, Text, Enum as SQLEnum
  4)
  5from sqlalchemy.ext.declarative import declarative_base
  6from sqlalchemy.orm import sessionmaker, Session
  7from datetime import datetime
  8from typing import Optional
  9from config import settings
 10
 11Base = declarative_base()
 12
 13# ORM 模型
 14class TicketModel(Base):
 15    __tablename__ = "tickets"
 16    
 17    ticket_id = Column(String, primary_key=True, index=True)
 18    user_id = Column(String, index=True)
 19    
 20    # 工單信息
 21    subject = Column(String(500))
 22    description = Column(Text)
 23    status = Column(String, default="open")
 24    category = Column(String, nullable=True)
 25    priority = Column(String, default="medium")
 26    
 27    # 處理信息
 28    assigned_department = Column(String, nullable=True)
 29    assigned_agent_id = Column(String, nullable=True)
 30    
 31    # AI 生成的內容
 32    ai_response = Column(Text, nullable=True)
 33    response_quality_score = Column(Integer, nullable=True)
 34    
 35    # 工作流追蹤
 36    workflow_state = Column(JSON)  # 完整的狀態快照
 37    processing_steps = Column(JSON)  # [{step, timestamp, result}]
 38    
 39    # 时间戳
 40    created_at = Column(DateTime, default=datetime.utcnow, index=True)
 41    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
 42    completed_at = Column(DateTime, nullable=True)
 43    
 44    # 错误追踪
 45    error_message = Column(Text, nullable=True)
 46    retry_count = Column(Integer, default=0)
 47
 48# 工作流執行記錄
 49class WorkflowExecutionModel(Base):
 50    __tablename__ = "workflow_executions"
 51    
 52    execution_id = Column(String, primary_key=True, index=True)
 53    ticket_id = Column(String, index=True)
 54    
 55    # 執行狀態
 56    status = Column(String)  # queued, running, completed, failed
 57    
 58    # 執行細節
 59    state_snapshot = Column(JSON)  # 當前狀態快照
 60    steps_completed = Column(JSON)  # 已完成的步驟列表
 61    
 62    # 性能指標
 63    start_time = Column(DateTime)
 64    end_time = Column(DateTime, nullable=True)
 65    duration_ms = Column(Integer, nullable=True)
 66    
 67    # 錯誤信息
 68    error = Column(Text, nullable=True)
 69    retry_count = Column(Integer, default=0)
 70
 71# 數據庫操作類
 72class Database:
 73    def __init__(self):
 74        self.engine = create_engine(
 75            settings.DATABASE_URL,
 76            pool_pre_ping=True,
 77            echo=settings.DEBUG
 78        )
 79        self.SessionLocal = sessionmaker(
 80            autocommit=False,
 81            autoflush=False,
 82            bind=self.engine
 83        )
 84        self.create_tables()
 85    
 86    def create_tables(self):
 87        """創建所有表"""
 88        Base.metadata.create_all(bind=self.engine)
 89    
 90    def get_session(self) -> Session:
 91        """獲取數據庫會話"""
 92        return self.SessionLocal()
 93    
 94    def save_ticket(self, session: Session, ticket: dict):
 95        """保存工單"""
 96        db_ticket = TicketModel(**ticket)
 97        session.add(db_ticket)
 98        session.commit()
 99        session.refresh(db_ticket)
100        return db_ticket
101    
102    def update_ticket(self, session: Session, ticket_id: str, updates: dict):
103        """更新工單"""
104        ticket = session.query(TicketModel).filter(
105            TicketModel.ticket_id == ticket_id
106        ).first()
107        
108        if ticket:
109            for key, value in updates.items():
110                setattr(ticket, key, value)
111            ticket.updated_at = datetime.utcnow()
112            session.commit()
113        
114        return ticket
115    
116    def get_ticket(self, session: Session, ticket_id: str) -> Optional[TicketModel]:
117        """獲取工單"""
118        return session.query(TicketModel).filter(
119            TicketModel.ticket_id == ticket_id
120        ).first()
121
122# 全局數據庫實例
123db = Database()

Agent 實現(agents/base_agent.py 和具體 Agent)

  1from abc import ABC, abstractmethod
  2from typing import Dict, Any, Optional
  3from datetime import datetime
  4import logging
  5from langchain_anthropic import ChatAnthropic
  6from langchain_core.prompts import ChatPromptTemplate
  7
  8logger = logging.getLogger(__name__)
  9
 10class BaseAgent(ABC):
 11    """所有 Agent 的基類"""
 12    
 13    def __init__(self, name: str, description: str):
 14        self.name = name
 15        self.description = description
 16        self.model = ChatAnthropic(
 17            model="claude-3-5-sonnet-20241022",
 18            temperature=0.7,
 19            max_tokens=4096
 20        )
 21        self.execution_count = 0
 22        self.total_duration_ms = 0
 23    
 24    @abstractmethod
 25    def get_prompt_template(self) -> ChatPromptTemplate:
 26        """獲取 Agent 的提示詞模板"""
 27        pass
 28    
 29    @abstractmethod
 30    def parse_response(self, response: str) -> Dict[str, Any]:
 31        """解析模型回應"""
 32        pass
 33    
 34    async def execute(self, **kwargs) -> Dict[str, Any]:
 35        """執行 Agent"""
 36        start_time = datetime.now()
 37        
 38        try:
 39            prompt = self.get_prompt_template()
 40            chain = prompt | self.model
 41            
 42            response = chain.invoke(kwargs)
 43            result = self.parse_response(response.content)
 44            
 45            duration_ms = int((datetime.now() - start_time).total_seconds() * 1000)
 46            self.execution_count += 1
 47            self.total_duration_ms += duration_ms
 48            
 49            logger.info(
 50                f"{self.name} executed successfully",
 51                extra={
 52                    "agent": self.name,
 53                    "duration_ms": duration_ms,
 54                    "result_keys": list(result.keys())
 55                }
 56            )
 57            
 58            return result
 59        
 60        except Exception as e:
 61            logger.error(
 62                f"{self.name} execution failed",
 63                extra={"agent": self.name, "error": str(e)},
 64                exc_info=True
 65            )
 66            raise
 67
 68# 具體 Agent 實現
 69class ClassificationAgent(BaseAgent):
 70    def __init__(self):
 71        super().__init__(
 72            name="ClassificationAgent",
 73            description="分類工單類型"
 74        )
 75    
 76    def get_prompt_template(self) -> ChatPromptTemplate:
 77        return ChatPromptTemplate.from_template("""
 78        分析以下工單,提取類別和關鍵信息。
 79        
 80        主題:{subject}
 81        描述:{description}
 82        
 83        返回 JSON:
 84        {{
 85            "category": "technical|billing|account|feature_request|bug_report|other",
 86            "confidence": 0.0-1.0,
 87            "key_issues": ["issue1", "issue2"],
 88            "severity": "low|medium|high|critical"
 89        }}
 90        """)
 91    
 92    def parse_response(self, response: str) -> Dict[str, Any]:
 93        import json
 94        return json.loads(response)
 95
 96class AnalysisAgent(BaseAgent):
 97    def __init__(self):
 98        super().__init__(
 99            name="AnalysisAgent",
100            description="分析工單內容"
101        )
102    
103    def get_prompt_template(self) -> ChatPromptTemplate:
104        return ChatPromptTemplate.from_template("""
105        根據以下信息進行深入分析。
106        
107        工單:{subject}
108        類別:{category}
109        
110        返回 JSON:
111        {{
112            "analysis": "詳細分析...",
113            "affected_systems": ["sys1", "sys2"],
114            "suggested_solution": "...",
115            "confidence": 0.0-1.0
116        }}
117        """)
118    
119    def parse_response(self, response: str) -> Dict[str, Any]:
120        import json
121        return json.loads(response)
122
123class ResponseGeneratorAgent(BaseAgent):
124    def __init__(self):
125        super().__init__(
126            name="ResponseGeneratorAgent",
127            description="生成客服回應"
128        )
129    
130    def get_prompt_template(self) -> ChatPromptTemplate:
131        return ChatPromptTemplate.from_template("""
132        根據以下工單信息生成專業的客服回覆。
133        
134        主題:{subject}
135        類別:{category}
136        分析:{analysis}
137        
138        生成友好、專業的回覆(中文,200-400 字):
139        """)
140    
141    def parse_response(self, response: str) -> Dict[str, Any]:
142        return {"response": response}

LangGraph 工作流(workflows/ticket_workflow.py)

  1from dataclasses import dataclass, field
  2from typing import Optional
  3from langgraph.graph import StateGraph, START, END
  4from datetime import datetime
  5import logging
  6
  7from agents.base_agent import (
  8    ClassificationAgent,
  9    AnalysisAgent,
 10    ResponseGeneratorAgent
 11)
 12
 13logger = logging.getLogger(__name__)
 14
 15@dataclass
 16class TicketWorkflowState:
 17    """工作流狀態"""
 18    ticket_id: str
 19    user_id: str
 20    subject: str
 21    description: str
 22    
 23    # 處理結果
 24    category: Optional[str] = None
 25    analysis: Optional[str] = None
 26    response: Optional[str] = None
 27    
 28    # 狀態追蹤
 29    current_step: str = "init"
 30    steps_completed: list = field(default_factory=list)
 31    errors: list = field(default_factory=list)
 32    retry_count: int = 0
 33
 34class TicketWorkflow:
 35    def __init__(self):
 36        self.classification_agent = ClassificationAgent()
 37        self.analysis_agent = AnalysisAgent()
 38        self.response_agent = ResponseGeneratorAgent()
 39    
 40    def classify_step(self, state: TicketWorkflowState) -> TicketWorkflowState:
 41        """第 1 步:分類"""
 42        try:
 43            result = self.classification_agent.execute(
 44                subject=state.subject,
 45                description=state.description
 46            )
 47            
 48            state.category = result["category"]
 49            state.steps_completed.append("classify")
 50            state.current_step = "classify"
 51            
 52        except Exception as e:
 53            state.errors.append(f"Classification failed: {str(e)}")
 54            logger.error(f"Classification error: {e}")
 55        
 56        return state
 57    
 58    def analyze_step(self, state: TicketWorkflowState) -> TicketWorkflowState:
 59        """第 2 步:分析"""
 60        try:
 61            result = self.analysis_agent.execute(
 62                subject=state.subject,
 63                category=state.category
 64            )
 65            
 66            state.analysis = result["analysis"]
 67            state.steps_completed.append("analyze")
 68            state.current_step = "analyze"
 69            
 70        except Exception as e:
 71            state.errors.append(f"Analysis failed: {str(e)}")
 72        
 73        return state
 74    
 75    def generate_response_step(self, state: TicketWorkflowState) -> TicketWorkflowState:
 76        """第 3 步:生成回應"""
 77        try:
 78            result = self.response_agent.execute(
 79                subject=state.subject,
 80                category=state.category,
 81                analysis=state.analysis
 82            )
 83            
 84            state.response = result["response"]
 85            state.steps_completed.append("generate_response")
 86            state.current_step = "generate_response"
 87            
 88        except Exception as e:
 89            state.errors.append(f"Response generation failed: {str(e)}")
 90        
 91        return state
 92    
 93    def should_continue(self, state: TicketWorkflowState) -> str:
 94        """決定是否繼續"""
 95        if state.errors and state.retry_count < 3:
 96            state.retry_count += 1
 97            return "classify"  # 重新開始
 98        elif state.errors:
 99            return "end"
100        else:
101            return "end"
102    
103    def build(self) -> StateGraph:
104        """構建工作流圖"""
105        workflow = StateGraph(TicketWorkflowState)
106        
107        # 添加節點
108        workflow.add_node("classify", self.classify_step)
109        workflow.add_node("analyze", self.analyze_step)
110        workflow.add_node("generate_response", self.generate_response_step)
111        
112        # 添加邊
113        workflow.add_edge(START, "classify")
114        workflow.add_edge("classify", "analyze")
115        workflow.add_edge("analyze", "generate_response")
116        workflow.add_conditional_edges(
117            "generate_response",
118            self.should_continue,
119            {"classify": "classify", "end": END}
120        )
121        
122        return workflow.compile()
123
124# 全局工作流實例
125ticket_workflow = TicketWorkflow().build()

FastAPI 應用(main.py)

  1from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends
  2from fastapi.responses import JSONResponse
  3from sqlalchemy.orm import Session
  4import uuid
  5from datetime import datetime
  6import asyncio
  7import logging
  8
  9from config import settings
 10from models.schemas import (
 11    CreateTicketRequest,
 12    TicketResponse,
 13    TaskStatusResponse
 14)
 15from persistence.database import db, TicketModel
 16from workflows.ticket_workflow import ticket_workflow, TicketWorkflowState
 17
 18# 設置日誌
 19logging.basicConfig(
 20    level=settings.LOG_LEVEL,
 21    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
 22)
 23logger = logging.getLogger(__name__)
 24
 25# 創建 FastAPI 應用
 26app = FastAPI(
 27    title=settings.APP_NAME,
 28    debug=settings.DEBUG
 29)
 30
 31# 任務隊列(簡單的內存存儲,生產應使用 Celery/Redis)
 32task_storage = {}
 33
 34def get_db() -> Session:
 35    """獲取數據庫連接"""
 36    db_session = db.get_session()
 37    try:
 38        yield db_session
 39    finally:
 40        db_session.close()
 41
 42async def process_ticket_async(
 43    ticket_id: str,
 44    request: CreateTicketRequest,
 45    db_session: Session
 46):
 47    """非同步處理工單"""
 48    start_time = datetime.utcnow()
 49    
 50    try:
 51        # 建立初始狀態
 52        workflow_state = TicketWorkflowState(
 53            ticket_id=ticket_id,
 54            user_id=request.user_id,
 55            subject=request.subject,
 56            description=request.description
 57        )
 58        
 59        # 執行工作流
 60        result = ticket_workflow.invoke(workflow_state)
 61        
 62        # 計算耗時
 63        duration_ms = int((datetime.utcnow() - start_time).total_seconds() * 1000)
 64        
 65        # 保存結果
 66        db.update_ticket(db_session, ticket_id, {
 67            "category": result.category,
 68            "ai_response": result.response,
 69            "status": "completed",
 70            "workflow_state": result.__dict__,
 71            "processing_steps": result.steps_completed,
 72            "completed_at": datetime.utcnow()
 73        })
 74        
 75        # 存儲任務結果
 76        task_storage[ticket_id] = {
 77            "status": "completed",
 78            "result": {
 79                "ticket_id": ticket_id,
 80                "category": result.category,
 81                "response": result.response,
 82                "steps": result.steps_completed
 83            },
 84            "duration_ms": duration_ms
 85        }
 86        
 87        logger.info(
 88            f"Ticket {ticket_id} processed successfully",
 89            extra={"duration_ms": duration_ms}
 90        )
 91    
 92    except Exception as e:
 93        logger.error(
 94            f"Ticket {ticket_id} processing failed",
 95            extra={"error": str(e)},
 96            exc_info=True
 97        )
 98        
 99        task_storage[ticket_id] = {
100            "status": "failed",
101            "error": str(e)
102        }
103        
104        db.update_ticket(db_session, ticket_id, {
105            "status": "failed",
106            "error_message": str(e)
107        })
108
109@app.post("/tickets", response_model=dict)
110async def create_ticket(
111    request: CreateTicketRequest,
112    background_tasks: BackgroundTasks,
113    db_session: Session = Depends(get_db)
114):
115    """創建新工單"""
116    
117    ticket_id = f"TKT-{uuid.uuid4().hex[:8].upper()}"
118    
119    try:
120        # 保存工單到數據庫
121        ticket_data = {
122            "ticket_id": ticket_id,
123            "user_id": request.user_id,
124            "subject": request.subject,
125            "description": request.description,
126            "priority": request.priority,
127            "status": "open"
128        }
129        
130        db.save_ticket(db_session, ticket_data)
131        
132        # 後台處理
133        background_tasks.add_task(
134            process_ticket_async,
135            ticket_id,
136            request,
137            db_session
138        )
139        
140        # 初始化任務存儲
141        task_storage[ticket_id] = {
142            "status": "queued",
143            "created_at": datetime.utcnow().isoformat()
144        }
145        
146        return {
147            "ticket_id": ticket_id,
148            "status": "accepted",
149            "message": "工單已接收,正在處理中"
150        }
151    
152    except Exception as e:
153        logger.error(f"Failed to create ticket: {e}")
154        raise HTTPException(status_code=500, detail=str(e))
155
156@app.get("/tickets/{ticket_id}")
157async def get_ticket(ticket_id: str, db_session: Session = Depends(get_db)):
158    """獲取工單詳情"""
159    
160    ticket = db.get_ticket(db_session, ticket_id)
161    
162    if not ticket:
163        raise HTTPException(status_code=404, detail="工單不存在")
164    
165    return {
166        "ticket_id": ticket.ticket_id,
167        "status": ticket.status,
168        "category": ticket.category,
169        "priority": ticket.priority,
170        "response": ticket.ai_response,
171        "created_at": ticket.created_at,
172        "updated_at": ticket.updated_at
173    }
174
175@app.get("/tasks/{ticket_id}")
176async def get_task_status(ticket_id: str):
177    """查詢工單處理狀態"""
178    
179    task = task_storage.get(ticket_id)
180    
181    if not task:
182        raise HTTPException(status_code=404, detail="任務不存在")
183    
184    return {
185        "ticket_id": ticket_id,
186        "status": task["status"],
187        "result": task.get("result"),
188        "error": task.get("error"),
189        "duration_ms": task.get("duration_ms")
190    }
191
192@app.get("/health")
193async def health_check():
194    """健康檢查"""
195    return {"status": "healthy"}
196
197if __name__ == "__main__":
198    import uvicorn
199    uvicorn.run(
200        app,
201        host=settings.API_HOST,
202        port=settings.API_PORT,
203        workers=settings.API_WORKERS
204    )

監控層(monitoring/logger.py 和 metrics.py)

 1# monitoring/logger.py
 2import logging
 3import json
 4from datetime import datetime
 5
 6class JSONFormatter(logging.Formatter):
 7    """JSON 格式的日誌"""
 8    
 9    def format(self, record):
10        log_obj = {
11            "timestamp": datetime.utcnow().isoformat(),
12            "level": record.levelname,
13            "logger": record.name,
14            "message": record.getMessage(),
15            "module": record.module,
16            "function": record.funcName,
17            "line": record.lineno
18        }
19        
20        # 添加自定義字段
21        if hasattr(record, '__dict__'):
22            log_obj.update(record.__dict__)
23        
24        return json.dumps(log_obj, ensure_ascii=False)
25
26def setup_logging():
27    """設置日誌系統"""
28    logger = logging.getLogger()
29    
30    # 控制台處理器
31    console_handler = logging.StreamHandler()
32    console_handler.setFormatter(JSONFormatter())
33    
34    logger.addHandler(console_handler)
35    logger.setLevel(logging.INFO)
36    
37    return logger
38
39# monitoring/metrics.py
40from prometheus_client import Counter, Histogram, Gauge
41
42# 定義指標
43tickets_created = Counter(
44    'tickets_created_total',
45    'Total tickets created',
46    ['priority']
47)
48
49workflow_duration = Histogram(
50    'workflow_duration_seconds',
51    'Workflow execution duration',
52    ['status']
53)
54
55active_workflows = Gauge(
56    'active_workflows',
57    'Number of active workflows'
58)
59
60agent_execution_time = Histogram(
61    'agent_execution_seconds',
62    'Agent execution time',
63    ['agent_name']
64)
65
66def record_ticket_created(priority: str):
67    tickets_created.labels(priority=priority).inc()
68
69def record_workflow_duration(duration_ms: float, status: str):
70    workflow_duration.labels(status=status).observe(duration_ms / 1000)

requirements.txt

fastapi==0.104.1
uvicorn==0.24.0
sqlalchemy==2.0.23
psycopg2-binary==2.9.9
pydantic==2.5.0
pydantic-settings==2.1.0
langchain==0.1.0
langchain-anthropic==0.1.0
langgraph==0.0.1
redis==5.0.1
prometheus-client==0.19.0
python-dotenv==1.0.0
tenacity==8.2.3

Docker 部署(docker-compose.yml)

 1version: '3.8'
 2
 3services:
 4  postgres:
 5    image: postgres:16
 6    environment:
 7      POSTGRES_USER: ai_user
 8      POSTGRES_PASSWORD: ai_password
 9      POSTGRES_DB: ai_backend
10    ports:
11      - "5432:5432"
12    volumes:
13      - postgres_data:/var/lib/postgresql/data
14
15  redis:
16    image: redis:7
17    ports:
18      - "6379:6379"
19    volumes:
20      - redis_data:/data
21
22  api:
23    build: .
24    ports:
25      - "8000:8000"
26    environment:
27      DATABASE_URL: postgresql://ai_user:ai_password@postgres:5432/ai_backend
28      REDIS_URL: redis://redis:6379/0
29      ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}
30    depends_on:
31      - postgres
32      - redis
33    command: uvicorn main:app --host 0.0.0.0 --port 8000
34
35  prometheus:
36    image: prom/prometheus:latest
37    ports:
38      - "9090:9090"
39    volumes:
40      - ./prometheus.yml:/etc/prometheus/prometheus.yml
41    command:
42      - '--config.file=/etc/prometheus/prometheus.yml'
43
44volumes:
45  postgres_data:
46  redis_data:

總結

這個完整的實現涵蓋了生產級 LangGraph AI 後端的所有關鍵組件:

✅ 清晰的架構分層 ✅ 完整的 ORM 數據持久化 ✅ 可重用的 Agent 基類 ✅ 模塊化的工作流設計 ✅ RESTful API 接口 ✅ 非同步處理和隊列管理 ✅ 完整的監控和日誌 ✅ Docker 容器化部署

現在你可以直接基於這個範本快速構建自己的 AI 應用!

Yen

Yen

Yen