前言
前兩篇建立了 CrewAI 的基礎和實戰應用。
這篇是進階篇,涵蓋讓 CrewAI 真正走到生產環境的關鍵技術:
- Flows:當 Crew 的線性流程不夠用時
- Memory 記憶體:讓 Agent 記得過去的對話和經驗
- 錯誤處理與成本控制:生產環境的必要設計
- 部署:把 CrewAI 包成 API 服務
Part 1:CrewAI Flows——事件驅動的複雜工作流程
Crew 的限制
Process.sequential 是線性的:任務一個接一個執行。但真實世界的工作流程往往需要:
- 條件分支:根據分析結果走不同的路徑
- 迴圈:重複執行直到滿足條件
- 平行執行:多個 Crew 同時跑,最後彙整
- 狀態管理:跨步驟保存和傳遞複雜的狀態
CrewAI Flows 就是為了處理這些複雜場景設計的。
Flow 的三個核心 Decorator
1from crewai.flow.flow import Flow, listen, start, router
2from pydantic import BaseModel
3
4class MyFlow(Flow):
5
6 @start()
7 def step_one(self):
8 """Flow 的入口點,Flow 啟動時執行"""
9 return "step one result"
10
11 @listen(step_one)
12 def step_two(self, step_one_output):
13 """當 step_one 完成後自動觸發,可以接收上一步的輸出"""
14 return f"processed: {step_one_output}"
15
16 @router(step_two)
17 def decide_next(self, step_two_output):
18 """根據 step_two 的輸出決定下一步走哪條路"""
19 if "error" in step_two_output:
20 return "error_path"
21 return "success_path"
22
23 @listen("success_path")
24 def handle_success(self):
25 return "成功!"
26
27 @listen("error_path")
28 def handle_error(self):
29 return "處理錯誤..."
狀態管理:用 Pydantic 定義 Flow 的全域狀態
1from crewai.flow.flow import Flow, listen, start, router
2from pydantic import BaseModel
3from typing import Optional, List
4
5class ContentPipelineState(BaseModel):
6 """Flow 的全域狀態,所有步驟都可以讀寫"""
7 topic: str = ""
8 research_done: bool = False
9 research_notes: str = ""
10 draft: str = ""
11 quality_score: float = 0.0
12 revision_count: int = 0
13 final_article: str = ""
14 publish_ready: bool = False
15
16
17class ContentPipelineFlow(Flow[ContentPipelineState]):
18 """
19 完整的內容生產 Flow:研究 → 撰寫 → 品質評估 → 修訂(如需要)→ 發布
20 展示條件分支和迴圈的使用
21 """
22
23 @start()
24 def initialize(self):
25 """初始化,設定主題"""
26 print(f"🚀 開始內容生產流程,主題:{self.state.topic}")
27
28 @listen(initialize)
29 def run_research_crew(self):
30 """呼叫研究 Crew"""
31 from crewai import Agent, Task, Crew, Process
32 from crewai_tools import SerperDevTool
33
34 researcher = Agent(
35 role="研究員",
36 goal="深入研究指定主題",
37 backstory="經驗豐富的內容研究員",
38 tools=[SerperDevTool()],
39 llm="gpt-4o-mini",
40 )
41
42 research_task = Task(
43 description=f"研究 {self.state.topic},產出詳細的研究筆記",
44 expected_output="詳細的研究筆記,Markdown 格式",
45 agent=researcher,
46 )
47
48 crew = Crew(agents=[researcher], tasks=[research_task])
49 result = crew.kickoff()
50
51 # 把結果存到 Flow 的 state
52 self.state.research_notes = result.raw
53 self.state.research_done = True
54 print(f"✅ 研究完成,約 {len(result.raw)} 字")
55
56 @listen(run_research_crew)
57 def run_writing_crew(self):
58 """呼叫撰寫 Crew"""
59 from crewai import Agent, Task, Crew
60
61 writer = Agent(
62 role="部落客",
63 goal="撰寫引人入勝的文章",
64 backstory="擅長技術寫作的部落客",
65 llm="gpt-4o",
66 )
67
68 writing_task = Task(
69 description=f"""根據以下研究筆記,撰寫一篇部落格文章:
70
71{self.state.research_notes}
72
73主題:{self.state.topic}
74要求:1200-1800 字,Markdown 格式""",
75 expected_output="完整的部落格文章,Markdown 格式",
76 agent=writer,
77 )
78
79 crew = Crew(agents=[writer], tasks=[writing_task])
80 result = crew.kickoff()
81 self.state.draft = result.raw
82 print(f"✅ 草稿撰寫完成")
83
84 @listen(run_writing_crew)
85 def evaluate_quality(self):
86 """用 LLM 評估文章品質,決定是否需要修訂"""
87 import openai
88 import json
89
90 client = openai.OpenAI()
91 prompt = f"""請評估以下文章的品質(0-10 分),並提供評分理由。
92
93文章:
94{self.state.draft[:3000]}...
95
96以 JSON 格式輸出:{{"score": 7.5, "reasons": ["優點1", "缺點1"], "suggestions": ["改進建議1"]}}"""
97
98 response = client.chat.completions.create(
99 model="gpt-4o-mini",
100 messages=[{"role": "user", "content": prompt}],
101 response_format={"type": "json_object"},
102 )
103
104 evaluation = json.loads(response.choices[0].message.content)
105 self.state.quality_score = evaluation["score"]
106 print(f"📊 品質評分:{self.state.quality_score}/10")
107 return evaluation
108
109 @router(evaluate_quality)
110 def check_quality(self, evaluation):
111 """根據品質分數決定下一步"""
112 if self.state.quality_score >= 8.0:
113 return "publish"
114 elif self.state.revision_count >= 2:
115 # 最多修訂 2 次,防止無限迴圈
116 print("⚠️ 已達最大修訂次數,直接發布")
117 return "publish"
118 else:
119 return "revise"
120
121 @listen("revise")
122 def run_revision(self):
123 """修訂文章(迴圈:可能執行多次)"""
124 self.state.revision_count += 1
125 print(f"🔄 開始第 {self.state.revision_count} 次修訂...")
126
127 import openai
128 client = openai.OpenAI()
129
130 prompt = f"""請根據以下改進建議,修訂這篇文章。
131
132原文(前 2000 字):
133{self.state.draft[:2000]}
134
135改進方向:品質分數 {self.state.quality_score}/10,需要提升至 8 分以上。
136請大幅改善文章結構、流暢度和資訊深度。
137
138輸出修訂後的完整文章:"""
139
140 response = client.chat.completions.create(
141 model="gpt-4o",
142 messages=[{"role": "user", "content": prompt}],
143 )
144 self.state.draft = response.choices[0].message.content
145
146 # 修訂後重新評估(回到 evaluate_quality)
147 self.evaluate_quality()
148
149 @listen("publish")
150 def finalize_and_publish(self):
151 """最終發布步驟"""
152 self.state.final_article = self.state.draft
153 self.state.publish_ready = True
154 print(f"🎉 文章發布就緒!最終品質分數:{self.state.quality_score}/10")
155
156 # 實際上可以呼叫 CMS API 或寫入資料庫
157 with open(f"output_{self.state.topic[:20]}.md", "w", encoding="utf-8") as f:
158 f.write(self.state.final_article)
159
160
161# 執行 Flow
162flow = ContentPipelineFlow()
163flow.state.topic = "量子計算的商業應用"
164flow.kickoff()
165
166print(f"\n總修訂次數:{flow.state.revision_count}")
167print(f"最終品質分數:{flow.state.quality_score}")
168print(f"文章已就緒:{flow.state.publish_ready}")
平行執行多個 Crew
1class ParallelResearchFlow(Flow):
2 """同時研究多個子主題,最後彙整"""
3
4 @start()
5 def begin(self):
6 self.subtopics = ["技術面", "市場面", "法規面"]
7
8 @listen(begin)
9 def research_technical(self):
10 """研究技術面(與其他研究平行執行)"""
11 return self._run_research_crew("技術面")
12
13 @listen(begin)
14 def research_market(self):
15 """與 research_technical 同時執行"""
16 return self._run_research_crew("市場面")
17
18 @listen(begin)
19 def research_regulatory(self):
20 """與前兩個同時執行"""
21 return self._run_research_crew("法規面")
22
23 @listen(research_technical, research_market, research_regulatory)
24 def synthesize(self, tech, market, reg):
25 """等所有研究完成後彙整"""
26 print("所有子研究完成,開始彙整...")
27 # 整合三個研究結果
28
29 def _run_research_crew(self, aspect: str) -> str:
30 # ... 實作 Crew 邏輯
31 return f"{aspect} 的研究結果"
Part 2:Memory 記憶體——讓 Agent 記得歷史
CrewAI 提供三種記憶機制,各有不同的用途:
記憶體類型對比
| 類型 | 作用範圍 | 持久性 | 適用場景 |
|---|---|---|---|
| Short-term Memory | 單次 Crew 執行內 | 否 | 任務間傳遞上下文 |
| Long-term Memory | 跨多次執行 | 是(存 DB) | 記住用戶偏好、歷史互動 |
| Entity Memory | 特定實體的知識 | 是 | 記住「關於 X 公司的所有事情」 |
啟用完整記憶體
1from crewai import Crew, Process
2from crewai.memory import LongTermMemory, ShortTermMemory, EntityMemory
3from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
4
5crew = Crew(
6 agents=[...],
7 tasks=[...],
8 process=Process.sequential,
9
10 # 啟用三種記憶體
11 memory=True,
12
13 # Long-term memory 存到本地 SQLite(也可以換成 PostgreSQL)
14 long_term_memory=LongTermMemory(
15 storage=LTMSQLiteStorage(db_path="./crew_memory.db")
16 ),
17
18 # Short-term memory 用向量 DB 儲存(方便語意搜尋)
19 short_term_memory=ShortTermMemory(),
20
21 # Entity memory 追蹤特定實體
22 entity_memory=EntityMemory(),
23
24 verbose=True,
25)
Long-term Memory 的實際效果
1# 第一次執行
2result1 = crew.kickoff(inputs={
3 "customer_id": "C001",
4 "question": "我想了解你們的 Enterprise 方案"
5})
6# Agent 記錄:C001 對 Enterprise 方案有興趣
7
8# 第二次執行(一週後)
9result2 = crew.kickoff(inputs={
10 "customer_id": "C001",
11 "question": "你們的 API 有沒有限制?"
12})
13# Agent 自動回憶上次互動:「這位客戶上次詢問過 Enterprise 方案,
14# 這次問 API 限制,可能在評估升級的可行性」
15# → 回覆會更有針對性,自動帶入 Enterprise 方案的 API 限制資訊
自訂知識庫(Knowledge Sources)
除了 Memory,CrewAI 也支援靜態知識庫(公司文件、產品手冊):
1from crewai import Agent, Crew
2from crewai.knowledge.source.pdf_knowledge_source import PDFKnowledgeSource
3from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
4from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
5
6# 從不同來源建立知識庫
7product_manual = PDFKnowledgeSource(file_paths=["product_manual_v3.pdf"])
8faq_doc = TextFileKnowledgeSource(file_paths=["faq.md", "pricing.md"])
9company_info = StringKnowledgeSource(content="""
10 公司名稱:TechCorp
11 成立年份:2020
12 主要產品:企業知識管理系統
13 聯絡信箱:support@techcorp.com
14 企業方案聯絡:sales@techcorp.com
15""")
16
17# 把知識庫給 Crew
18support_crew = Crew(
19 agents=[support_agent],
20 tasks=[support_task],
21 knowledge_sources=[product_manual, faq_doc, company_info],
22 memory=True,
23)
Part 3:錯誤處理與成本控制
錯誤處理
CrewAI 的工具呼叫或 LLM 呼叫可能失敗。以下是幾個關鍵的防護設計:
1from crewai import Agent, Crew
2from crewai.tools import tool
3import time
4import logging
5
6logger = logging.getLogger(__name__)
7
8# 工具層面的錯誤處理
9@tool("外部 API 查詢")
10def call_external_api(query: str) -> str:
11 """呼叫外部 API 查詢資料,有自動重試機制。"""
12 import requests
13
14 for attempt in range(3):
15 try:
16 response = requests.get(
17 "https://api.example.com/data",
18 params={"q": query},
19 timeout=10,
20 )
21 response.raise_for_status()
22 return response.json()["result"]
23 except requests.Timeout:
24 if attempt < 2:
25 wait = 2 ** attempt # 指數退避:1s, 2s
26 logger.warning(f"API timeout,{wait}s 後重試(第 {attempt+1} 次)")
27 time.sleep(wait)
28 else:
29 return "API 查詢逾時,請稍後再試。"
30 except requests.HTTPError as e:
31 return f"API 錯誤:{e.response.status_code}"
32 except Exception as e:
33 logger.error(f"未預期的錯誤:{e}")
34 return f"查詢失敗:{str(e)}"
35
36
37# Crew 層面的錯誤處理
38def run_crew_safely(crew: Crew, inputs: dict, max_retries: int = 2):
39 """帶重試的安全 Crew 執行器"""
40 for attempt in range(max_retries + 1):
41 try:
42 result = crew.kickoff(inputs=inputs)
43 return result
44 except Exception as e:
45 logger.error(f"Crew 執行失敗(第 {attempt+1} 次):{e}")
46 if attempt < max_retries:
47 logger.info(f"30 秒後重試...")
48 time.sleep(30)
49 else:
50 raise RuntimeError(f"Crew 在 {max_retries+1} 次嘗試後仍失敗:{e}")
成本控制
LLM API 費用可能是生產環境最大的支出之一。以下是幾個控制策略:
1from crewai import Agent, Crew
2import os
3
4# 策略 1:根據任務複雜度選擇模型
5fast_agent = Agent(
6 role="分類員",
7 goal="快速分類工單類型",
8 backstory="...",
9 llm="gpt-4o-mini", # 簡單分類用輕量模型(便宜 10-30 倍)
10 max_iter=5, # 限制最大迭代次數
11)
12
13deep_agent = Agent(
14 role="策略分析師",
15 goal="深度分析市場情報",
16 backstory="...",
17 llm="gpt-4o", # 複雜分析用強模型
18 max_iter=10,
19)
20
21# 策略 2:設定 max_rpm 避免 API 頻率限制和突發費用
22rate_limited_agent = Agent(
23 role="批次處理員",
24 goal="處理大量資料",
25 backstory="...",
26 llm="gpt-4o-mini",
27 max_rpm=10, # 每分鐘最多 10 個請求,避免費用衝高
28)
29
30# 策略 3:啟用工具呼叫快取(相同輸入不重複呼叫)
31cached_crew = Crew(
32 agents=[...],
33 tasks=[...],
34 cache=True, # 對相同的工具輸入快取結果
35)
36
37# 策略 4:監控 token 用量
38result = crew.kickoff(inputs={...})
39usage = result.token_usage
40print(f"本次執行:")
41print(f" 輸入 tokens:{usage.prompt_tokens}")
42print(f" 輸出 tokens:{usage.completion_tokens}")
43print(f" 總計:{usage.total_tokens}")
44# 假設 gpt-4o-mini: $0.15/1M input, $0.60/1M output
45cost_estimate = (usage.prompt_tokens * 0.15 + usage.completion_tokens * 0.60) / 1_000_000
46print(f" 估計費用:${cost_estimate:.4f} USD")
使用 Claude 作為 LLM 後端
CrewAI 支援多種 LLM,包含 Anthropic Claude:
1import os
2from crewai import Agent
3
4os.environ["ANTHROPIC_API_KEY"] = "your-anthropic-key"
5
6# 使用 Claude Sonnet(性價比高)
7analyst = Agent(
8 role="資深分析師",
9 goal="提供深度的市場分析",
10 backstory="有豐富分析經驗的顧問",
11 llm="anthropic/claude-sonnet-4-6", # CrewAI 支援 LiteLLM 格式
12)
13
14# 使用 Claude Haiku(輕量任務)
15classifier = Agent(
16 role="分類員",
17 goal="快速分類任務",
18 backstory="...",
19 llm="anthropic/claude-haiku-4-5-20251001",
20)
Part 4:部署到生產環境
用 FastAPI 包裝 CrewAI
把 Crew 變成一個可以被其他系統呼叫的 REST API:
1# main.py
2from fastapi import FastAPI, HTTPException, BackgroundTasks
3from pydantic import BaseModel
4from typing import Optional
5import asyncio
6import uuid
7from datetime import datetime
8
9# 你的 Crew 定義(從前面章節)
10from crews.content_crew import ContentCrew
11from crews.support_crew import SupportCrew
12
13app = FastAPI(title="CrewAI API", version="1.0.0")
14
15# 儲存任務狀態(生產環境用 Redis 或 DB)
16jobs: dict[str, dict] = {}
17
18
19class ContentRequest(BaseModel):
20 topic: str
21 language: str = "zh-TW"
22 target_length: str = "medium" # short / medium / long
23
24
25class SupportRequest(BaseModel):
26 customer_id: str
27 ticket_content: str
28 priority: Optional[str] = None
29
30
31class JobResponse(BaseModel):
32 job_id: str
33 status: str
34 created_at: str
35 result: Optional[dict] = None
36 error: Optional[str] = None
37
38
39# ---- 非同步任務執行 ----
40
41def run_content_crew(job_id: str, request: ContentRequest):
42 """在背景執行 Content Crew"""
43 try:
44 jobs[job_id]["status"] = "running"
45
46 crew = ContentCrew().crew()
47 result = crew.kickoff(inputs={
48 "topic": request.topic,
49 "language": request.language,
50 "target_length": request.target_length,
51 })
52
53 jobs[job_id]["status"] = "completed"
54 jobs[job_id]["result"] = {"content": result.raw, "tokens": result.token_usage.total_tokens}
55 jobs[job_id]["finished_at"] = datetime.utcnow().isoformat()
56
57 except Exception as e:
58 jobs[job_id]["status"] = "failed"
59 jobs[job_id]["error"] = str(e)
60
61
62def run_support_crew(job_id: str, request: SupportRequest):
63 """在背景執行 Support Crew"""
64 try:
65 jobs[job_id]["status"] = "running"
66
67 crew = SupportCrew().crew()
68 result = crew.kickoff(inputs={
69 "customer_id": request.customer_id,
70 "ticket_content": request.ticket_content,
71 })
72
73 reply = result.pydantic
74 jobs[job_id]["status"] = "completed"
75 jobs[job_id]["result"] = {
76 "classification": reply.classification.model_dump(),
77 "draft_reply": reply.draft_reply,
78 "internal_note": reply.internal_note,
79 "assignee": reply.suggested_assignee,
80 }
81 except Exception as e:
82 jobs[job_id]["status"] = "failed"
83 jobs[job_id]["error"] = str(e)
84
85
86# ---- API Endpoints ----
87
88@app.post("/content/generate", response_model=JobResponse)
89async def generate_content(
90 request: ContentRequest,
91 background_tasks: BackgroundTasks
92):
93 """提交內容生成任務(非同步,立即回傳 job_id)"""
94 job_id = str(uuid.uuid4())
95 jobs[job_id] = {
96 "status": "queued",
97 "created_at": datetime.utcnow().isoformat(),
98 "type": "content",
99 }
100
101 # 在背景執行,不阻塞 API
102 background_tasks.add_task(run_content_crew, job_id, request)
103
104 return JobResponse(
105 job_id=job_id,
106 status="queued",
107 created_at=jobs[job_id]["created_at"],
108 )
109
110
111@app.post("/support/analyze", response_model=JobResponse)
112async def analyze_support_ticket(
113 request: SupportRequest,
114 background_tasks: BackgroundTasks
115):
116 """提交客服工單分析任務"""
117 job_id = str(uuid.uuid4())
118 jobs[job_id] = {
119 "status": "queued",
120 "created_at": datetime.utcnow().isoformat(),
121 "type": "support",
122 }
123 background_tasks.add_task(run_support_crew, job_id, request)
124 return JobResponse(job_id=job_id, status="queued", created_at=jobs[job_id]["created_at"])
125
126
127@app.get("/jobs/{job_id}", response_model=JobResponse)
128async def get_job_status(job_id: str):
129 """查詢任務狀態和結果"""
130 if job_id not in jobs:
131 raise HTTPException(status_code=404, detail="Job not found")
132
133 job = jobs[job_id]
134 return JobResponse(
135 job_id=job_id,
136 status=job["status"],
137 created_at=job["created_at"],
138 result=job.get("result"),
139 error=job.get("error"),
140 )
141
142
143@app.get("/health")
144async def health_check():
145 return {"status": "ok", "jobs_in_memory": len(jobs)}
啟動 API 服務
1# 安裝依賴
2pip install fastapi uvicorn
3
4# 啟動(開發模式)
5uvicorn main:app --reload --port 8000
6
7# 啟動(生產模式,多 worker)
8uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
呼叫 API 的使用範例
1# 提交內容生成任務
2curl -X POST http://localhost:8000/content/generate \
3 -H "Content-Type: application/json" \
4 -d '{"topic": "量子計算的商業應用", "target_length": "medium"}'
5
6# 回應:
7# {"job_id": "abc123", "status": "queued", "created_at": "2026-05-23T10:00:00"}
8
9# 查詢任務進度
10curl http://localhost:8000/jobs/abc123
11
12# 完成後回應:
13# {
14# "job_id": "abc123",
15# "status": "completed",
16# "result": {"content": "...", "tokens": 3542}
17# }
Docker 容器化
1# Dockerfile
2FROM python:3.11-slim
3
4WORKDIR /app
5
6COPY requirements.txt .
7RUN pip install --no-cache-dir -r requirements.txt
8
9COPY . .
10
11# 設定環境變數(實際部署用 Secret Manager)
12ENV OPENAI_API_KEY=""
13ENV SERPER_API_KEY=""
14
15EXPOSE 8000
16
17CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
1# 建立 image
2docker build -t crewai-service .
3
4# 執行(注入 API key)
5docker run -d \
6 -p 8000:8000 \
7 -e OPENAI_API_KEY="your-key" \
8 -e SERPER_API_KEY="your-key" \
9 -v $(pwd)/crew_memory.db:/app/crew_memory.db \ # 持久化記憶體 DB
10 crewai-service
完整的生產架構建議
用戶 / 觸發系統
↓
API Gateway(Nginx / AWS API Gateway)
↓
FastAPI 服務(CrewAI)
├── Redis Queue(任務佇列,防止過載)
├── CrewAI Flows(複雜工作流程)
│ └── Crew 1, Crew 2, ... (各職能團隊)
├── LTM Database(PostgreSQL / SQLite)
└── Vector DB(Chroma / Qdrant,用於 Short-term Memory)
↓
結果回調(Webhook / Slack / Email)
生產環境 Checklist
API 安全
☐ API Key 驗證(Header: X-API-Key)
☐ 請求頻率限制(Rate Limiting)
☐ 輸入驗證(Pydantic 已處理大部分)
可觀測性
☐ 結構化日誌(JSON 格式,送到 CloudWatch / Datadog)
☐ 任務執行時間追蹤
☐ Token 用量監控(設定每日預算告警)
☐ 錯誤率告警
可靠性
☐ 任務佇列(Redis Queue)防止 Crew 並發過高
☐ 失敗重試機制
☐ 超時設定(max_execution_time)
☐ Graceful shutdown
成本
☐ 根據任務類型選擇適合的 LLM
☐ 啟用工具結果快取(cache=True)
☐ 設定每個 Agent 的 max_iter 上限
☐ 月度費用預算設定(OpenAI Dashboard)
系列總結
CrewAI 系列三篇涵蓋了從入門到生產的完整路徑:
| 篇章 | 主題 | 關鍵技術 |
|---|---|---|
| 第一篇 | 入門 | Agent、Task、Crew、Tool 四大元件 |
| 第二篇 | 實戰 | 競情分析、程式碼審查、客服自動化 |
| 第三篇 | 進階 | Flows(@start/@listen/@router)、Memory、FastAPI 部署 |
CrewAI 的核心價值是讓「多角色協作」這個複雜的概念,變得非常直覺。
當你開始設計一個 Crew 時,想的不是「這段程式碼要怎麼寫」,而是「我需要什麼樣的團隊,每個人要做什麼」——這個思考方式本身就是 CrewAI 帶來的最大改變。
推薦的學習路徑:
- 用第一篇的範例建立你的第一個 Crew
- 選一個你目前在手動做的重複性工作,用第二篇的模式設計 Crew
- 當流程需要條件分支,導入 Flows
- 用 FastAPI 包裝,讓團隊的其他人也能使用
系列導覽