CrewAI 完全指南(三):進階技巧——Flows 事件驅動、Memory 記憶體、與生產部署

前言

前兩篇建立了 CrewAI 的基礎和實戰應用。
這篇是進階篇,涵蓋讓 CrewAI 真正走到生產環境的關鍵技術:

  1. Flows:當 Crew 的線性流程不夠用時
  2. Memory 記憶體:讓 Agent 記得過去的對話和經驗
  3. 錯誤處理與成本控制:生產環境的必要設計
  4. 部署:把 CrewAI 包成 API 服務

Part 1:CrewAI Flows——事件驅動的複雜工作流程

Crew 的限制

Process.sequential 是線性的:任務一個接一個執行。但真實世界的工作流程往往需要:

  • 條件分支:根據分析結果走不同的路徑
  • 迴圈:重複執行直到滿足條件
  • 平行執行:多個 Crew 同時跑,最後彙整
  • 狀態管理:跨步驟保存和傳遞複雜的狀態

CrewAI Flows 就是為了處理這些複雜場景設計的。

Flow 的三個核心 Decorator

 1from crewai.flow.flow import Flow, listen, start, router
 2from pydantic import BaseModel
 3
 4class MyFlow(Flow):
 5
 6    @start()
 7    def step_one(self):
 8        """Flow 的入口點,Flow 啟動時執行"""
 9        return "step one result"
10
11    @listen(step_one)
12    def step_two(self, step_one_output):
13        """當 step_one 完成後自動觸發,可以接收上一步的輸出"""
14        return f"processed: {step_one_output}"
15
16    @router(step_two)
17    def decide_next(self, step_two_output):
18        """根據 step_two 的輸出決定下一步走哪條路"""
19        if "error" in step_two_output:
20            return "error_path"
21        return "success_path"
22
23    @listen("success_path")
24    def handle_success(self):
25        return "成功!"
26
27    @listen("error_path")
28    def handle_error(self):
29        return "處理錯誤..."

