在前一篇文章《多 Agent 系統的 Token 用量調優指南》中,我們介紹了 Prompt Caching 作為 Token 優化的首選策略。本文將深入實作層面,探討如何在真實系統中建構完整的快取架構,涵蓋從 Claude API 原生快取到應用層快取、再到 RAG 系統整合的完整解決方案。
快取架構總覽
在生產環境中,一個完整的 AI 應用快取策略通常包含多個層次:
┌─────────────────────────────────────────────────────────────────────┐
│ 多層快取架構 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Layer 1: Claude API Prompt Caching │
│ ├── 快取 System Prompt、工具定義等固定前綴 │
│ ├── 由 Anthropic 伺服器管理 │
│ └── 5 分鐘自動過期 │
│ │
│ Layer 2: 應用層記憶體快取 (In-Memory Cache) │
│ ├── 快取完整 API 回應 │
│ ├── 相同輸入直接返回,完全跳過 API 呼叫 │
│ └── 適用於重複性高的查詢 │
│ │
│ Layer 3: RAG / 向量資料庫快取 │
│ ├── 快取文件 Embeddings │
│ ├── 快取 Context 檢索結果 │
│ └── 減少重複的 Embedding 計算和相似度搜尋 │
│ │
│ Layer 4: 分散式快取 (Redis/Memcached) │
│ ├── 跨實例共享快取 │
│ ├── 適用於微服務架構 │
│ └── 支援快取失效策略 │
│ │
└─────────────────────────────────────────────────────────────────────┘
Layer 1:Claude API 原生 Prompt Caching
基本概念
Claude API 的 Prompt Caching 功能允許你快取訊息前綴,避免每次 API 呼叫都重新處理相同的內容。這對於包含大量固定內容(如 System Prompt、工具定義、背景知識)的應用特別有效。
API 呼叫結構:
┌──────────────────────────────────────────────────────────────┐
│ ┌────────────────────────────────────────┐ │
│ │ 可快取區域(固定前綴) │ ← cache_control │
│ │ • System Prompt │ │
│ │ • 工具定義 │ │
│ │ • 背景知識文件 │ │
│ │ • Few-shot 範例 │ │
│ └────────────────────────────────────────┘ │
│ ┌────────────────────────────────────────┐ │
│ │ 動態區域(每次變化) │ ← 不快取 │
│ │ • 使用者當前輸入 │ │
│ │ • 對話歷史(最近幾輪) │ │
│ └────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────┘
基礎實作
1import anthropic
2from typing import Optional
3
4client = anthropic.Anthropic()
5
6class PromptCacheManager:
7 """Claude API Prompt Caching 管理器"""
8
9 def __init__(self, base_system_prompt: str, tools: Optional[list] = None):
10 """
11 初始化快取管理器
12
13 Args:
14 base_system_prompt: 基礎 System Prompt(將被快取)
15 tools: 工具定義列表(將被快取)
16 """
17 self.base_system_prompt = base_system_prompt
18 self.tools = tools or []
19 self._cache_stats = {
20 "cache_creation_tokens": 0,
21 "cache_read_tokens": 0,
22 "total_calls": 0
23 }
24
25 def _build_cached_system(self) -> list:
26 """構建帶快取控制的 System Content"""
27 return [{
28 "type": "text",
29 "text": self.base_system_prompt,
30 "cache_control": {"type": "ephemeral"}
31 }]
32
33 def _build_cached_tools(self) -> list:
34 """為工具定義添加快取控制"""
35 if not self.tools:
36 return []
37
38 # 在最後一個工具上添加快取控制點
39 cached_tools = self.tools.copy()
40 if cached_tools:
41 cached_tools[-1] = {
42 **cached_tools[-1],
43 "cache_control": {"type": "ephemeral"}
44 }
45 return cached_tools
46
47 def call_with_cache(
48 self,
49 messages: list,
50 model: str = "claude-sonnet-4-6",
51 max_tokens: int = 4096,
52 additional_system: Optional[str] = None
53 ) -> anthropic.types.Message:
54 """
55 使用 Prompt Caching 進行 API 呼叫
56
57 Args:
58 messages: 對話訊息列表
59 model: 模型名稱
60 max_tokens: 最大輸出 tokens
61 additional_system: 額外的動態 System 內容(不快取)
62
63 Returns:
64 API 回應
65 """
66 system_content = self._build_cached_system()
67
68 # 如果有額外的動態內容,追加但不快取
69 if additional_system:
70 system_content.append({
71 "type": "text",
72 "text": additional_system
73 })
74
75 kwargs = {
76 "model": model,
77 "max_tokens": max_tokens,
78 "system": system_content,
79 "messages": messages
80 }
81
82 # 添加快取的工具定義
83 if self.tools:
84 kwargs["tools"] = self._build_cached_tools()
85
86 response = client.messages.create(**kwargs)
87
88 # 記錄快取統計
89 self._update_stats(response.usage)
90 self._cache_stats["total_calls"] += 1
91
92 return response
93
94 def _update_stats(self, usage):
95 """更新快取統計資訊"""
96 self._cache_stats["cache_creation_tokens"] += getattr(
97 usage, "cache_creation_input_tokens", 0
98 )
99 self._cache_stats["cache_read_tokens"] += getattr(
100 usage, "cache_read_input_tokens", 0
101 )
102
103 def get_cache_stats(self) -> dict:
104 """取得快取統計報告"""
105 stats = self._cache_stats.copy()
106
107 if stats["total_calls"] > 1:
108 # 計算快取效益
109 total_cached = stats["cache_creation_tokens"] + stats["cache_read_tokens"]
110 if total_cached > 0:
111 stats["cache_hit_rate"] = (
112 stats["cache_read_tokens"] / total_cached * 100
113 )
114 # 假設快取讀取節省 90% 成本
115 stats["estimated_savings_pct"] = (
116 stats["cache_read_tokens"] * 0.9 / total_cached * 100
117 )
118
119 return stats
120
121
122# 使用範例
123if __name__ == "__main__":
124 # 定義大型 System Prompt(適合快取)
125 SYSTEM_PROMPT = """你是一個專業的程式碼助手。
126
127## 你的能力
128- 程式碼生成:Python、TypeScript、Go、Rust
129- 程式碼審查:安全性、效能、可讀性
130- 架構設計:微服務、事件驅動、CQRS
131
132## 輸出格式
133所有程式碼回應必須包含:
1341. 完整可執行的程式碼
1352. 簡要說明
1363. 使用範例
137
138## 程式碼風格指南
139- 使用有意義的變數名稱
140- 保持函數簡短(< 30 行)
141- 加入必要的錯誤處理
142- 遵循各語言的官方風格指南
143
144[此處可包含更多詳細的背景知識、範例程式碼等...]
145"""
146
147 # 定義工具
148 TOOLS = [
149 {
150 "name": "read_file",
151 "description": "讀取檔案內容",
152 "input_schema": {
153 "type": "object",
154 "properties": {
155 "path": {"type": "string", "description": "檔案路徑"}
156 },
157 "required": ["path"]
158 }
159 },
160 {
161 "name": "write_file",
162 "description": "寫入檔案",
163 "input_schema": {
164 "type": "object",
165 "properties": {
166 "path": {"type": "string"},
167 "content": {"type": "string"}
168 },
169 "required": ["path", "content"]
170 }
171 }
172 ]
173
174 # 建立快取管理器
175 cache_manager = PromptCacheManager(SYSTEM_PROMPT, TOOLS)
176
177 # 模擬多次呼叫
178 queries = [
179 "請用 Python 實作一個 LRU Cache",
180 "請用 TypeScript 實作一個 Event Emitter",
181 "請審查這段程式碼的效能問題"
182 ]
183
184 for query in queries:
185 response = cache_manager.call_with_cache(
186 messages=[{"role": "user", "content": query}]
187 )
188 print(f"\n查詢:{query[:30]}...")
189 print(f"回應長度:{len(response.content[0].text)} 字元")
190
191 # 輸出快取統計
192 stats = cache_manager.get_cache_stats()
193 print("\n" + "="*50)
194 print("快取統計:")
195 print(f" 總呼叫次數:{stats['total_calls']}")
196 print(f" 快取建立 tokens:{stats['cache_creation_tokens']:,}")
197 print(f" 快取讀取 tokens:{stats['cache_read_tokens']:,}")
198 if "cache_hit_rate" in stats:
199 print(f" 快取命中率:{stats['cache_hit_rate']:.1f}%")
200 print(f" 估計節省成本:{stats['estimated_savings_pct']:.1f}%")
進階:多 Breakpoint 快取策略
對於複雜的 prompt 結構,可以設置多個快取斷點,讓不同頻率變化的內容分別快取:
1class MultiBreakpointCacheManager:
2 """
3 多斷點快取管理器
4
5 支援將 System Prompt 分成多個區塊,每個區塊獨立快取。
6 適用於部分內容需要較頻繁更新的場景。
7 """
8
9 def __init__(self):
10 self.static_context = "" # 完全靜態(年為單位更新)
11 self.semi_static_context = "" # 半靜態(天為單位更新)
12 self.session_context = "" # Session 層級(小時為單位更新)
13
14 def set_static_context(self, content: str):
15 """設定完全靜態的背景知識"""
16 self.static_context = content
17
18 def set_semi_static_context(self, content: str):
19 """設定半靜態的內容(如每日更新的資料摘要)"""
20 self.semi_static_context = content
21
22 def set_session_context(self, content: str):
23 """設定 Session 層級的 context"""
24 self.session_context = content
25
26 def build_system_content(self) -> list:
27 """
28 構建多斷點快取的 System Content
29
30 結構:
31 ┌────────────────────────────┐
32 │ 靜態區塊 + cache_control │ ← 長期快取
33 ├────────────────────────────┤
34 │ 半靜態區塊 + cache_control │ ← 中期快取
35 ├────────────────────────────┤
36 │ Session 區塊 │ ← 不快取(每次變化)
37 └────────────────────────────┘
38 """
39 content = []
40
41 # 區塊 1:完全靜態(設置快取斷點)
42 if self.static_context:
43 content.append({
44 "type": "text",
45 "text": f"[背景知識]\n{self.static_context}",
46 "cache_control": {"type": "ephemeral"}
47 })
48
49 # 區塊 2:半靜態(設置第二個快取斷點)
50 if self.semi_static_context:
51 content.append({
52 "type": "text",
53 "text": f"[當前狀態]\n{self.semi_static_context}",
54 "cache_control": {"type": "ephemeral"}
55 })
56
57 # 區塊 3:Session 動態內容(不快取)
58 if self.session_context:
59 content.append({
60 "type": "text",
61 "text": f"[Session 資訊]\n{self.session_context}"
62 })
63
64 return content
65
66 def call(self, messages: list, **kwargs) -> anthropic.types.Message:
67 return client.messages.create(
68 model=kwargs.get("model", "claude-sonnet-4-6"),
69 max_tokens=kwargs.get("max_tokens", 4096),
70 system=self.build_system_content(),
71 messages=messages
72 )
73
74
75# 使用範例
76cache = MultiBreakpointCacheManager()
77
78# 設定長期不變的背景知識(例如公司政策、產品說明)
79cache.set_static_context("""
80公司產品線:
81- ProductA:企業級資料分析平台
82- ProductB:即時監控解決方案
83- ProductC:自動化報告系統
84
85技術棧規範:
86- 後端:Python 3.11+, FastAPI, PostgreSQL
87- 前端:React 18, TypeScript 5
88- 基礎設施:AWS, Kubernetes, Terraform
89""")
90
91# 設定每日更新的內容(例如今日重點、系統狀態)
92cache.set_semi_static_context("""
93今日系統狀態:
94- ProductA:正常運作
95- ProductB:維護中(預計 18:00 恢復)
96- ProductC:正常運作
97
98本週重點:
99- 正在進行 Q2 效能優化專案
100- 禁止部署到生產環境(程式碼凍結期)
101""")
102
103# 設定 Session 動態內容
104cache.set_session_context("""
105當前使用者:工程師 Jerry
106角色:Backend Developer
107目前任務:修復 API-1234 效能問題
108""")
Layer 2:應用層記憶體快取
Claude API 的 Prompt Caching 只能減少重複前綴的處理成本,但如果完全相同的請求重複出現,我們可以在應用層直接快取整個回應,完全跳過 API 呼叫。
基於 LRU Cache 的實作
1import hashlib
2import json
3import time
4from functools import lru_cache
5from dataclasses import dataclass, field
6from typing import Optional, Any
7from collections import OrderedDict
8import threading
9
10@dataclass
11class CacheEntry:
12 """快取條目"""
13 response: Any
14 created_at: float
15 hit_count: int = 0
16 last_accessed: float = field(default_factory=time.time)
17
18class InMemoryResponseCache:
19 """
20 應用層 LRU 回應快取
21
22 特點:
23 - 完全相同的請求直接返回快取結果
24 - 支援 TTL(存活時間)
25 - 支援最大容量限制(LRU 淘汰)
26 - 執行緒安全
27 """
28
29 def __init__(
30 self,
31 max_size: int = 1000,
32 ttl_seconds: int = 3600, # 預設 1 小時
33 enable_stats: bool = True
34 ):
35 self.max_size = max_size
36 self.ttl_seconds = ttl_seconds
37 self.enable_stats = enable_stats
38
39 self._cache: OrderedDict[str, CacheEntry] = OrderedDict()
40 self._lock = threading.RLock()
41
42 self._stats = {
43 "hits": 0,
44 "misses": 0,
45 "evictions": 0,
46 "expirations": 0
47 }
48
49 def _compute_cache_key(
50 self,
51 messages: list,
52 system: Optional[str] = None,
53 model: str = "claude-sonnet-4-6",
54 **kwargs
55 ) -> str:
56 """
57 計算快取鍵值
58
59 將所有影響回應的參數序列化後計算 hash
60 """
61 key_data = {
62 "messages": messages,
63 "system": system,
64 "model": model,
65 "max_tokens": kwargs.get("max_tokens"),
66 "temperature": kwargs.get("temperature", 1.0)
67 }
68 key_string = json.dumps(key_data, sort_keys=True, ensure_ascii=False)
69 return hashlib.sha256(key_string.encode()).hexdigest()
70
71 def _is_expired(self, entry: CacheEntry) -> bool:
72 """檢查條目是否過期"""
73 return time.time() - entry.created_at > self.ttl_seconds
74
75 def _evict_if_needed(self):
76 """如果超過容量限制,淘汰最舊的條目"""
77 while len(self._cache) >= self.max_size:
78 oldest_key = next(iter(self._cache))
79 del self._cache[oldest_key]
80 self._stats["evictions"] += 1
81
82 def get(self, cache_key: str) -> Optional[Any]:
83 """取得快取的回應"""
84 with self._lock:
85 if cache_key not in self._cache:
86 self._stats["misses"] += 1
87 return None
88
89 entry = self._cache[cache_key]
90
91 # 檢查是否過期
92 if self._is_expired(entry):
93 del self._cache[cache_key]
94 self._stats["expirations"] += 1
95 self._stats["misses"] += 1
96 return None
97
98 # 更新 LRU 順序(移到最後)
99 self._cache.move_to_end(cache_key)
100 entry.hit_count += 1
101 entry.last_accessed = time.time()
102
103 self._stats["hits"] += 1
104 return entry.response
105
106 def set(self, cache_key: str, response: Any):
107 """設定快取"""
108 with self._lock:
109 self._evict_if_needed()
110 self._cache[cache_key] = CacheEntry(
111 response=response,
112 created_at=time.time()
113 )
114
115 def get_stats(self) -> dict:
116 """取得快取統計"""
117 with self._lock:
118 total = self._stats["hits"] + self._stats["misses"]
119 hit_rate = self._stats["hits"] / total * 100 if total > 0 else 0
120
121 return {
122 **self._stats,
123 "size": len(self._cache),
124 "max_size": self.max_size,
125 "hit_rate": f"{hit_rate:.1f}%"
126 }
127
128 def clear(self):
129 """清空快取"""
130 with self._lock:
131 self._cache.clear()
132
133
134class CachedClaudeClient:
135 """
136 帶應用層快取的 Claude Client
137
138 結合 Claude API 的 Prompt Caching 和應用層回應快取
139 """
140
141 def __init__(
142 self,
143 response_cache: Optional[InMemoryResponseCache] = None,
144 enable_api_cache: bool = True
145 ):
146 self.client = anthropic.Anthropic()
147 self.response_cache = response_cache or InMemoryResponseCache()
148 self.enable_api_cache = enable_api_cache
149
150 def _build_system_with_cache(self, system: str) -> list:
151 """構建帶 API 快取的 system content"""
152 if self.enable_api_cache and len(system) > 1024: # 快取需要 > 1024 tokens
153 return [{
154 "type": "text",
155 "text": system,
156 "cache_control": {"type": "ephemeral"}
157 }]
158 return system
159
160 def create_message(
161 self,
162 messages: list,
163 system: Optional[str] = None,
164 model: str = "claude-sonnet-4-6",
165 use_response_cache: bool = True,
166 **kwargs
167 ) -> anthropic.types.Message:
168 """
169 建立訊息(帶雙層快取)
170
171 Args:
172 messages: 對話訊息
173 system: System prompt
174 model: 模型名稱
175 use_response_cache: 是否使用應用層回應快取
176 **kwargs: 其他 API 參數
177
178 Returns:
179 API 回應
180 """
181 # Layer 2:檢查應用層快取
182 if use_response_cache:
183 cache_key = self.response_cache._compute_cache_key(
184 messages=messages,
185 system=system,
186 model=model,
187 **kwargs
188 )
189
190 cached_response = self.response_cache.get(cache_key)
191 if cached_response is not None:
192 print("[Cache HIT] 返回快取的回應")
193 return cached_response
194
195 # Layer 1:使用 API 快取發送請求
196 api_kwargs = {
197 "model": model,
198 "max_tokens": kwargs.get("max_tokens", 4096),
199 "messages": messages
200 }
201
202 if system:
203 api_kwargs["system"] = self._build_system_with_cache(system)
204
205 if "temperature" in kwargs:
206 api_kwargs["temperature"] = kwargs["temperature"]
207
208 response = self.client.messages.create(**api_kwargs)
209
210 # 儲存到應用層快取
211 if use_response_cache:
212 self.response_cache.set(cache_key, response)
213 print("[Cache MISS] 已儲存回應到快取")
214
215 return response
216
217
218# 使用範例
219if __name__ == "__main__":
220 # 建立帶快取的 client
221 cached_client = CachedClaudeClient(
222 response_cache=InMemoryResponseCache(
223 max_size=500,
224 ttl_seconds=1800 # 30 分鐘
225 )
226 )
227
228 SYSTEM = "你是一個專業的程式助手,專長是 Python 開發。"
229
230 # 第一次呼叫(快取未命中)
231 response1 = cached_client.create_message(
232 messages=[{"role": "user", "content": "什麼是 Python 的 GIL?"}],
233 system=SYSTEM
234 )
235 print(f"回應 1:{response1.content[0].text[:100]}...")
236
237 # 第二次相同呼叫(快取命中)
238 response2 = cached_client.create_message(
239 messages=[{"role": "user", "content": "什麼是 Python 的 GIL?"}],
240 system=SYSTEM
241 )
242 print(f"回應 2:{response2.content[0].text[:100]}...")
243
244 # 輸出統計
245 print("\n快取統計:")
246 print(cached_client.response_cache.get_stats())
快取策略考量
何時使用應用層快取:
✅ 適合快取的情況:
├── 相同問題的重複查詢(FAQ、常見問題)
├── 確定性輸出(temperature=0)
├── 資料查詢型任務(不需要創意性回應)
└── 高頻重複請求(例如 API 閘道)
❌ 不適合快取的情況:
├── 需要隨機性的創意任務(temperature > 0)
├── 時效性敏感的資訊(即時數據)
├── 個人化回應(每個使用者不同)
└── 需要最新知識的查詢
Layer 3:RAG 系統快取整合
在 Retrieval-Augmented Generation (RAG) 系統中,快取策略可以應用在多個層面,顯著提升效能和降低成本。
RAG 系統架構與快取點
┌─────────────────────────────────────────────────────────────────────┐
│ RAG 系統快取架構 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 使用者查詢 │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Query Embedding Cache │ │
│ │ ├── 快取 Query → Embedding 的映射 │ │
│ │ └── 避免重複的 Embedding API 呼叫 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Retrieval Result Cache │ │
│ │ ├── 快取 Query → 檢索結果的映射 │ │
│ │ └── 避免重複的向量搜尋 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Document Embedding Cache (持久化) │ │
│ │ ├── 文件 → Embedding 存入向量資料庫 │ │
│ │ └── 只在文件變更時重新計算 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ LLM Response Cache │ │
│ │ ├── 快取 (Query + Context) → Response │ │
│ │ └── 相同輸入直接返回 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
完整 RAG 快取系統實作
1import hashlib
2import json
3import numpy as np
4from dataclasses import dataclass
5from typing import Optional, Any
6from abc import ABC, abstractmethod
7
8# 向量資料庫抽象(可替換為 Pinecone、Weaviate、Qdrant 等)
9class VectorStore(ABC):
10 @abstractmethod
11 def upsert(self, id: str, embedding: list[float], metadata: dict): pass
12
13 @abstractmethod
14 def query(self, embedding: list[float], top_k: int) -> list[dict]: pass
15
16
17# 簡易記憶體向量資料庫(示範用)
18class InMemoryVectorStore(VectorStore):
19 def __init__(self):
20 self.vectors: dict[str, dict] = {}
21
22 def upsert(self, id: str, embedding: list[float], metadata: dict):
23 self.vectors[id] = {
24 "embedding": np.array(embedding),
25 "metadata": metadata
26 }
27
28 def query(self, embedding: list[float], top_k: int = 5) -> list[dict]:
29 query_vec = np.array(embedding)
30 scores = []
31
32 for id, data in self.vectors.items():
33 # 餘弦相似度
34 similarity = np.dot(query_vec, data["embedding"]) / (
35 np.linalg.norm(query_vec) * np.linalg.norm(data["embedding"])
36 )
37 scores.append({
38 "id": id,
39 "score": float(similarity),
40 "metadata": data["metadata"]
41 })
42
43 # 排序並返回 top_k
44 scores.sort(key=lambda x: x["score"], reverse=True)
45 return scores[:top_k]
46
47
48@dataclass
49class RAGConfig:
50 """RAG 系統配置"""
51 embedding_model: str = "text-embedding-3-small" # OpenAI embedding
52 llm_model: str = "claude-sonnet-4-6"
53 chunk_size: int = 500
54 chunk_overlap: int = 50
55 top_k: int = 5
56 cache_ttl: int = 3600
57
58
59class CachedRAGSystem:
60 """
61 帶多層快取的 RAG 系統
62
63 快取層:
64 1. Query Embedding Cache:避免重複計算 query embedding
65 2. Retrieval Cache:避免重複的向量搜尋
66 3. Response Cache:避免重複的 LLM 呼叫
67 4. Document Embedding:持久化在向量資料庫
68 """
69
70 def __init__(
71 self,
72 vector_store: VectorStore,
73 config: Optional[RAGConfig] = None
74 ):
75 self.vector_store = vector_store
76 self.config = config or RAGConfig()
77 self.claude_client = anthropic.Anthropic()
78
79 # 快取層
80 self._embedding_cache: dict[str, list[float]] = {} # query -> embedding
81 self._retrieval_cache: dict[str, list[dict]] = {} # query -> results
82 self._response_cache: dict[str, str] = {} # (query+context) -> response
83
84 # 統計
85 self._stats = {
86 "embedding_cache_hits": 0,
87 "embedding_cache_misses": 0,
88 "retrieval_cache_hits": 0,
89 "retrieval_cache_misses": 0,
90 "response_cache_hits": 0,
91 "response_cache_misses": 0
92 }
93
94 def _compute_embedding(self, text: str) -> list[float]:
95 """
96 計算文字的 embedding(帶快取)
97
98 注意:這裡使用 OpenAI 的 embedding API
99 實際使用時可替換為其他 embedding 服務
100 """
101 cache_key = hashlib.md5(text.encode()).hexdigest()
102
103 if cache_key in self._embedding_cache:
104 self._stats["embedding_cache_hits"] += 1
105 return self._embedding_cache[cache_key]
106
107 self._stats["embedding_cache_misses"] += 1
108
109 # 呼叫 embedding API(這裡使用假資料示範)
110 # 實際使用:response = openai.embeddings.create(input=text, model=self.config.embedding_model)
111 # embedding = response.data[0].embedding
112
113 # 示範用:生成隨機 embedding
114 np.random.seed(hash(text) % (2**32))
115 embedding = np.random.randn(1536).tolist()
116
117 self._embedding_cache[cache_key] = embedding
118 return embedding
119
120 def _retrieve_context(self, query: str, top_k: Optional[int] = None) -> list[dict]:
121 """
122 檢索相關文件(帶快取)
123 """
124 cache_key = hashlib.md5(query.encode()).hexdigest()
125 top_k = top_k or self.config.top_k
126
127 if cache_key in self._retrieval_cache:
128 self._stats["retrieval_cache_hits"] += 1
129 return self._retrieval_cache[cache_key][:top_k]
130
131 self._stats["retrieval_cache_misses"] += 1
132
133 # 計算 query embedding
134 query_embedding = self._compute_embedding(query)
135
136 # 向量搜尋
137 results = self.vector_store.query(query_embedding, top_k=top_k)
138
139 self._retrieval_cache[cache_key] = results
140 return results
141
142 def _build_context_string(self, retrieved_docs: list[dict]) -> str:
143 """將檢索結果格式化為 context 字串"""
144 context_parts = []
145 for i, doc in enumerate(retrieved_docs, 1):
146 content = doc["metadata"].get("content", "")
147 source = doc["metadata"].get("source", "unknown")
148 context_parts.append(f"[文件 {i}] (來源: {source})\n{content}")
149 return "\n\n---\n\n".join(context_parts)
150
151 def _compute_response_cache_key(self, query: str, context: str) -> str:
152 """計算回應快取鍵值"""
153 combined = f"{query}|||{context}"
154 return hashlib.sha256(combined.encode()).hexdigest()
155
156 def query(
157 self,
158 question: str,
159 use_cache: bool = True,
160 additional_context: Optional[str] = None
161 ) -> dict:
162 """
163 執行 RAG 查詢
164
165 Args:
166 question: 使用者問題
167 use_cache: 是否使用快取
168 additional_context: 額外的 context(不經過檢索)
169
170 Returns:
171 包含回應和元資料的字典
172 """
173 # Step 1: 檢索相關文件
174 retrieved_docs = self._retrieve_context(question)
175 context = self._build_context_string(retrieved_docs)
176
177 if additional_context:
178 context = f"{additional_context}\n\n{context}"
179
180 # Step 2: 檢查回應快取
181 if use_cache:
182 cache_key = self._compute_response_cache_key(question, context)
183 if cache_key in self._response_cache:
184 self._stats["response_cache_hits"] += 1
185 return {
186 "answer": self._response_cache[cache_key],
187 "sources": retrieved_docs,
188 "cached": True
189 }
190 self._stats["response_cache_misses"] += 1
191
192 # Step 3: 呼叫 LLM(使用 Prompt Caching)
193 system_prompt = """你是一個專業的知識助手。根據提供的參考文件回答問題。
194
195規則:
1961. 只根據提供的文件內容回答
1972. 如果文件中沒有相關資訊,明確說明
1983. 引用資訊時標註來源文件編號
1994. 保持回答簡潔準確"""
200
201 user_message = f"""參考文件:
202{context}
203
204問題:{question}
205
206請根據上述文件回答問題。"""
207
208 response = self.claude_client.messages.create(
209 model=self.config.llm_model,
210 max_tokens=2048,
211 system=[{
212 "type": "text",
213 "text": system_prompt,
214 "cache_control": {"type": "ephemeral"} # API Prompt Caching
215 }],
216 messages=[{"role": "user", "content": user_message}]
217 )
218
219 answer = response.content[0].text
220
221 # 儲存到回應快取
222 if use_cache:
223 self._response_cache[cache_key] = answer
224
225 return {
226 "answer": answer,
227 "sources": retrieved_docs,
228 "cached": False,
229 "usage": {
230 "input_tokens": response.usage.input_tokens,
231 "output_tokens": response.usage.output_tokens,
232 "cache_creation_tokens": getattr(response.usage, "cache_creation_input_tokens", 0),
233 "cache_read_tokens": getattr(response.usage, "cache_read_input_tokens", 0)
234 }
235 }
236
237 def add_documents(self, documents: list[dict]):
238 """
239 添加文件到向量資料庫
240
241 Args:
242 documents: 文件列表,每個文件包含 id, content, metadata
243 """
244 for doc in documents:
245 embedding = self._compute_embedding(doc["content"])
246 self.vector_store.upsert(
247 id=doc["id"],
248 embedding=embedding,
249 metadata={
250 "content": doc["content"],
251 **doc.get("metadata", {})
252 }
253 )
254 print(f"已添加 {len(documents)} 份文件")
255
256 def get_stats(self) -> dict:
257 """取得快取統計"""
258 return {
259 **self._stats,
260 "embedding_cache_size": len(self._embedding_cache),
261 "retrieval_cache_size": len(self._retrieval_cache),
262 "response_cache_size": len(self._response_cache)
263 }
264
265
266# 使用範例
267if __name__ == "__main__":
268 # 建立向量資料庫和 RAG 系統
269 vector_store = InMemoryVectorStore()
270 rag = CachedRAGSystem(vector_store)
271
272 # 添加示範文件
273 documents = [
274 {
275 "id": "doc1",
276 "content": "Python 的 GIL(Global Interpreter Lock)是一個互斥鎖,確保同一時間只有一個執行緒執行 Python bytecode。這是 CPython 實作的特性,用於簡化記憶體管理。",
277 "metadata": {"source": "python-docs.md"}
278 },
279 {
280 "id": "doc2",
281 "content": "要繞過 GIL 的限制,可以使用 multiprocessing 模組進行多進程處理,或使用 C 擴展釋放 GIL。對於 I/O 密集型任務,asyncio 是更好的選擇。",
282 "metadata": {"source": "python-best-practices.md"}
283 },
284 {
285 "id": "doc3",
286 "content": "FastAPI 是一個現代的 Python Web 框架,基於 Starlette 和 Pydantic 構建。它支援異步處理,效能接近 NodeJS 和 Go。",
287 "metadata": {"source": "fastapi-intro.md"}
288 }
289 ]
290
291 rag.add_documents(documents)
292
293 # 查詢測試
294 print("\n" + "="*50)
295 print("第一次查詢(快取未命中)")
296 print("="*50)
297 result1 = rag.query("什麼是 Python 的 GIL?")
298 print(f"回答:{result1['answer'][:200]}...")
299 print(f"快取:{result1['cached']}")
300
301 print("\n" + "="*50)
302 print("第二次相同查詢(快取命中)")
303 print("="*50)
304 result2 = rag.query("什麼是 Python 的 GIL?")
305 print(f"回答:{result2['answer'][:200]}...")
306 print(f"快取:{result2['cached']}")
307
308 print("\n" + "="*50)
309 print("快取統計")
310 print("="*50)
311 stats = rag.get_stats()
312 for key, value in stats.items():
313 print(f" {key}: {value}")
Layer 4:分散式快取整合
對於生產環境的多實例部署,需要使用分散式快取(如 Redis)來共享快取資料。
Redis 快取整合
1import redis
2import json
3import hashlib
4from typing import Optional, Any
5from dataclasses import dataclass
6import pickle
7
8@dataclass
9class RedisConfig:
10 host: str = "localhost"
11 port: int = 6379
12 db: int = 0
13 password: Optional[str] = None
14 default_ttl: int = 3600 # 1 小時
15
16
17class DistributedResponseCache:
18 """
19 基於 Redis 的分散式回應快取
20
21 特點:
22 - 跨實例共享快取
23 - 支援 TTL 自動過期
24 - 支援快取標籤(用於批次失效)
25 """
26
27 def __init__(self, config: Optional[RedisConfig] = None):
28 self.config = config or RedisConfig()
29 self.redis = redis.Redis(
30 host=self.config.host,
31 port=self.config.port,
32 db=self.config.db,
33 password=self.config.password,
34 decode_responses=False # 支援 binary 資料
35 )
36 self._prefix = "llm_cache:"
37
38 def _make_key(self, cache_key: str) -> str:
39 return f"{self._prefix}{cache_key}"
40
41 def _compute_cache_key(
42 self,
43 messages: list,
44 system: Optional[str] = None,
45 model: str = "claude-sonnet-4-6",
46 **kwargs
47 ) -> str:
48 key_data = {
49 "messages": messages,
50 "system": system,
51 "model": model,
52 "temperature": kwargs.get("temperature", 1.0)
53 }
54 key_string = json.dumps(key_data, sort_keys=True, ensure_ascii=False)
55 return hashlib.sha256(key_string.encode()).hexdigest()
56
57 def get(self, cache_key: str) -> Optional[Any]:
58 """取得快取"""
59 try:
60 data = self.redis.get(self._make_key(cache_key))
61 if data:
62 return pickle.loads(data)
63 return None
64 except Exception as e:
65 print(f"Redis get error: {e}")
66 return None
67
68 def set(
69 self,
70 cache_key: str,
71 value: Any,
72 ttl: Optional[int] = None,
73 tags: Optional[list[str]] = None
74 ):
75 """
76 設定快取
77
78 Args:
79 cache_key: 快取鍵
80 value: 快取值
81 ttl: 存活時間(秒)
82 tags: 快取標籤(用於批次失效)
83 """
84 try:
85 key = self._make_key(cache_key)
86 ttl = ttl or self.config.default_ttl
87
88 # 存入資料
89 self.redis.setex(key, ttl, pickle.dumps(value))
90
91 # 如果有標籤,添加到標籤集合
92 if tags:
93 for tag in tags:
94 self.redis.sadd(f"{self._prefix}tag:{tag}", cache_key)
95 self.redis.expire(f"{self._prefix}tag:{tag}", ttl)
96
97 except Exception as e:
98 print(f"Redis set error: {e}")
99
100 def invalidate_by_tag(self, tag: str):
101 """根據標籤批次失效快取"""
102 try:
103 tag_key = f"{self._prefix}tag:{tag}"
104 cache_keys = self.redis.smembers(tag_key)
105
106 if cache_keys:
107 # 刪除所有相關快取
108 keys_to_delete = [self._make_key(k.decode()) for k in cache_keys]
109 self.redis.delete(*keys_to_delete)
110 self.redis.delete(tag_key)
111
112 print(f"已失效 {len(cache_keys)} 個快取(標籤:{tag})")
113 except Exception as e:
114 print(f"Redis invalidate error: {e}")
115
116 def get_stats(self) -> dict:
117 """取得 Redis 快取統計"""
118 try:
119 info = self.redis.info("stats")
120 keys_count = self.redis.dbsize()
121 return {
122 "total_keys": keys_count,
123 "keyspace_hits": info.get("keyspace_hits", 0),
124 "keyspace_misses": info.get("keyspace_misses", 0),
125 "hit_rate": f"{info.get('keyspace_hits', 0) / max(info.get('keyspace_hits', 0) + info.get('keyspace_misses', 0), 1) * 100:.1f}%"
126 }
127 except Exception as e:
128 return {"error": str(e)}
129
130
131class ProductionCachedClient:
132 """
133 生產級帶快取的 Claude Client
134
135 整合:
136 - Claude API Prompt Caching
137 - 本地 LRU 快取(L1)
138 - Redis 分散式快取(L2)
139 """
140
141 def __init__(
142 self,
143 redis_config: Optional[RedisConfig] = None,
144 local_cache_size: int = 100
145 ):
146 self.client = anthropic.Anthropic()
147 self.local_cache = InMemoryResponseCache(max_size=local_cache_size)
148 self.redis_cache = DistributedResponseCache(redis_config)
149
150 def create_message(
151 self,
152 messages: list,
153 system: Optional[str] = None,
154 model: str = "claude-sonnet-4-6",
155 cache_tags: Optional[list[str]] = None,
156 **kwargs
157 ):
158 """
159 建立訊息(三層快取)
160
161 快取檢查順序:
162 1. 本地 LRU 快取(最快)
163 2. Redis 分散式快取
164 3. Claude API(帶 Prompt Caching)
165 """
166 cache_key = self.redis_cache._compute_cache_key(
167 messages=messages, system=system, model=model, **kwargs
168 )
169
170 # L1: 本地快取
171 local_result = self.local_cache.get(cache_key)
172 if local_result:
173 print("[L1 HIT] 本地快取命中")
174 return local_result
175
176 # L2: Redis 快取
177 redis_result = self.redis_cache.get(cache_key)
178 if redis_result:
179 print("[L2 HIT] Redis 快取命中")
180 # 回填本地快取
181 self.local_cache.set(cache_key, redis_result)
182 return redis_result
183
184 # L3: API 呼叫(帶 Prompt Caching)
185 print("[MISS] 呼叫 API")
186
187 api_kwargs = {
188 "model": model,
189 "max_tokens": kwargs.get("max_tokens", 4096),
190 "messages": messages
191 }
192
193 if system:
194 # 使用 Prompt Caching
195 if len(system) > 500:
196 api_kwargs["system"] = [{
197 "type": "text",
198 "text": system,
199 "cache_control": {"type": "ephemeral"}
200 }]
201 else:
202 api_kwargs["system"] = system
203
204 response = self.client.messages.create(**api_kwargs)
205
206 # 回填快取
207 self.local_cache.set(cache_key, response)
208 self.redis_cache.set(cache_key, response, tags=cache_tags)
209
210 return response
實戰案例:智能客服系統
整合所有快取策略的完整智能客服系統實作:
1"""
2智能客服系統 - 整合多層快取的完整實作
3
4架構:
5- FAQ 快取:常見問題直接返回預設答案
6- RAG 快取:知識庫檢索結果快取
7- 回應快取:LLM 回應快取
8- Prompt Caching:System Prompt 快取
9"""
10
11import anthropic
12from dataclasses import dataclass, field
13from typing import Optional
14from enum import Enum
15import hashlib
16import json
17
18client = anthropic.Anthropic()
19
20
21class QueryType(Enum):
22 FAQ = "faq" # 常見問題
23 KNOWLEDGE = "knowledge" # 知識庫查詢
24 GENERAL = "general" # 一般對話
25
26
27@dataclass
28class CustomerServiceConfig:
29 company_name: str = "TechCorp"
30 support_email: str = "support@techcorp.com"
31 business_hours: str = "週一至週五 09:00-18:00"
32
33
34class SmartCustomerService:
35 """
36 智能客服系統
37
38 快取策略:
39 1. FAQ 完全匹配:直接返回預設答案(零 API 呼叫)
40 2. FAQ 模糊匹配:使用快取的分類結果
41 3. 知識庫查詢:RAG + 回應快取
42 4. 一般對話:Prompt Caching
43 """
44
45 def __init__(self, config: Optional[CustomerServiceConfig] = None):
46 self.config = config or CustomerServiceConfig()
47
48 # FAQ 資料庫(完全匹配,零 API 成本)
49 self.faq_exact_match: dict[str, str] = {
50 "營業時間": f"我們的營業時間是{self.config.business_hours}。",
51 "客服電話": f"請聯繫 {self.config.support_email} 或致電客服專線。",
52 "退貨政策": "商品可在購買後 7 天內申請退貨,請保持商品完整。",
53 }
54
55 # FAQ 模糊匹配關鍵字
56 self.faq_keywords: dict[str, str] = {
57 "退貨|退款|換貨": "退貨政策",
58 "營業|開門|上班": "營業時間",
59 "電話|聯繫|客服": "客服電話",
60 }
61
62 # 知識庫(實際使用時接入向量資料庫)
63 self.knowledge_base: list[dict] = [
64 {
65 "id": "kb1",
66 "content": "我們的旗艦產品 ProductX 支援 iOS 和 Android 平台...",
67 "category": "product"
68 },
69 # ... 更多知識條目
70 ]
71
72 # 回應快取
73 self._response_cache: dict[str, str] = {}
74
75 # System Prompt(將被快取)
76 self._system_prompt = f"""你是 {self.config.company_name} 的智能客服助手。
77
78## 你的職責
79- 專業且友善地回答客戶問題
80- 提供準確的產品和服務資訊
81- 無法回答時,引導客戶聯繫人工客服
82
83## 公司資訊
84- 公司名稱:{self.config.company_name}
85- 客服郵箱:{self.config.support_email}
86- 營業時間:{self.config.business_hours}
87
88## 回應原則
891. 保持簡潔,直接回答問題
902. 使用繁體中文
913. 語氣專業但親切
924. 不確定的資訊要明確說明"""
93
94 # 統計
95 self._stats = {
96 "faq_exact_hits": 0,
97 "faq_keyword_hits": 0,
98 "cache_hits": 0,
99 "api_calls": 0
100 }
101
102 def _classify_query(self, query: str) -> QueryType:
103 """分類查詢類型"""
104 import re
105
106 # 檢查 FAQ 完全匹配
107 if query in self.faq_exact_match:
108 return QueryType.FAQ
109
110 # 檢查 FAQ 關鍵字匹配
111 for pattern in self.faq_keywords:
112 if re.search(pattern, query):
113 return QueryType.FAQ
114
115 # 檢查是否需要知識庫
116 knowledge_keywords = ["產品", "功能", "如何使用", "規格", "價格"]
117 if any(kw in query for kw in knowledge_keywords):
118 return QueryType.KNOWLEDGE
119
120 return QueryType.GENERAL
121
122 def _get_faq_answer(self, query: str) -> Optional[str]:
123 """取得 FAQ 答案"""
124 import re
125
126 # 完全匹配
127 if query in self.faq_exact_match:
128 self._stats["faq_exact_hits"] += 1
129 return self.faq_exact_match[query]
130
131 # 關鍵字匹配
132 for pattern, faq_key in self.faq_keywords.items():
133 if re.search(pattern, query):
134 self._stats["faq_keyword_hits"] += 1
135 return self.faq_exact_match.get(faq_key)
136
137 return None
138
139 def _compute_cache_key(self, query: str, context: str = "") -> str:
140 combined = f"{query}|{context}"
141 return hashlib.md5(combined.encode()).hexdigest()
142
143 def _call_llm(self, query: str, context: str = "") -> str:
144 """呼叫 LLM(帶 Prompt Caching)"""
145 user_content = query
146 if context:
147 user_content = f"參考資訊:\n{context}\n\n客戶問題:{query}"
148
149 response = client.messages.create(
150 model="claude-sonnet-4-6",
151 max_tokens=1024,
152 system=[{
153 "type": "text",
154 "text": self._system_prompt,
155 "cache_control": {"type": "ephemeral"} # Prompt Caching
156 }],
157 messages=[{"role": "user", "content": user_content}]
158 )
159
160 self._stats["api_calls"] += 1
161 return response.content[0].text
162
163 def respond(self, query: str) -> dict:
164 """
165 處理客戶查詢
166
167 Returns:
168 包含回應和元資料的字典
169 """
170 query_type = self._classify_query(query)
171
172 # 路徑 1:FAQ 直接返回
173 if query_type == QueryType.FAQ:
174 answer = self._get_faq_answer(query)
175 if answer:
176 return {
177 "answer": answer,
178 "source": "faq",
179 "cached": True,
180 "api_called": False
181 }
182
183 # 路徑 2:檢查回應快取
184 cache_key = self._compute_cache_key(query)
185 if cache_key in self._response_cache:
186 self._stats["cache_hits"] += 1
187 return {
188 "answer": self._response_cache[cache_key],
189 "source": "cache",
190 "cached": True,
191 "api_called": False
192 }
193
194 # 路徑 3:知識庫查詢 + LLM
195 context = ""
196 if query_type == QueryType.KNOWLEDGE:
197 # 這裡簡化處理,實際應使用向量搜尋
198 context = self.knowledge_base[0]["content"] if self.knowledge_base else ""
199
200 # 路徑 4:呼叫 LLM
201 answer = self._call_llm(query, context)
202
203 # 儲存到快取
204 self._response_cache[cache_key] = answer
205
206 return {
207 "answer": answer,
208 "source": "llm" if query_type == QueryType.GENERAL else "rag",
209 "cached": False,
210 "api_called": True
211 }
212
213 def get_stats(self) -> dict:
214 """取得系統統計"""
215 total_requests = (
216 self._stats["faq_exact_hits"] +
217 self._stats["faq_keyword_hits"] +
218 self._stats["cache_hits"] +
219 self._stats["api_calls"]
220 )
221
222 return {
223 **self._stats,
224 "total_requests": total_requests,
225 "cache_hit_rate": f"{(self._stats['faq_exact_hits'] + self._stats['faq_keyword_hits'] + self._stats['cache_hits']) / max(total_requests, 1) * 100:.1f}%",
226 "response_cache_size": len(self._response_cache)
227 }
228
229
230# 使用範例
231if __name__ == "__main__":
232 service = SmartCustomerService()
233
234 queries = [
235 "營業時間", # FAQ 完全匹配
236 "請問你們幾點開門?", # FAQ 關鍵字匹配
237 "退貨政策", # FAQ 完全匹配
238 "我想了解你們的產品功能", # 知識庫查詢
239 "今天天氣如何?", # 一般對話
240 "今天天氣如何?", # 快取命中
241 ]
242
243 print("="*60)
244 print("智能客服系統測試")
245 print("="*60)
246
247 for query in queries:
248 result = service.respond(query)
249 print(f"\n問:{query}")
250 print(f"答:{result['answer'][:100]}...")
251 print(f"來源:{result['source']},快取:{result['cached']},API:{result['api_called']}")
252
253 print("\n" + "="*60)
254 print("系統統計")
255 print("="*60)
256 stats = service.get_stats()
257 for key, value in stats.items():
258 print(f" {key}: {value}")
快取效益總覽
┌─────────────────────────────────────────────────────────────────────┐
│ 快取策略效益比較 │
├──────────────────────┬──────────┬──────────┬──────────┬────────────┤
│ 快取層 │ 成本節省 │ 延遲降低 │ 實作複雜度│ 適用場景 │
├──────────────────────┼──────────┼──────────┼──────────┼────────────┤
│ Claude Prompt Cache │ 90% │ 顯著 │ 極低 │ 固定前綴 │
│ 應用層 LRU 快取 │ 100% │ 極大 │ 低 │ 重複查詢 │
│ RAG Embedding 快取 │ 50-80% │ 中等 │ 中 │ 文件檢索 │
│ Redis 分散式快取 │ 100% │ 極大 │ 中 │ 多實例 │
└──────────────────────┴──────────┴──────────┴──────────┴────────────┘
建議實施順序:
1. Claude Prompt Caching(最簡單,立即見效)
2. 應用層 LRU 快取(重複查詢場景)
3. RAG 快取整合(知識庫應用)
4. Redis 分散式快取(生產環境擴展)
最佳實踐清單
快取實作 Checklist:
基礎設定
□ System Prompt 是否超過 1,024 tokens?若是,啟用 Prompt Caching
□ 是否定義了明確的快取鍵計算邏輯?
□ 是否設定了合適的 TTL(存活時間)?
快取設計
□ 是否識別出高頻重複查詢並優先快取?
□ 是否區分可快取和不可快取的請求?
□ 是否設計了快取失效策略(基於時間/事件)?
RAG 整合
□ 是否快取 Embedding 計算結果?
□ 是否快取向量搜尋結果?
□ 是否在文件更新時正確失效相關快取?
監控
□ 是否追蹤快取命中率?
□ 是否監控快取大小和記憶體使用?
□ 是否設定了快取效能警報?
總結
Prompt Caching 和應用層快取是優化 AI 應用成本和效能的關鍵技術。本文介紹的多層快取架構可以根據實際需求靈活組合:
| 場景 | 推薦快取策略 |
|---|---|
| 單實例簡單應用 | Claude Prompt Cache + LRU |
| 高頻重複查詢 | Claude Prompt Cache + LRU + FAQ |
| 知識庫問答 | Claude Prompt Cache + RAG 全層快取 |
| 多實例生產環境 | 全部層級 + Redis |
快取不是一勞永逸的解決方案,需要根據實際使用模式持續調整。建立完善的監控和分析機制,讓數據驅動快取策略的優化,才能持續提升系統效能並控制成本。