狀態管理:用 Pydantic 定義 Flow 的全域狀態

  1from crewai.flow.flow import Flow, listen, start, router
  2from pydantic import BaseModel
  3from typing import Optional, List
  4
  5class ContentPipelineState(BaseModel):
  6    """Flow 的全域狀態,所有步驟都可以讀寫"""
  7    topic: str = ""
  8    research_done: bool = False
  9    research_notes: str = ""
 10    draft: str = ""
 11    quality_score: float = 0.0
 12    revision_count: int = 0
 13    final_article: str = ""
 14    publish_ready: bool = False
 15
 16
 17class ContentPipelineFlow(Flow[ContentPipelineState]):
 18    """
 19    完整的內容生產 Flow:研究 → 撰寫 → 品質評估 → 修訂(如需要)→ 發布
 20    展示條件分支和迴圈的使用
 21    """
 22
 23    @start()
 24    def initialize(self):
 25        """初始化,設定主題"""
 26        print(f"🚀 開始內容生產流程,主題:{self.state.topic}")
 27
 28    @listen(initialize)
 29    def run_research_crew(self):
 30        """呼叫研究 Crew"""
 31        from crewai import Agent, Task, Crew, Process
 32        from crewai_tools import SerperDevTool
 33
 34        researcher = Agent(
 35            role="研究員",
 36            goal="深入研究指定主題",
 37            backstory="經驗豐富的內容研究員",
 38            tools=[SerperDevTool()],
 39            llm="gpt-4o-mini",
 40        )
 41
 42        research_task = Task(
 43            description=f"研究 {self.state.topic},產出詳細的研究筆記",
 44            expected_output="詳細的研究筆記,Markdown 格式",
 45            agent=researcher,
 46        )
 47
 48        crew = Crew(agents=[researcher], tasks=[research_task])
 49        result = crew.kickoff()
 50
 51        # 把結果存到 Flow 的 state
 52        self.state.research_notes = result.raw
 53        self.state.research_done = True
 54        print(f"✅ 研究完成,約 {len(result.raw)} 字")
 55
 56    @listen(run_research_crew)
 57    def run_writing_crew(self):
 58        """呼叫撰寫 Crew"""
 59        from crewai import Agent, Task, Crew
 60
 61        writer = Agent(
 62            role="部落客",
 63            goal="撰寫引人入勝的文章",
 64            backstory="擅長技術寫作的部落客",
 65            llm="gpt-4o",
 66        )
 67
 68        writing_task = Task(
 69            description=f"""根據以下研究筆記,撰寫一篇部落格文章:
 70
 71{self.state.research_notes}
 72
 73主題:{self.state.topic}
 74要求:1200-1800 字,Markdown 格式""",
 75            expected_output="完整的部落格文章,Markdown 格式",
 76            agent=writer,
 77        )
 78
 79        crew = Crew(agents=[writer], tasks=[writing_task])
 80        result = crew.kickoff()
 81        self.state.draft = result.raw
 82        print(f"✅ 草稿撰寫完成")
 83
 84    @listen(run_writing_crew)
 85    def evaluate_quality(self):
 86        """用 LLM 評估文章品質,決定是否需要修訂"""
 87        import openai
 88        import json
 89
 90        client = openai.OpenAI()
 91        prompt = f"""請評估以下文章的品質(0-10 分),並提供評分理由。
 92
 93文章:
 94{self.state.draft[:3000]}...
 95
 96以 JSON 格式輸出:{{"score": 7.5, "reasons": ["優點1", "缺點1"], "suggestions": ["改進建議1"]}}"""
 97
 98        response = client.chat.completions.create(
 99            model="gpt-4o-mini",
100            messages=[{"role": "user", "content": prompt}],
101            response_format={"type": "json_object"},
102        )
103
104        evaluation = json.loads(response.choices[0].message.content)
105        self.state.quality_score = evaluation["score"]
106        print(f"📊 品質評分:{self.state.quality_score}/10")
107        return evaluation
108
109    @router(evaluate_quality)
110    def check_quality(self, evaluation):
111        """根據品質分數決定下一步"""
112        if self.state.quality_score >= 8.0:
113            return "publish"
114        elif self.state.revision_count >= 2:
115            # 最多修訂 2 次,防止無限迴圈
116            print("⚠️ 已達最大修訂次數,直接發布")
117            return "publish"
118        else:
119            return "revise"
120
121    @listen("revise")
122    def run_revision(self):
123        """修訂文章(迴圈:可能執行多次)"""
124        self.state.revision_count += 1
125        print(f"🔄 開始第 {self.state.revision_count} 次修訂...")
126
127        import openai
128        client = openai.OpenAI()
129
130        prompt = f"""請根據以下改進建議,修訂這篇文章。
131
132原文(前 2000 字):
133{self.state.draft[:2000]}
134
135改進方向:品質分數 {self.state.quality_score}/10,需要提升至 8 分以上。
136請大幅改善文章結構、流暢度和資訊深度。
137
138輸出修訂後的完整文章:"""
139
140        response = client.chat.completions.create(
141            model="gpt-4o",
142            messages=[{"role": "user", "content": prompt}],
143        )
144        self.state.draft = response.choices[0].message.content
145
146        # 修訂後重新評估(回到 evaluate_quality)
147        self.evaluate_quality()
148
149    @listen("publish")
150    def finalize_and_publish(self):
151        """最終發布步驟"""
152        self.state.final_article = self.state.draft
153        self.state.publish_ready = True
154        print(f"🎉 文章發布就緒!最終品質分數:{self.state.quality_score}/10")
155
156        # 實際上可以呼叫 CMS API 或寫入資料庫
157        with open(f"output_{self.state.topic[:20]}.md", "w", encoding="utf-8") as f:
158            f.write(self.state.final_article)
159
160
161# 執行 Flow
162flow = ContentPipelineFlow()
163flow.state.topic = "量子計算的商業應用"
164flow.kickoff()
165
166print(f"\n總修訂次數:{flow.state.revision_count}")
167print(f"最終品質分數:{flow.state.quality_score}")
168print(f"文章已就緒:{flow.state.publish_ready}")

平行執行多個 Crew

 1class ParallelResearchFlow(Flow):
 2    """同時研究多個子主題,最後彙整"""
 3
 4    @start()
 5    def begin(self):
 6        self.subtopics = ["技術面", "市場面", "法規面"]
 7
 8    @listen(begin)
 9    def research_technical(self):
10        """研究技術面(與其他研究平行執行)"""
11        return self._run_research_crew("技術面")
12
13    @listen(begin)
14    def research_market(self):
15        """與 research_technical 同時執行"""
16        return self._run_research_crew("市場面")
17
18    @listen(begin)
19    def research_regulatory(self):
20        """與前兩個同時執行"""
21        return self._run_research_crew("法規面")
22
23    @listen(research_technical, research_market, research_regulatory)
24    def synthesize(self, tech, market, reg):
25        """等所有研究完成後彙整"""
26        print("所有子研究完成,開始彙整...")
27        # 整合三個研究結果
28
29    def _run_research_crew(self, aspect: str) -> str:
30        # ... 實作 Crew 邏輯
31        return f"{aspect} 的研究結果"

Part 2:Memory 記憶體——讓 Agent 記得歷史

CrewAI 提供三種記憶機制,各有不同的用途:

記憶體類型對比

類型作用範圍持久性適用場景
Short-term Memory單次 Crew 執行內任務間傳遞上下文
Long-term Memory跨多次執行是(存 DB)記住用戶偏好、歷史互動
Entity Memory特定實體的知識記住「關於 X 公司的所有事情」

啟用完整記憶體

 1from crewai import Crew, Process
 2from crewai.memory import LongTermMemory, ShortTermMemory, EntityMemory
 3from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
 4
 5crew = Crew(
 6    agents=[...],
 7    tasks=[...],
 8    process=Process.sequential,
 9
10    # 啟用三種記憶體
11    memory=True,
12
13    # Long-term memory 存到本地 SQLite(也可以換成 PostgreSQL)
14    long_term_memory=LongTermMemory(
15        storage=LTMSQLiteStorage(db_path="./crew_memory.db")
16    ),
17
18    # Short-term memory 用向量 DB 儲存(方便語意搜尋)
19    short_term_memory=ShortTermMemory(),
20
21    # Entity memory 追蹤特定實體
22    entity_memory=EntityMemory(),
23
24    verbose=True,
25)

Long-term Memory 的實際效果

 1# 第一次執行
 2result1 = crew.kickoff(inputs={
 3    "customer_id": "C001",
 4    "question": "我想了解你們的 Enterprise 方案"
 5})
 6# Agent 記錄:C001 對 Enterprise 方案有興趣
 7
 8# 第二次執行(一週後)
 9result2 = crew.kickoff(inputs={
10    "customer_id": "C001",
11    "question": "你們的 API 有沒有限制?"
12})
13# Agent 自動回憶上次互動:「這位客戶上次詢問過 Enterprise 方案,
14#  這次問 API 限制,可能在評估升級的可行性」
15# → 回覆會更有針對性,自動帶入 Enterprise 方案的 API 限制資訊

自訂知識庫(Knowledge Sources)

除了 Memory,CrewAI 也支援靜態知識庫(公司文件、產品手冊):

 1from crewai import Agent, Crew
 2from crewai.knowledge.source.pdf_knowledge_source import PDFKnowledgeSource
 3from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
 4from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
 5
 6# 從不同來源建立知識庫
 7product_manual = PDFKnowledgeSource(file_paths=["product_manual_v3.pdf"])
 8faq_doc = TextFileKnowledgeSource(file_paths=["faq.md", "pricing.md"])
 9company_info = StringKnowledgeSource(content="""
10    公司名稱:TechCorp
11    成立年份:2020
12    主要產品:企業知識管理系統
13    聯絡信箱:support@techcorp.com
14    企業方案聯絡:sales@techcorp.com
15""")
16
17# 把知識庫給 Crew
18support_crew = Crew(
19    agents=[support_agent],
20    tasks=[support_task],
21    knowledge_sources=[product_manual, faq_doc, company_info],
22    memory=True,
23)

Part 3:錯誤處理與成本控制

錯誤處理

CrewAI 的工具呼叫或 LLM 呼叫可能失敗。以下是幾個關鍵的防護設計:

 1from crewai import Agent, Crew
 2from crewai.tools import tool
 3import time
 4import logging
 5
 6logger = logging.getLogger(__name__)
 7
 8# 工具層面的錯誤處理
 9@tool("外部 API 查詢")
10def call_external_api(query: str) -> str:
11    """呼叫外部 API 查詢資料,有自動重試機制。"""
12    import requests
13
14    for attempt in range(3):
15        try:
16            response = requests.get(
17                "https://api.example.com/data",
18                params={"q": query},
19                timeout=10,
20            )
21            response.raise_for_status()
22            return response.json()["result"]
23        except requests.Timeout:
24            if attempt < 2:
25                wait = 2 ** attempt  # 指數退避:1s, 2s
26                logger.warning(f"API timeout,{wait}s 後重試(第 {attempt+1} 次)")
27                time.sleep(wait)
28            else:
29                return "API 查詢逾時,請稍後再試。"
30        except requests.HTTPError as e:
31            return f"API 錯誤:{e.response.status_code}"
32        except Exception as e:
33            logger.error(f"未預期的錯誤:{e}")
34            return f"查詢失敗:{str(e)}"
35
36
37# Crew 層面的錯誤處理
38def run_crew_safely(crew: Crew, inputs: dict, max_retries: int = 2):
39    """帶重試的安全 Crew 執行器"""
40    for attempt in range(max_retries + 1):
41        try:
42            result = crew.kickoff(inputs=inputs)
43            return result
44        except Exception as e:
45            logger.error(f"Crew 執行失敗(第 {attempt+1} 次):{e}")
46            if attempt < max_retries:
47                logger.info(f"30 秒後重試...")
48                time.sleep(30)
49            else:
50                raise RuntimeError(f"Crew 在 {max_retries+1} 次嘗試後仍失敗:{e}")

成本控制

LLM API 費用可能是生產環境最大的支出之一。以下是幾個控制策略:

 1from crewai import Agent, Crew
 2import os
 3
 4# 策略 1:根據任務複雜度選擇模型
 5fast_agent = Agent(
 6    role="分類員",
 7    goal="快速分類工單類型",
 8    backstory="...",
 9    llm="gpt-4o-mini",    # 簡單分類用輕量模型(便宜 10-30 倍)
10    max_iter=5,            # 限制最大迭代次數
11)
12
13deep_agent = Agent(
14    role="策略分析師",
15    goal="深度分析市場情報",
16    backstory="...",
17    llm="gpt-4o",         # 複雜分析用強模型
18    max_iter=10,
19)
20
21# 策略 2:設定 max_rpm 避免 API 頻率限制和突發費用
22rate_limited_agent = Agent(
23    role="批次處理員",
24    goal="處理大量資料",
25    backstory="...",
26    llm="gpt-4o-mini",
27    max_rpm=10,    # 每分鐘最多 10 個請求,避免費用衝高
28)
29
30# 策略 3:啟用工具呼叫快取(相同輸入不重複呼叫)
31cached_crew = Crew(
32    agents=[...],
33    tasks=[...],
34    cache=True,    # 對相同的工具輸入快取結果
35)
36
37# 策略 4:監控 token 用量
38result = crew.kickoff(inputs={...})
39usage = result.token_usage
40print(f"本次執行:")
41print(f"  輸入 tokens:{usage.prompt_tokens}")
42print(f"  輸出 tokens:{usage.completion_tokens}")
43print(f"  總計:{usage.total_tokens}")
44# 假設 gpt-4o-mini: $0.15/1M input, $0.60/1M output
45cost_estimate = (usage.prompt_tokens * 0.15 + usage.completion_tokens * 0.60) / 1_000_000
46print(f"  估計費用:${cost_estimate:.4f} USD")

使用 Claude 作為 LLM 後端

CrewAI 支援多種 LLM,包含 Anthropic Claude:

 1import os
 2from crewai import Agent
 3
 4os.environ["ANTHROPIC_API_KEY"] = "your-anthropic-key"
 5
 6# 使用 Claude Sonnet(性價比高)
 7analyst = Agent(
 8    role="資深分析師",
 9    goal="提供深度的市場分析",
10    backstory="有豐富分析經驗的顧問",
11    llm="anthropic/claude-sonnet-4-6",  # CrewAI 支援 LiteLLM 格式
12)
13
14# 使用 Claude Haiku(輕量任務)
15classifier = Agent(
16    role="分類員",
17    goal="快速分類任務",
18    backstory="...",
19    llm="anthropic/claude-haiku-4-5-20251001",
20)

Part 4:部署到生產環境

用 FastAPI 包裝 CrewAI

把 Crew 變成一個可以被其他系統呼叫的 REST API:

  1# main.py
  2from fastapi import FastAPI, HTTPException, BackgroundTasks
  3from pydantic import BaseModel
  4from typing import Optional
  5import asyncio
  6import uuid
  7from datetime import datetime
  8
  9# 你的 Crew 定義(從前面章節)
 10from crews.content_crew import ContentCrew
 11from crews.support_crew import SupportCrew
 12
 13app = FastAPI(title="CrewAI API", version="1.0.0")
 14
 15# 儲存任務狀態(生產環境用 Redis 或 DB)
 16jobs: dict[str, dict] = {}
 17
 18
 19class ContentRequest(BaseModel):
 20    topic: str
 21    language: str = "zh-TW"
 22    target_length: str = "medium"  # short / medium / long
 23
 24
 25class SupportRequest(BaseModel):
 26    customer_id: str
 27    ticket_content: str
 28    priority: Optional[str] = None
 29
 30
 31class JobResponse(BaseModel):
 32    job_id: str
 33    status: str
 34    created_at: str
 35    result: Optional[dict] = None
 36    error: Optional[str] = None
 37
 38
 39# ---- 非同步任務執行 ----
 40
 41def run_content_crew(job_id: str, request: ContentRequest):
 42    """在背景執行 Content Crew"""
 43    try:
 44        jobs[job_id]["status"] = "running"
 45
 46        crew = ContentCrew().crew()
 47        result = crew.kickoff(inputs={
 48            "topic": request.topic,
 49            "language": request.language,
 50            "target_length": request.target_length,
 51        })
 52
 53        jobs[job_id]["status"]   = "completed"
 54        jobs[job_id]["result"]   = {"content": result.raw, "tokens": result.token_usage.total_tokens}
 55        jobs[job_id]["finished_at"] = datetime.utcnow().isoformat()
 56
 57    except Exception as e:
 58        jobs[job_id]["status"] = "failed"
 59        jobs[job_id]["error"]  = str(e)
 60
 61
 62def run_support_crew(job_id: str, request: SupportRequest):
 63    """在背景執行 Support Crew"""
 64    try:
 65        jobs[job_id]["status"] = "running"
 66
 67        crew = SupportCrew().crew()
 68        result = crew.kickoff(inputs={
 69            "customer_id": request.customer_id,
 70            "ticket_content": request.ticket_content,
 71        })
 72
 73        reply = result.pydantic
 74        jobs[job_id]["status"] = "completed"
 75        jobs[job_id]["result"] = {
 76            "classification": reply.classification.model_dump(),
 77            "draft_reply":    reply.draft_reply,
 78            "internal_note":  reply.internal_note,
 79            "assignee":       reply.suggested_assignee,
 80        }
 81    except Exception as e:
 82        jobs[job_id]["status"] = "failed"
 83        jobs[job_id]["error"]  = str(e)
 84
 85
 86# ---- API Endpoints ----
 87
 88@app.post("/content/generate", response_model=JobResponse)
 89async def generate_content(
 90    request: ContentRequest,
 91    background_tasks: BackgroundTasks
 92):
 93    """提交內容生成任務(非同步,立即回傳 job_id)"""
 94    job_id = str(uuid.uuid4())
 95    jobs[job_id] = {
 96        "status":     "queued",
 97        "created_at": datetime.utcnow().isoformat(),
 98        "type":       "content",
 99    }
100
101    # 在背景執行,不阻塞 API
102    background_tasks.add_task(run_content_crew, job_id, request)
103
104    return JobResponse(
105        job_id=job_id,
106        status="queued",
107        created_at=jobs[job_id]["created_at"],
108    )
109
110
111@app.post("/support/analyze", response_model=JobResponse)
112async def analyze_support_ticket(
113    request: SupportRequest,
114    background_tasks: BackgroundTasks
115):
116    """提交客服工單分析任務"""
117    job_id = str(uuid.uuid4())
118    jobs[job_id] = {
119        "status":     "queued",
120        "created_at": datetime.utcnow().isoformat(),
121        "type":       "support",
122    }
123    background_tasks.add_task(run_support_crew, job_id, request)
124    return JobResponse(job_id=job_id, status="queued", created_at=jobs[job_id]["created_at"])
125
126
127@app.get("/jobs/{job_id}", response_model=JobResponse)
128async def get_job_status(job_id: str):
129    """查詢任務狀態和結果"""
130    if job_id not in jobs:
131        raise HTTPException(status_code=404, detail="Job not found")
132
133    job = jobs[job_id]
134    return JobResponse(
135        job_id=job_id,
136        status=job["status"],
137        created_at=job["created_at"],
138        result=job.get("result"),
139        error=job.get("error"),
140    )
141
142
143@app.get("/health")
144async def health_check():
145    return {"status": "ok", "jobs_in_memory": len(jobs)}

啟動 API 服務

1# 安裝依賴
2pip install fastapi uvicorn
3
4# 啟動(開發模式)
5uvicorn main:app --reload --port 8000
6
7# 啟動(生產模式,多 worker)
8uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

呼叫 API 的使用範例

 1# 提交內容生成任務
 2curl -X POST http://localhost:8000/content/generate \
 3  -H "Content-Type: application/json" \
 4  -d '{"topic": "量子計算的商業應用", "target_length": "medium"}'
 5
 6# 回應:
 7# {"job_id": "abc123", "status": "queued", "created_at": "2026-05-23T10:00:00"}
 8
 9# 查詢任務進度
10curl http://localhost:8000/jobs/abc123
11
12# 完成後回應:
13# {
14#   "job_id": "abc123",
15#   "status": "completed",
16#   "result": {"content": "...", "tokens": 3542}
17# }

Docker 容器化

 1# Dockerfile
 2FROM python:3.11-slim
 3
 4WORKDIR /app
 5
 6COPY requirements.txt .
 7RUN pip install --no-cache-dir -r requirements.txt
 8
 9COPY . .
10
11# 設定環境變數(實際部署用 Secret Manager)
12ENV OPENAI_API_KEY=""
13ENV SERPER_API_KEY=""
14
15EXPOSE 8000
16
17CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
 1# 建立 image
 2docker build -t crewai-service .
 3
 4# 執行(注入 API key)
 5docker run -d \
 6  -p 8000:8000 \
 7  -e OPENAI_API_KEY="your-key" \
 8  -e SERPER_API_KEY="your-key" \
 9  -v $(pwd)/crew_memory.db:/app/crew_memory.db \  # 持久化記憶體 DB
10  crewai-service

完整的生產架構建議

用戶 / 觸發系統
    ↓
API Gateway(Nginx / AWS API Gateway)
    ↓
FastAPI 服務(CrewAI)
    ├── Redis Queue(任務佇列,防止過載)
    ├── CrewAI Flows(複雜工作流程)
    │   └── Crew 1, Crew 2, ... (各職能團隊)
    ├── LTM Database(PostgreSQL / SQLite)
    └── Vector DB(Chroma / Qdrant,用於 Short-term Memory)
    ↓
結果回調(Webhook / Slack / Email)

生產環境 Checklist

API 安全
  ☐ API Key 驗證(Header: X-API-Key)
  ☐ 請求頻率限制(Rate Limiting)
  ☐ 輸入驗證(Pydantic 已處理大部分)

可觀測性
  ☐ 結構化日誌(JSON 格式,送到 CloudWatch / Datadog)
  ☐ 任務執行時間追蹤
  ☐ Token 用量監控(設定每日預算告警)
  ☐ 錯誤率告警

可靠性
  ☐ 任務佇列(Redis Queue)防止 Crew 並發過高
  ☐ 失敗重試機制
  ☐ 超時設定(max_execution_time)
  ☐ Graceful shutdown

成本
  ☐ 根據任務類型選擇適合的 LLM
  ☐ 啟用工具結果快取(cache=True)
  ☐ 設定每個 Agent 的 max_iter 上限
  ☐ 月度費用預算設定(OpenAI Dashboard)

系列總結

CrewAI 系列三篇涵蓋了從入門到生產的完整路徑:

篇章主題關鍵技術
第一篇入門Agent、Task、Crew、Tool 四大元件
第二篇實戰競情分析、程式碼審查、客服自動化
第三篇進階Flows(@start/@listen/@router)、Memory、FastAPI 部署

CrewAI 的核心價值是讓「多角色協作」這個複雜的概念,變得非常直覺。
當你開始設計一個 Crew 時,想的不是「這段程式碼要怎麼寫」,而是「我需要什麼樣的團隊,每個人要做什麼」——這個思考方式本身就是 CrewAI 帶來的最大改變。

推薦的學習路徑:

  1. 用第一篇的範例建立你的第一個 Crew
  2. 選一個你目前在手動做的重複性工作,用第二篇的模式設計 Crew
  3. 當流程需要條件分支,導入 Flows
  4. 用 FastAPI 包裝,讓團隊的其他人也能使用

系列導覽

  • 第一篇:入門與核心概念
  • 第二篇:真實場景實戰——競情分析、程式碼審查、客服自動化
  • 第三篇(本篇):進階技巧——Flows、Memory、結構化輸出與生產部署