前言
生產環境 AI 系統的監控與最佳化是確保企業 AI 應用成功的關鍵。從模型效能追蹤、基礎設施監控到成本控制，AI FDE（Forward Deployed Engineer，前線部署工程師）需要建立全方位的可觀測性體系。本文將深入探討 LLM-native 指標設計、分散式監控架構、智能故障診斷與企業級成本最佳化策略。
1. LLM-native 指標與評估體系
核心效能指標設計
LLM 特定指標框架：
1from dataclasses import dataclass
2from typing import Dict, List, Optional, Union
3import numpy as np
4from collections import deque
5import time
6import asyncio
7from enum import Enum
8
class MetricType(Enum):
    """Categories used to group LLM service metrics."""

    LATENCY = "latency"
    THROUGHPUT = "throughput"
    QUALITY = "quality"
    COST = "cost"
    RELIABILITY = "reliability"
15
@dataclass
class LLMMetrics:
    """Measurements captured for a single LLM request.

    Identity and performance fields are required. Quality, cost and
    reliability fields have defaults (None / zero / success) so callers only
    fill in what they actually measured.
    """

    # Identity
    timestamp: float
    request_id: str
    model_name: str

    # Performance
    time_to_first_token: float    # TTFT: latency until the first token
    time_per_output_token: float  # TPOT: time spent per output token
    total_latency: float
    tokens_per_second: float

    # Quality (None when the score was not computed)
    perplexity: Optional[float] = None
    bleu_score: Optional[float] = None
    rouge_score: Optional[Dict[str, float]] = None
    human_feedback_score: Optional[float] = None

    # Cost
    input_tokens: int = 0
    output_tokens: int = 0
    compute_cost: float = 0.0

    # Reliability
    success: bool = True
    error_type: Optional[str] = None
    retry_count: int = 0
43
class LLMMetricsCollector:
    """Collect per-request LLM metrics and derive SLA and anomaly signals.

    Raw records live in a bounded deque (the oldest entries are evicted once
    ``buffer_size`` is reached). A per-model summary with running totals and
    short rolling windows is updated on every ``collect_metric`` call.
    """

    def __init__(self, buffer_size: int = 10000):
        # Raw records scanned by get_sla_metrics(); bounded to cap memory use.
        self.metrics_buffer = deque(maxlen=buffer_size)
        # model_name -> running counters plus the last 100 latencies/scores.
        self.real_time_stats = {}

    def collect_metric(self, metric: "LLMMetrics"):
        """Store one request's metrics and fold it into the per-model summary."""
        self.metrics_buffer.append(metric)
        self._update_real_time_stats(metric)

    def _update_real_time_stats(self, metric: "LLMMetrics"):
        """Update the running statistics for ``metric.model_name``."""
        model_name = metric.model_name

        if model_name not in self.real_time_stats:
            self.real_time_stats[model_name] = {
                'total_requests': 0,
                'successful_requests': 0,
                'total_latency': 0,
                'total_cost': 0,
                'recent_latencies': deque(maxlen=100),
                'recent_quality_scores': deque(maxlen=100),
            }

        stats = self.real_time_stats[model_name]
        stats['total_requests'] += 1

        # Latency is tracked for successful requests only.
        if metric.success:
            stats['successful_requests'] += 1
            stats['total_latency'] += metric.total_latency
            stats['recent_latencies'].append(metric.total_latency)

        if metric.human_feedback_score is not None:
            stats['recent_quality_scores'].append(metric.human_feedback_score)

        # Cost is accumulated for every request, successful or not.
        stats['total_cost'] += metric.compute_cost

    def get_sla_metrics(self, model_name: str, time_window: int = 3600) -> Dict:
        """Compute SLA metrics for ``model_name`` over the last ``time_window`` seconds.

        Args:
            model_name: Model whose buffered records are considered.
            time_window: Look-back window in seconds, relative to now.

        Returns:
            An empty dict if no record for the model falls inside the window.
            Otherwise availability/error rate, request counts and latency
            statistics. Latency statistics use successful requests only and are
            ``None`` when the window contains no successful request.
        """
        cutoff_time = time.time() - time_window

        recent_metrics = [
            m for m in self.metrics_buffer
            if m.model_name == model_name and m.timestamp >= cutoff_time
        ]
        if not recent_metrics:
            return {}

        successful_metrics = [m for m in recent_metrics if m.success]
        availability = len(successful_metrics) / len(recent_metrics)

        if successful_metrics:
            p50, p95, p99 = np.percentile(
                [m.total_latency for m in successful_metrics], [50, 95, 99]
            )
            average_ttft = np.mean([m.time_to_first_token for m in successful_metrics])
            average_tpot = np.mean([m.time_per_output_token for m in successful_metrics])
        else:
            # Every request in the window failed: np.percentile raises on an
            # empty input, so report the latency figures as unavailable instead.
            p50 = p95 = p99 = average_ttft = average_tpot = None

        return {
            'availability': availability,
            'p50_latency': p50,
            'p95_latency': p95,
            'p99_latency': p99,
            'average_ttft': average_ttft,
            'average_tpot': average_tpot,
            'error_rate': 1 - availability,
            'total_requests': len(recent_metrics),
            'requests_per_second': len(recent_metrics) / time_window,
        }

    def detect_performance_anomalies(self, model_name: str) -> List[Dict]:
        """Detect latency spikes and quality drops for ``model_name``.

        A latency among the 10 most recent is flagged when it exceeds
        mean + 3*std of the rolling window ('high' severity above mean + 5*std).
        Quality is flagged when the mean of the last 5 feedback scores is more
        than 10% below the mean of the earlier scores in the window.

        Returns:
            A list of anomaly dicts; empty when the model is unknown or fewer
            than 10 latencies have been recorded.
        """
        anomalies = []

        stats = self.real_time_stats.get(model_name)
        if stats is None:
            return anomalies

        recent_latencies = list(stats['recent_latencies'])
        if len(recent_latencies) < 10:
            return anomalies

        mean_latency = np.mean(recent_latencies)
        std_latency = np.std(recent_latencies)
        threshold = mean_latency + 3 * std_latency
        severe_threshold = mean_latency + 5 * std_latency

        for latency in recent_latencies[-10:]:
            if latency > threshold:
                anomalies.append({
                    'type': 'high_latency',
                    'value': latency,
                    'threshold': threshold,
                    'severity': 'high' if latency > severe_threshold else 'medium',
                })

        recent_quality = list(stats['recent_quality_scores'])
        if len(recent_quality) >= 5:
            recent_avg = np.mean(recent_quality[-5:])
            # With exactly 5 scores there is no history; compare to itself.
            historical_avg = np.mean(recent_quality[:-5]) if len(recent_quality) > 5 else recent_avg

            if recent_avg < historical_avg * 0.9:  # more than a 10% drop
                anomalies.append({
                    'type': 'quality_degradation',
                    'current_quality': recent_avg,
                    'historical_quality': historical_avg,
                    'severity': 'high',
                })

        return anomalies
進階品質評估
多維度品質檢測：
1import torch
2from transformers import AutoTokenizer, AutoModel
3from sentence_transformers import SentenceTransformer
4from typing import List, Tuple
5
class LLMQualityEvaluator:
    """Heuristic multi-dimensional quality scoring for LLM responses.

    Every dimension returns a score where higher is better. Most checks are
    lightweight heuristics (regexes, keyword lists, embedding similarity)
    meant as stand-ins for dedicated tooling.
    """

    def __init__(self):
        # Sentence-embedding model used by the relevance and coherence checks.
        self.sentence_model = SentenceTransformer('all-MiniLM-L6-v2')
        # NOTE(review): these thresholds are not read anywhere in this class.
        self.toxicity_threshold = 0.7
        self.coherence_threshold = 0.6

    def evaluate_response_quality(self, prompt: str, response: str,
                                  expected_response: Optional[str] = None) -> Dict:
        """Score ``response`` on every quality dimension.

        Args:
            prompt: The input that produced ``response``.
            response: The model output to score.
            expected_response: Optional reference answer; when given and
                non-empty, a ``factual_accuracy`` score is added.

        Returns:
            Mapping of dimension name to score, plus ``overall_quality``: a
            weighted sum in which any missing dimension contributes 0.5.
        """
        quality_scores = {}

        quality_scores['fluency'] = self._evaluate_fluency(response)
        quality_scores['relevance'] = self._evaluate_relevance(prompt, response)
        quality_scores['coherence'] = self._evaluate_coherence(response)
        quality_scores['safety'] = self._evaluate_safety(response)

        if expected_response:
            quality_scores['factual_accuracy'] = self._evaluate_factual_accuracy(
                response, expected_response
            )

        # Higher means fewer unsupported numbers, i.e. less suspected hallucination.
        quality_scores['hallucination_score'] = self._detect_hallucination(prompt, response)

        weights = {
            'fluency': 0.15,
            'relevance': 0.25,
            'coherence': 0.20,
            'safety': 0.25,
            'factual_accuracy': 0.10,
            'hallucination_score': 0.05,
        }

        quality_scores['overall_quality'] = sum(
            quality_scores.get(metric, 0.5) * weight
            for metric, weight in weights.items()
        )

        return quality_scores

    def _evaluate_fluency(self, response: str) -> float:
        """Heuristic fluency score in [0, 1].

        Applies multiplicative penalties for an average sentence length outside
        5-50 words, a word-repetition rate above 30%, and each regex-detected
        grammar error. This is not a language-model perplexity measure.
        """
        # Drop empty fragments (e.g. after the final period) so they don't
        # drag the average sentence length down.
        sentences = [s for s in response.split('.') if s.strip()]

        avg_sentence_length = self._calculate_avg_sentence_length(sentences)
        repetition_rate = self._calculate_repetition_rate(response)
        grammar_errors = self._count_grammar_errors(response)

        fluency_score = 1.0

        if avg_sentence_length < 5 or avg_sentence_length > 50:
            fluency_score *= 0.8

        if repetition_rate > 0.3:
            fluency_score *= 0.7

        # Each error costs 10%, but the multiplier never drops below 0.1.
        fluency_score *= max(0.1, 1.0 - grammar_errors * 0.1)

        return max(0.0, min(1.0, fluency_score))

    def _evaluate_relevance(self, prompt: str, response: str) -> float:
        """Cosine similarity of prompt and response embeddings, clamped to [0, 1]."""
        # Encode both texts in one batch.
        embeddings = torch.tensor(self.sentence_model.encode([prompt, response]))
        similarity = torch.nn.functional.cosine_similarity(
            embeddings[0:1], embeddings[1:2]
        ).item()

        return max(0.0, min(1.0, similarity))

    def _evaluate_coherence(self, response: str) -> float:
        """Mean cosine similarity of adjacent sentences, clamped to [0, 1].

        Returns 1.0 when the response has at most one sentence.
        """
        sentences = [s.strip() for s in response.split('.') if s.strip()]

        if len(sentences) <= 1:
            return 1.0

        # One batched encode instead of two encode calls per sentence pair.
        embeddings = torch.tensor(self.sentence_model.encode(sentences))
        similarities = torch.nn.functional.cosine_similarity(
            embeddings[:-1], embeddings[1:], dim=1
        )

        # Cosine similarity can be negative; clamp like the relevance score.
        return max(0.0, min(1.0, float(similarities.mean().item())))

    def _evaluate_safety(self, response: str) -> float:
        """Keyword/PII-based safety score in [0, 1].

        Each listed keyword found (case-insensitive substring match) multiplies
        the score by 0.7; detected PII multiplies it by 0.5.
        """
        harmful_keywords = [
            'violent', 'illegal', 'discriminatory', 'hateful',
            'toxic', 'offensive', 'inappropriate'
        ]

        safety_score = 1.0

        response_lower = response.lower()

        for keyword in harmful_keywords:
            if keyword in response_lower:
                safety_score *= 0.7

        if self._contains_pii(response):
            safety_score *= 0.5

        return max(0.0, safety_score)

    def _evaluate_factual_accuracy(self, response: str, expected: str) -> float:
        """Fraction of distinct facts in ``expected`` that also appear in ``response``.

        Returns 0.5 when ``expected`` contains no extractable facts.
        """
        expected_facts = set(self._extract_facts(expected))

        if not expected_facts:
            return 0.5  # nothing to compare against

        response_facts = set(self._extract_facts(response))

        # Compare distinct facts: dividing a set overlap by the raw list length
        # would let repeated numbers in the reference cap the achievable score.
        return len(response_facts & expected_facts) / len(expected_facts)

    def _detect_hallucination(self, prompt: str, response: str) -> float:
        """Score in [0, 1]; 1.0 means every number in the response also appears in the prompt.

        Heuristic: numbers introduced by the response that the prompt never
        mentioned are treated as potential hallucinations.
        """
        import re

        prompt_numbers = set(re.findall(r'\d+', prompt))
        response_numbers = set(re.findall(r'\d+', response))

        new_numbers = response_numbers - prompt_numbers
        hallucination_indicator = len(new_numbers) / max(len(response_numbers), 1)

        return 1.0 - min(hallucination_indicator, 1.0)

    def _calculate_avg_sentence_length(self, sentences: List[str]) -> float:
        """Average whitespace-delimited word count per sentence (0 for no sentences)."""
        if not sentences:
            return 0
        return sum(len(s.split()) for s in sentences) / len(sentences)

    def _calculate_repetition_rate(self, text: str) -> float:
        """Share of lower-cased words that repeat an earlier word (0 for <= 1 word)."""
        words = text.lower().split()
        if len(words) <= 1:
            return 0

        unique_words = len(set(words))
        return 1.0 - (unique_words / len(words))

    def _count_grammar_errors(self, text: str) -> int:
        """Count simple indefinite-article errors ("a apple", "an banana").

        A placeholder for a real grammar checker such as LanguageTool.
        """
        # Local import: this method previously used ``re`` without importing it.
        import re

        error_patterns = [
            r'\b(a)\s+[aeiou]',              # "a" before a vowel
            r'\b(an)\s+[bcdfg-hj-np-tv-z]',  # "an" before a consonant
        ]

        return sum(len(re.findall(pattern, text, re.IGNORECASE)) for pattern in error_patterns)

    def _contains_pii(self, text: str) -> bool:
        """Return True if ``text`` matches a simple card-number, SSN or email pattern."""
        import re

        pii_patterns = [
            r'\b\d{4}-\d{4}-\d{4}-\d{4}\b',                        # credit card number
            r'\b\d{3}-\d{2}-\d{4}\b',                              # US SSN
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'  # email (no literal '|' in TLD)
        ]

        return any(re.search(pattern, text) for pattern in pii_patterns)

    def _extract_facts(self, text: str) -> List[str]:
        """Extract crude "facts" (numbers and dates) as tagged strings.

        A placeholder for NER / relation extraction.
        """
        import re

        facts = []

        numbers = re.findall(r'\d+(?:\.\d+)?', text)
        facts.extend([f"number_{num}" for num in numbers])

        dates = re.findall(r'\b\d{4}-\d{2}-\d{2}\b|\b\d{1,2}/\d{1,2}/\d{4}\b', text)
        facts.extend([f"date_{date}" for date in dates])

        return facts
2. 分散式監控架構
可觀測性基礎設施
OpenTelemetry + Prometheus 整合：
1from opentelemetry import trace, metrics
2from opentelemetry.exporter.prometheus import PrometheusMetricReader
3from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
4from opentelemetry.instrumentation.requests import RequestsInstrumentor
5from prometheus_client import Counter, Histogram, Gauge
6import asyncio
7import logging
8
class AISystemObservability:
    """Prometheus metrics, OpenTelemetry tracing and structured logging for an AI service.

    NOTE(review): the collectors below are registered with prometheus_client's
    default registry, so creating a second instance in one process will most
    likely fail with a duplicate-metric error. Confirm before instantiating
    more than once.
    """

    def __init__(self, service_name: str):
        self.service_name = service_name

        # Tracer from the globally configured OpenTelemetry provider.
        self.tracer = trace.get_tracer(__name__)

        self.setup_custom_metrics()
        self.setup_logging()

    def setup_custom_metrics(self):
        """Create the Prometheus counters, histograms and gauges used by this class."""
        # --- counters ---
        self.request_counter = Counter(
            'ai_requests_total',
            'Total AI model requests',
            ['model_name', 'endpoint', 'status']
        )

        self.token_counter = Counter(
            'ai_tokens_total',
            'Total tokens processed',
            ['model_name', 'token_type']  # token_type: 'input' or 'output'
        )

        # --- histograms ---
        self.latency_histogram = Histogram(
            'ai_request_duration_seconds',
            'AI request latency in seconds',
            ['model_name', 'endpoint'],
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
        )

        self.ttft_histogram = Histogram(
            'ai_time_to_first_token_seconds',
            'Time to first token in seconds',
            ['model_name'],
            buckets=[0.01, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0]
        )

        # --- gauges ---
        self.active_requests = Gauge(
            'ai_active_requests',
            'Number of active AI requests',
            ['model_name']
        )

        self.model_quality_score = Gauge(
            'ai_model_quality_score',
            'Model quality score (0-1)',
            ['model_name', 'metric_type']
        )

        self.gpu_utilization = Gauge(
            'ai_gpu_utilization_percent',
            'GPU utilization percentage',
            ['gpu_id', 'model_name']
        )

    def setup_logging(self):
        """Attach a JSON-shaped stream handler to the service's logger.

        The format string references ``trace_id`` and ``span_id``. A handler
        filter fills in empty values for records logged without those extras;
        otherwise the formatter would fail on them.
        NOTE(review): messages are not JSON-escaped, so a message containing
        quotes produces an invalid JSON line.
        """
        self.logger = logging.getLogger(self.service_name)
        self.logger.setLevel(logging.INFO)

        # getLogger() returns one shared logger per name, so attach the handler
        # only once; otherwise each message is emitted once per instance.
        if self.logger.handlers:
            return

        formatter = logging.Formatter(
            '{"timestamp": "%(asctime)s", "service": "%(name)s", '
            '"level": "%(levelname)s", "message": "%(message)s", '
            '"trace_id": "%(trace_id)s", "span_id": "%(span_id)s"}'
        )

        def _default_trace_fields(record):
            for field in ('trace_id', 'span_id'):
                if not hasattr(record, field):
                    setattr(record, field, '')
            return True

        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        handler.addFilter(_default_trace_fields)
        self.logger.addHandler(handler)

    def track_ai_request(self, model_name: str, endpoint: str, func=None):
        """Instrument ``func`` with Prometheus metrics, a trace span and error logging.

        Call it directly, ``track_ai_request(model, endpoint, fn)``, or omit
        ``func`` and use it as a decorator factory,
        ``@track_ai_request(model, endpoint)``.

        The wrapped callable's return value is passed through unchanged. When
        the return value is a dict, its ``input_tokens``/``output_tokens``
        entries are recorded. Exceptions raised by ``func`` are counted, logged
        and re-raised.
        """
        import functools

        if func is None:
            return lambda f: self.track_ai_request(model_name, endpoint, f)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            self.active_requests.labels(model_name=model_name).inc()
            try:
                with self.tracer.start_as_current_span(
                    f"{self.service_name}.{endpoint}",
                    attributes={
                        "ai.model.name": model_name,
                        "ai.endpoint": endpoint,
                        "ai.service": self.service_name
                    }
                ) as span:
                    start_time = time.perf_counter()

                    # Only failures of the wrapped call count as request errors.
                    try:
                        result = func(*args, **kwargs)
                    except Exception as e:
                        self._record_failure(span, model_name, endpoint, e)
                        raise

                    self._record_success(
                        span, model_name, endpoint,
                        time.perf_counter() - start_time, result
                    )
                    return result
            finally:
                # Also runs if span creation fails, keeping the gauge balanced.
                self.active_requests.labels(model_name=model_name).dec()

        return wrapper

    def _record_success(self, span, model_name: str, endpoint: str,
                        duration: float, result) -> None:
        """Record metrics and span attributes for a call that returned normally."""
        self.request_counter.labels(
            model_name=model_name,
            endpoint=endpoint,
            status='success'
        ).inc()

        self.latency_histogram.labels(
            model_name=model_name,
            endpoint=endpoint
        ).observe(duration)

        # Token usage is only available from dict results; any other return
        # type must not turn a successful call into an error.
        usage = result if isinstance(result, dict) else {}

        span.set_attributes({
            "ai.response.success": True,
            "ai.response.duration": duration,
            "ai.tokens.input": usage.get('input_tokens', 0),
            "ai.tokens.output": usage.get('output_tokens', 0)
        })

        for token_type in ('input', 'output'):
            key = f'{token_type}_tokens'
            if key in usage:
                self.token_counter.labels(
                    model_name=model_name,
                    token_type=token_type
                ).inc(usage[key])

    def _record_failure(self, span, model_name: str, endpoint: str,
                        error: Exception) -> None:
        """Record metrics, span attributes and an error log for a failed call."""
        self.request_counter.labels(
            model_name=model_name,
            endpoint=endpoint,
            status='error'
        ).inc()

        span.set_attributes({
            "ai.response.success": False,
            "ai.error.type": type(error).__name__,
            "ai.error.message": str(error)
        })

        span_context = span.get_span_context()
        self.logger.error(
            "AI request failed: %s", error,
            extra={
                # Hex-encode the ids (32/16 hex digits) rather than logging raw ints.
                "trace_id": format(span_context.trace_id, '032x'),
                "span_id": format(span_context.span_id, '016x'),
                "model_name": model_name,
                "endpoint": endpoint
            }
        )

    def update_quality_metrics(self, model_name: str, quality_scores: Dict):
        """Publish each ``{metric_type: score}`` entry to the quality gauge."""
        for metric_type, score in quality_scores.items():
            self.model_quality_score.labels(
                model_name=model_name,
                metric_type=metric_type
            ).set(score)

    def update_gpu_metrics(self, gpu_metrics: Dict):
        """Publish GPU utilisation from ``{gpu_id: {'utilization_percent', 'model_name'?}}``."""
        # ``gpu_stats`` avoids shadowing the module-level ``metrics`` import.
        for gpu_id, gpu_stats in gpu_metrics.items():
            self.gpu_utilization.labels(
                gpu_id=gpu_id,
                model_name=gpu_stats.get('model_name', 'unknown')
            ).set(gpu_stats['utilization_percent'])
196
# Usage example
observability = AISystemObservability("ai-inference-service")


def generate_response(prompt: str) -> Dict:
    """Simulated inference call returning generated text plus token usage."""
    time.sleep(1.2)  # stand-in for model latency

    return {
        'response': 'Generated response',
        'input_tokens': 50,
        'output_tokens': 100,
        'model_version': '1.0.0'
    }


# track_ai_request takes the function to wrap as its third argument.
generate_response = observability.track_ai_request(
    "gpt-4", "chat_completion", generate_response
)
分散式追蹤與日誌聚合
Jaeger + ELK Stack 整合：
1from opentelemetry.exporter.jaeger.thrift import JaegerExporter
2from opentelemetry.sdk.trace import TracerProvider
3from opentelemetry.sdk.trace.export import BatchSpanProcessor
4import elasticsearch
5from datetime import datetime
6import json
7
class DistributedTraceManager:
    """Export spans to Jaeger and store/query AI operation logs in Elasticsearch.

    Logs are written to monthly indices named ``ai-logs-YYYY-MM`` (UTC). Each
    month's index is created with ``INDEX_MAPPING`` the first time it is used.
    """

    # Explicit field types. The terms aggregations in analyze_error_patterns()
    # need keyword fields; dynamically mapped strings become text fields.
    INDEX_MAPPING = {
        "mappings": {
            "properties": {
                "timestamp": {"type": "date"},
                "service_name": {"type": "keyword"},
                "trace_id": {"type": "keyword"},
                "span_id": {"type": "keyword"},
                "level": {"type": "keyword"},
                "message": {"type": "text"},
                "operation_type": {"type": "keyword"},
                "model_name": {"type": "keyword"},
                "endpoint": {"type": "keyword"},
                "duration": {"type": "float"},
                "error_type": {"type": "keyword"},
                "user_id": {"type": "keyword"},
                "request_size": {"type": "integer"},
                "response_size": {"type": "integer"}
            }
        }
    }

    def __init__(self, jaeger_endpoint: str, elasticsearch_host: str):
        # Index names this process has already verified or created.
        self._ensured_indices = set()
        self.setup_jaeger_tracing(jaeger_endpoint)
        self.setup_elasticsearch_logging(elasticsearch_host)

    @staticmethod
    def _utc_now() -> datetime:
        """Current time as a timezone-aware UTC datetime."""
        from datetime import timezone
        return datetime.now(timezone.utc)

    @staticmethod
    def _index_name_for(moment: datetime) -> str:
        """Monthly log index name for ``moment``, e.g. ``ai-logs-2024-03``."""
        return f"ai-logs-{moment.strftime('%Y-%m')}"

    def setup_jaeger_tracing(self, endpoint: str):
        """Export spans to the Jaeger collector at ``endpoint`` through a batch processor.

        NOTE(review): the opentelemetry Jaeger exporter is deprecated upstream
        in favour of OTLP, which Jaeger ingests natively. Consider migrating.
        """
        # Spans go straight to the collector over HTTP. The former
        # agent_port=14268 pointed the UDP agent settings at the collector's
        # HTTP port; agent settings only matter when no collector is given.
        jaeger_exporter = JaegerExporter(collector_endpoint=endpoint)

        # Configure the provider fully before publishing it. The global
        # provider can only be set once, so adding the processor through
        # trace.get_tracer_provider() could hit a different provider.
        provider = TracerProvider()
        provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
        trace.set_tracer_provider(provider)

        # Take the tracer from our own provider so spans reach Jaeger even if
        # another global provider was installed first.
        self.tracer = provider.get_tracer(__name__)

    def setup_elasticsearch_logging(self, host: str):
        """Connect to Elasticsearch at ``host`` and make sure this month's index exists."""
        self.es_client = elasticsearch.Elasticsearch([host])
        self._ensure_index(self._index_name_for(self._utc_now()))

    def _ensure_index(self, index_name: str) -> None:
        """Create ``index_name`` with INDEX_MAPPING unless it already exists.

        Best-effort: failures are logged, and the check is retried on the next call.
        """
        if index_name in self._ensured_indices:
            return

        try:
            if not self.es_client.indices.exists(index=index_name):
                self.es_client.indices.create(index=index_name, body=self.INDEX_MAPPING)
        except Exception as e:
            logging.getLogger(__name__).warning("Failed to create Elasticsearch index: %s", e)
            return

        self._ensured_indices.add(index_name)

    def log_ai_operation(self, operation_type: str, details: Dict):
        """Write one AI-operation log document tagged with the current trace context.

        ``details`` is merged into the document last, so its keys override the
        standard fields. Write failures are logged, not raised.
        """
        now = self._utc_now()
        span_context = trace.get_current_span().get_span_context()

        # Outside an active span the context is invalid (all-zero ids). Store
        # null rather than a fake trace id shared by every untraced entry.
        has_trace = span_context.is_valid

        log_entry = {
            "timestamp": now.isoformat(),
            "service_name": "ai-service",
            "trace_id": format(span_context.trace_id, '032x') if has_trace else None,
            "span_id": format(span_context.span_id, '016x') if has_trace else None,
            "operation_type": operation_type,
            **details
        }

        # Derive the index from the same UTC instant as the timestamp, so an
        # entry never lands in the neighbouring month's index. Also apply the
        # mapping to indices for months after startup.
        index_name = self._index_name_for(now)
        self._ensure_index(index_name)

        try:
            self.es_client.index(index=index_name, body=log_entry)
        except Exception as e:
            logging.getLogger(__name__).warning("Failed to log to Elasticsearch: %s", e)

    def create_distributed_span(self, operation_name: str, model_name: str):
        """Return a context manager that starts ``operation_name`` as the current span."""
        return self.tracer.start_as_current_span(
            operation_name,
            attributes={
                "ai.model.name": model_name,
                "ai.operation": operation_name,
                "service.name": "ai-inference"
            }
        )

    def search_logs(self, query: Dict, time_range: Optional[Dict] = None) -> List[Dict]:
        """Return up to 100 matching log documents, newest first.

        Args:
            query: An Elasticsearch query clause, added to a ``bool.must`` list.
            time_range: Optional ``range`` parameters for ``timestamp``
                (e.g. ``{"gte": "now-1h"}``).

        Returns:
            The ``_source`` of each hit, or an empty list if the search fails.
        """
        must_clauses = [query]
        if time_range:
            must_clauses.append({"range": {"timestamp": time_range}})

        search_body = {
            "query": {"bool": {"must": must_clauses}},
            "sort": [{"timestamp": {"order": "desc"}}],
            "size": 100
        }

        try:
            response = self.es_client.search(index="ai-logs-*", body=search_body)
            return [hit["_source"] for hit in response["hits"]["hits"]]
        except Exception as e:
            logging.getLogger(__name__).warning("Failed to search logs: %s", e)
            return []

    def analyze_error_patterns(self, time_window: str = "1h") -> Dict:
        """Aggregate errors from the last ``time_window`` by type and over time.

        Returns:
            ``{"error_types": [...], "timeline": [...]}`` with the raw
            aggregation buckets, or an empty dict if the query fails.
        """
        aggregation_query = {
            "size": 0,
            "query": {
                "bool": {
                    "must": [
                        {"exists": {"field": "error_type"}},
                        {"range": {"timestamp": {"gte": f"now-{time_window}"}}}
                    ]
                }
            },
            "aggs": {
                "error_types": {
                    "terms": {"field": "error_type"},
                    "aggs": {
                        "models": {"terms": {"field": "model_name"}},
                        "endpoints": {"terms": {"field": "endpoint"}}
                    }
                },
                "error_timeline": {
                    "date_histogram": {
                        "field": "timestamp",
                        # ``interval`` is deprecated (removed in ES 8); use
                        # fixed_interval (ES 7.2+) for a fixed 5-minute bucket.
                        "fixed_interval": "5m"
                    }
                }
            }
        }

        try:
            response = self.es_client.search(index="ai-logs-*", body=aggregation_query)
            return {
                "error_types": response["aggregations"]["error_types"]["buckets"],
                "timeline": response["aggregations"]["error_timeline"]["buckets"]
            }
        except Exception as e:
            logging.getLogger(__name__).warning("Failed to analyze error patterns: %s", e)
            return {}
3. 智能故障診斷與自動恢復
故障檢測系統
多層次異常檢測：
1import numpy as np
2from scipy import stats
3from sklearn.ensemble import IsolationForest
4from typing import Dict, List, Tuple
5from collections import defaultdict, deque
6import time
7
class IntelligentFaultDetector:
    """Run metric samples through per-metric anomaly detectors and alert rules.

    Every alert produced is tracked as an incident keyed by its ``alert_id``.
    """

    def __init__(self):
        # metric_name -> last 1000 samples as {'value', 'timestamp', 'context'}.
        self.metric_history = defaultdict(lambda: deque(maxlen=1000))
        # metric_name -> detector exposing detect_anomaly(...).
        self.anomaly_detectors = {}
        self.alert_rules = []
        # alert_id -> incident record.
        self.incident_tracker = {}

    def register_metric_stream(self, metric_name: str, detection_method: str = "statistical",
                               **detector_kwargs):
        """Attach an anomaly detector to ``metric_name``.

        Args:
            metric_name: Metric to monitor.
            detection_method: ``"statistical"``, ``"isolation_forest"`` or
                ``"threshold"``.
            **detector_kwargs: Forwarded to the detector's constructor, e.g.
                ``upper_threshold=`` for ``"threshold"``. Without thresholds
                that detector never fires.

        Raises:
            ValueError: If ``detection_method`` is not recognised, so a typo
                cannot silently leave the metric unmonitored.
        """
        if detection_method == "statistical":
            detector = StatisticalAnomalyDetector(**detector_kwargs)
        elif detection_method == "isolation_forest":
            detector = MLAnomalyDetector(**detector_kwargs)
        elif detection_method == "threshold":
            detector = ThresholdAnomalyDetector(**detector_kwargs)
        else:
            raise ValueError(f"Unknown detection method: {detection_method!r}")

        self.anomaly_detectors[metric_name] = detector

    def add_alert_rule(self, rule: Dict):
        """Add a rule dict.

        Keys read by this class: 'name', 'metric', 'condition', 'threshold',
        'severity', 'message' (formatted with ``value=``), and an optional
        'window' in seconds for 'percentage_increase'.
        """
        self.alert_rules.append(rule)

    def process_metric(self, metric_name: str, value: float, timestamp: float,
                       context: Dict = None) -> List[Dict]:
        """Record a sample, run anomaly detection and rule checks, and return alerts.

        Args:
            metric_name: Metric the sample belongs to.
            value: Sample value.
            timestamp: Sample time in seconds.
            context: Optional extra info, attached to history and alerts
                (``None`` is treated as ``{}``).

        Returns:
            Alerts raised by this sample (anomaly alerts first, then rule alerts).
        """
        context = context or {}
        history = self.metric_history[metric_name]

        # Baseline = the up-to-100 samples *preceding* this one. Including the
        # current value inflates the std and caps its z-score at sqrt(n - 1),
        # so with 10 samples a 3-sigma rule could never fire.
        baseline = [point['value'] for point in list(history)[-100:]]

        history.append({
            'value': value,
            'timestamp': timestamp,
            'context': context
        })

        alerts = []

        detector = self.anomaly_detectors.get(metric_name)
        if detector is not None:
            is_anomaly, anomaly_score, explanation = detector.detect_anomaly(
                current_value=value,
                historical_values=baseline,
                context=context
            )

            if is_anomaly:
                alerts.append(self._create_anomaly_alert(
                    metric_name=metric_name,
                    value=value,
                    timestamp=timestamp,
                    anomaly_score=anomaly_score,
                    explanation=explanation,
                    context=context
                ))

        alerts.extend(self._check_alert_rules(metric_name, value, timestamp, context))

        self._update_incident_tracking(alerts)

        return alerts

    def _create_anomaly_alert(self, metric_name: str, value: float, timestamp: float,
                              anomaly_score: float, explanation: str, context: Dict) -> Dict:
        """Build an anomaly alert dict; its severity comes from score and context."""
        severity = self._calculate_severity(anomaly_score, context)

        return {
            'type': 'anomaly',
            'metric_name': metric_name,
            'current_value': value,
            'anomaly_score': anomaly_score,
            'severity': severity,
            'timestamp': timestamp,
            'explanation': explanation,
            'context': context,
            'alert_id': f"anomaly_{metric_name}_{int(timestamp)}"
        }

    def _check_alert_rules(self, metric_name: str, value: float,
                           timestamp: float, context: Dict) -> List[Dict]:
        """Return an alert for every rule on ``metric_name`` whose condition holds."""
        alerts = []

        for rule in self.alert_rules:
            if rule['metric'] != metric_name:
                continue
            if not self._evaluate_rule_condition(rule, value, context):
                continue

            alerts.append({
                'type': 'rule',
                'rule_name': rule['name'],
                'metric_name': metric_name,
                'current_value': value,
                'threshold': rule['threshold'],
                'severity': rule['severity'],
                'timestamp': timestamp,
                'message': rule['message'].format(value=value),
                'context': context,
                'alert_id': f"rule_{rule['name']}_{int(timestamp)}"
            })

        return alerts

    def _evaluate_rule_condition(self, rule: Dict, value: float, context: Dict) -> bool:
        """Evaluate ``rule['condition']`` against ``value``; unknown conditions are False."""
        condition_type = rule['condition']
        threshold = rule['threshold']

        if condition_type == 'greater_than':
            return value > threshold
        elif condition_type == 'less_than':
            return value < threshold
        elif condition_type == 'equals':
            return abs(value - threshold) < 0.001
        elif condition_type == 'not_equals':
            return abs(value - threshold) >= 0.001
        elif condition_type == 'percentage_increase':
            historical_avg = self._get_historical_average(rule['metric'], window=rule.get('window', 300))
            # A missing or zero baseline cannot express a relative increase.
            if historical_avg:
                increase = (value - historical_avg) / historical_avg
                return increase > threshold

        return False

    def _get_historical_average(self, metric_name: str, window: float = 300) -> Optional[float]:
        """Mean of ``metric_name`` samples within ``window`` seconds before the newest sample.

        The newest sample (the one currently being evaluated) is excluded so it
        is compared against its own past. Returns ``None`` when no such
        samples exist.
        """
        points = list(self.metric_history.get(metric_name, ()))
        if len(points) < 2:
            return None

        reference_time = points[-1]['timestamp']
        window_values = [
            point['value'] for point in points[:-1]
            if reference_time - point['timestamp'] <= window
        ]
        return float(np.mean(window_values)) if window_values else None

    def _calculate_severity(self, anomaly_score: float, context: Dict) -> str:
        """Map an anomaly score to a severity, raised to 'high' for high business impact."""
        context = context or {}

        if anomaly_score > 0.9:
            base_severity = 'critical'
        elif anomaly_score > 0.7:
            base_severity = 'high'
        elif anomaly_score > 0.5:
            base_severity = 'medium'
        else:
            base_severity = 'low'

        if context.get('business_impact') == 'high' and base_severity in ('medium', 'low'):
            base_severity = 'high'

        return base_severity

    def _update_incident_tracking(self, alerts: List[Dict]):
        """Open a new incident per unseen ``alert_id`` or bump an existing one."""
        for alert in alerts:
            alert_id = alert['alert_id']

            incident = self.incident_tracker.get(alert_id)
            if incident is None:
                self.incident_tracker[alert_id] = {
                    # Included so callers of get_active_incidents() can resolve it.
                    'incident_id': alert_id,
                    'first_occurrence': alert['timestamp'],
                    'last_occurrence': alert['timestamp'],
                    'count': 1,
                    'status': 'open',
                    'alert_data': alert
                }
            else:
                incident['last_occurrence'] = alert['timestamp']
                incident['count'] += 1

    def get_active_incidents(self) -> List[Dict]:
        """Return open incidents whose last occurrence was within the past hour."""
        current_time = time.time()

        return [
            incident for incident in self.incident_tracker.values()
            if incident['status'] == 'open'
            and current_time - incident['last_occurrence'] < 3600
        ]

    def resolve_incident(self, incident_id: str, resolution_note: str = ""):
        """Mark an incident resolved; unknown ids are ignored."""
        incident = self.incident_tracker.get(incident_id)
        if incident is not None:
            incident['status'] = 'resolved'
            incident['resolution_time'] = time.time()
            incident['resolution_note'] = resolution_note
201
class StatisticalAnomalyDetector:
    """Flag values whose z-score against a historical window exceeds a threshold."""

    def __init__(self, zscore_threshold: float = 3.0):
        self.zscore_threshold = zscore_threshold

    def detect_anomaly(self, current_value: float, historical_values: List[float],
                       context: Dict = None) -> Tuple[bool, float, str]:
        """Z-score test of ``current_value`` against ``historical_values``.

        ``context`` is accepted for interface compatibility and not used.

        Returns:
            ``(is_anomaly, anomaly_score, explanation)``. The score is the
            z-score divided by twice the threshold, capped at 1.0. Fewer than
            10 historical values never produce an anomaly.
        """
        if len(historical_values) < 10:
            return False, 0.0, "Insufficient data for statistical analysis"

        mean = np.mean(historical_values)
        std = np.std(historical_values)

        if std == 0:
            # A z-score is undefined for a perfectly flat baseline. Matching it
            # is normal; any deviation from it is maximally anomalous rather
            # than silently ignored.
            if np.isclose(current_value, mean):
                return False, 0.0, "No variation in historical data"
            return True, 1.0, (
                f"Deviation from constant baseline: value {current_value}, "
                f"baseline {mean:.2f}"
            )

        zscore = abs(current_value - mean) / std
        is_anomaly = bool(zscore > self.zscore_threshold)

        # Normalise: reaching twice the threshold maps to the maximum score.
        anomaly_score = float(min(zscore / (self.zscore_threshold * 2), 1.0))

        explanation = f"Z-score: {zscore:.2f}, Mean: {mean:.2f}, Std: {std:.2f}"

        return is_anomaly, anomaly_score, explanation
229
class MLAnomalyDetector:
    """Isolation-Forest anomaly detector over a one-dimensional value history."""

    def __init__(self, retrain_interval: int = 100):
        self.isolation_forest = IsolationForest(contamination=0.1, random_state=42)
        self.trained = False
        # Refit after this many detect_anomaly() calls on the current model.
        self.retrain_interval = retrain_interval
        self._calls_since_fit = 0

    def detect_anomaly(self, current_value: float, historical_values: List[float],
                       context: Dict = None) -> Tuple[bool, float, str]:
        """Score ``current_value`` with an Isolation Forest fitted on ``historical_values``.

        ``context`` is accepted for interface compatibility and not used.

        Returns:
            ``(is_anomaly, anomaly_score, explanation)``. The score is in
            (0, 1), with values above 0.5 for points the model considers
            outliers. Fewer than 50 historical values never produce an anomaly.
        """
        if len(historical_values) < 50:
            return False, 0.0, "Insufficient data for ML analysis"

        # Fit on first use, then every ``retrain_interval`` calls. Keying
        # retraining off len(historical_values) % 100 refits on *every* call
        # when callers pass a fixed-size window of exactly 100 values.
        if not self.trained or self._calls_since_fit >= self.retrain_interval:
            training_data = np.asarray(historical_values, dtype=float).reshape(-1, 1)
            self.isolation_forest.fit(training_data)
            self.trained = True
            self._calls_since_fit = 0
        self._calls_since_fit += 1

        current_X = np.array([[current_value]], dtype=float)
        raw_score = float(self.isolation_forest.decision_function(current_X)[0])
        is_anomaly = bool(self.isolation_forest.predict(current_X)[0] == -1)

        # decision_function is negative for outliers, so 1 / (1 + e^raw) maps
        # outliers above 0.5 and inliers below it.
        normalized_score = float(1.0 / (1.0 + np.exp(raw_score)))

        explanation = f"Isolation Forest score: {raw_score:.3f}"

        return is_anomaly, normalized_score, explanation
263
class ThresholdAnomalyDetector:
    """Flag values outside fixed upper and/or lower bounds."""

    def __init__(self, upper_threshold: Optional[float] = None,
                 lower_threshold: Optional[float] = None):
        # Either bound may be None, meaning that side is not checked.
        self.upper_threshold = upper_threshold
        self.lower_threshold = lower_threshold

    @staticmethod
    def _relative_excess(distance: float, threshold: float) -> float:
        """Return ``distance`` relative to ``|threshold|``, capped at 1.0.

        A zero threshold has no scale to normalise against, so any excess gets
        the maximum score instead of raising ZeroDivisionError.
        """
        if threshold == 0:
            return 1.0
        return min(distance / abs(threshold), 1.0)

    def detect_anomaly(self, current_value: float, historical_values: List[float],
                       context: Dict = None) -> Tuple[bool, float, str]:
        """Check ``current_value`` against the configured bounds.

        ``historical_values`` and ``context`` are accepted for interface
        compatibility with the other detectors and are not used.

        Returns:
            ``(is_anomaly, anomaly_score, explanation)``. The score is the
            relative distance past the violated bound, in [0, 1].
        """
        upper, lower = self.upper_threshold, self.lower_threshold
        above = upper is not None and current_value > upper
        below = lower is not None and current_value < lower

        explanation_parts = []
        if above:
            explanation_parts.append(f"Above upper threshold: {current_value} > {upper}")
        if below:
            explanation_parts.append(f"Below lower threshold: {current_value} < {lower}")

        # If both bounds are violated (only possible when lower > upper), the
        # upper bound determines the score.
        if above:
            anomaly_score = self._relative_excess(current_value - upper, upper)
        elif below:
            anomaly_score = self._relative_excess(lower - current_value, lower)
        else:
            anomaly_score = 0.0

        explanation = "; ".join(explanation_parts) if explanation_parts else "Within thresholds"

        return above or below, anomaly_score, explanation
自動恢復機制
智能自愈系統：
1import asyncio
2from abc import ABC, abstractmethod
3from enum import Enum
4from typing import Dict, List, Callable
5
class RecoveryAction(Enum):
    """Kinds of automated recovery actions."""

    RESTART_SERVICE = "restart_service"
    SCALE_UP = "scale_up"
    SCALE_DOWN = "scale_down"
    SWITCH_MODEL = "switch_model"
    CLEAR_CACHE = "clear_cache"
    ROLLBACK_DEPLOYMENT = "rollback_deployment"
    CIRCUIT_BREAK = "circuit_break"
14
class AutoRecoverySystem:
    """Classify failures and dispatch them to registered async recovery strategies.

    At most one recovery runs per (failure type, service) pair at a time, and
    every attempted recovery is appended to a bounded execution history.
    """

    # (metric_name, threshold, failure_type): a metric above its threshold
    # classifies the failure. Checked in order.
    _METRIC_FAILURE_RULES = (
        ('latency', 10, 'high_latency'),
        ('error_rate', 0.1, 'high_error_rate'),
        ('memory_usage', 0.9, 'memory_exhaustion'),
        ('cpu_usage', 0.9, 'cpu_exhaustion'),
    )

    def __init__(self):
        # failure pattern -> strategy object exposing ``async execute(context)``.
        self.recovery_strategies = {}
        # Last 100 recovery attempts, successful or not.
        self.execution_history = deque(maxlen=100)
        # NOTE(review): not read anywhere in this class.
        self.circuit_breakers = {}
        # Keys of recoveries currently in progress.
        self.recovery_locks = {}

    def register_recovery_strategy(self, failure_pattern: str, strategy: 'RecoveryStrategy'):
        """Register ``strategy`` for failure types matching ``failure_pattern``."""
        self.recovery_strategies[failure_pattern] = strategy

    async def handle_failure(self, failure_context: Dict) -> Dict:
        """Classify the failure and run the matching recovery strategy.

        Returns:
            The strategy's result dict. Otherwise a ``{'success': False,
            'reason': ..., 'failure_type': ...}`` dict when no strategy
            matches, a recovery of the same kind is already running, or the
            strategy raises.
        """
        failure_type = self._classify_failure(failure_context)

        strategy = self._match_recovery_strategy(failure_type, failure_context)
        if not strategy:
            return {
                'success': False,
                'reason': 'No matching recovery strategy found',
                'failure_type': failure_type
            }

        lock_key = f"{failure_type}_{failure_context.get('service_name', 'unknown')}"

        # There is no await between this check and the assignment below, so
        # within one event loop the check-and-set cannot interleave.
        if lock_key in self.recovery_locks:
            return {
                'success': False,
                'reason': 'Recovery already in progress',
                'failure_type': failure_type
            }

        self.recovery_locks[lock_key] = True

        try:
            recovery_result = await strategy.execute(failure_context)
        except Exception as e:
            recovery_result = {
                'success': False,
                'reason': f'Recovery execution failed: {str(e)}',
                'failure_type': failure_type
            }
        finally:
            self.recovery_locks.pop(lock_key, None)

        # Record failed attempts too, so the history reflects every execution.
        self._record_recovery_execution(failure_context, strategy, recovery_result)

        return recovery_result

    def _classify_failure(self, failure_context: Dict) -> str:
        """Derive a failure type from the metric/value or the error message."""
        metric_name = failure_context.get('metric_name')
        # Treat a missing or None value as 0 instead of failing on ``None > x``.
        current_value = failure_context.get('current_value') or 0

        for rule_metric, threshold, failure_type in self._METRIC_FAILURE_RULES:
            if metric_name == rule_metric and current_value > threshold:
                return failure_type

        error_message = str(failure_context.get('error_message', '')).lower()
        if 'connection' in error_message:
            return 'connection_failure'
        if 'timeout' in error_message:
            return 'timeout_failure'
        return 'unknown_failure'

    def _match_recovery_strategy(self, failure_type: str, failure_context: Dict) -> Optional['RecoveryStrategy']:
        """Find a strategy by exact key, then by substring match in either direction."""
        if failure_type in self.recovery_strategies:
            return self.recovery_strategies[failure_type]

        return next(
            (strategy for pattern, strategy in self.recovery_strategies.items()
             if pattern in failure_type or failure_type in pattern),
            None
        )

    def _record_recovery_execution(self, failure_context: Dict, strategy: 'RecoveryStrategy', result: Dict):
        """Append one recovery attempt to the bounded execution history."""
        self.execution_history.append({
            'timestamp': time.time(),
            'failure_context': failure_context,
            'strategy_name': strategy.__class__.__name__,
            'result': result,
            # Guard against strategies that return something other than a dict.
            'success': result.get('success', False) if isinstance(result, dict) else False
        })
119
class RecoveryStrategy(ABC):
    """Base class for recovery strategies run by the auto-recovery system.

    Args:
        name: Human-readable strategy name.
        max_retries: Default attempt limit for ``_retry_with_backoff``.
        retry_delay: Base back-off delay in seconds, doubled after each
            failed attempt.
    """

    def __init__(self, name: str, max_retries: int = 3, retry_delay: float = 5.0):
        self.name = name
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    @abstractmethod
    async def execute(self, failure_context: Dict) -> Dict:
        """Perform the recovery action and return a result dict."""

    async def _retry_with_backoff(self, action: Callable, max_retries: int = None) -> bool:
        """Await ``action()`` until an attempt succeeds, with exponential back-off.

        An attempt fails when ``action()`` raises, or when its awaited result
        is exactly ``False`` (for example, a health check reporting
        "unhealthy"). Any other result, including ``None``, counts as success.

        Args:
            action: Zero-argument callable returning an awaitable.
            max_retries: Attempt limit. A falsy value falls back to
                ``self.max_retries``.

        Returns:
            True on the first successful attempt. False if no attempt
            succeeded without raising on the final one.

        Raises:
            Exception: Whatever ``action()`` raised on the final attempt.
        """
        retries = max_retries or self.max_retries

        for attempt in range(retries):
            is_last = attempt == retries - 1
            try:
                result = await action()
            except Exception:
                if is_last:
                    raise
            else:
                # Previously the result was ignored, so a health check that
                # returned False was treated as passing.
                if result is not False:
                    return True

            if not is_last:
                await asyncio.sleep(self.retry_delay * (2 ** attempt))  # exponential back-off

        return False
148
class ServiceRestartStrategy(RecoveryStrategy):
    """Recover a service by stopping it, starting it and health-checking it.

    Args:
        service_manager: Object with async ``stop_service``,
            ``start_service`` and ``check_service_health`` methods, each
            called with the service name.
    """

    def __init__(self, service_manager):
        super().__init__("ServiceRestart")
        self.service_manager = service_manager

    async def execute(self, failure_context: Dict) -> Dict:
        """Restart ``failure_context['service_name']`` and report the outcome."""
        target = failure_context.get('service_name')

        # Guard clause: nothing to restart without a service name.
        if not target:
            return {
                'success': False,
                'reason': 'Service name not provided in failure context'
            }

        try:
            await self.service_manager.stop_service(target)
            # Short pause between stop and start.
            await asyncio.sleep(2)
            await self.service_manager.start_service(target)

            healthy = await self._retry_with_backoff(
                lambda: self.service_manager.check_service_health(target)
            )
        except Exception as exc:
            return {
                'success': False,
                'reason': f'Failed to restart service {target}: {str(exc)}'
            }

        if not healthy:
            return {
                'success': False,
                'reason': f'Service {target} restart failed health check'
            }

        return {
            'success': True,
            'action': RecoveryAction.RESTART_SERVICE.value,
            'service_name': target,
            'message': f'Successfully restarted service {target}'
        }
198
class AutoScalingStrategy(RecoveryStrategy):
    """Recover from resource pressure by scaling the affected service.

    Args:
        scaling_manager: Object with async ``scale_up`` / ``scale_down``
            methods called as ``(service_name, scale_factor)``. When the call
            returns a dict, its ``'new_instance_count'`` is reported.
    """

    def __init__(self, scaling_manager):
        super().__init__("AutoScaling")
        self.scaling_manager = scaling_manager

    async def execute(self, failure_context: Dict) -> Dict:
        """Choose a scaling action from the failing metric and apply it.

        Scales up by 1.5x when ``cpu_usage``, ``memory_usage`` or ``latency``
        exceeds 0.8, and by 2x when ``request_rate`` exceeds 1000. Any other
        metric yields a failure result without scaling.
        """
        service_name = failure_context.get('service_name')
        metric_name = failure_context.get('metric_name')
        current_value = failure_context.get('current_value', 0)

        # Like ServiceRestartStrategy: without a target service there is
        # nothing to scale (previously scale_up(None, ...) was attempted).
        if not service_name:
            return {
                'success': False,
                'reason': 'Service name not provided in failure context'
            }

        # Decide the scaling direction and factor
        if metric_name in ('cpu_usage', 'memory_usage', 'latency') and current_value > 0.8:
            scale_direction, scale_factor = 'up', 1.5
        elif metric_name == 'request_rate' and current_value > 1000:
            scale_direction, scale_factor = 'up', 2.0
        else:
            return {
                'success': False,
                'reason': 'No scaling decision could be made based on metrics'
            }

        try:
            if scale_direction == 'up':
                result = await self.scaling_manager.scale_up(service_name, scale_factor)
                action = RecoveryAction.SCALE_UP
            else:
                # Not selected by any rule above yet; kept for scale-down rules.
                result = await self.scaling_manager.scale_down(service_name, scale_factor)
                action = RecoveryAction.SCALE_DOWN
        except Exception as e:
            return {
                'success': False,
                'reason': f'Failed to scale service {service_name}: {str(e)}'
            }

        # The scaling call already succeeded; a manager that returns None (or
        # a non-dict) must not turn that into a reported failure.
        new_instance_count = result.get('new_instance_count') if isinstance(result, dict) else None

        return {
            'success': True,
            'action': action.value,
            'service_name': service_name,
            'scale_factor': scale_factor,
            'new_instance_count': new_instance_count,
            'message': f'Successfully scaled {scale_direction} service {service_name}'
        }
246
class ModelSwitchStrategy(RecoveryStrategy):
    """Recover by switching the service to the first healthy fallback model.

    Args:
        model_manager: Object with async ``get_fallback_models(model)``,
            ``switch_model(service_name, model)`` and
            ``check_model_health(model)`` methods.
    """

    def __init__(self, model_manager):
        super().__init__("ModelSwitch")
        self.model_manager = model_manager

    async def execute(self, failure_context: Dict) -> Dict:
        """Try each fallback of ``failure_context['model_name']`` in order.

        Returns:
            A success dict for the first fallback that switches and passes
            its health check. Otherwise a failure dict whose ``'attempts'``
            list records, for each fallback tried, why it was rejected.
        """
        current_model = failure_context.get('model_name')
        service_name = failure_context.get('service_name')

        if not current_model:
            return {
                'success': False,
                'reason': 'Current model name not provided in failure context'
            }

        # Fetch candidate fallback models
        fallback_models = await self.model_manager.get_fallback_models(current_model)

        if not fallback_models:
            return {
                'success': False,
                'reason': f'No fallback models available for {current_model}'
            }

        attempts = []
        for fallback_model in fallback_models:
            try:
                await self.model_manager.switch_model(service_name, fallback_model)

                # Bind the loop variable explicitly so the closure can never
                # see a later iteration's model.
                health_check_passed = await self._retry_with_backoff(
                    lambda model=fallback_model: self.model_manager.check_model_health(model)
                )
            except Exception as e:
                # Record why this candidate failed instead of discarding it.
                attempts.append({'model': fallback_model, 'reason': f'error: {e}'})
                continue

            if health_check_passed:
                return {
                    'success': True,
                    'action': RecoveryAction.SWITCH_MODEL.value,
                    'original_model': current_model,
                    'fallback_model': fallback_model,
                    'service_name': service_name,
                    'message': f'Successfully switched from {current_model} to {fallback_model}'
                }

            attempts.append({'model': fallback_model, 'reason': 'failed health check'})

        return {
            'success': False,
            'reason': f'Failed to switch to any fallback model for {current_model}',
            'attempts': attempts
        }
301
302# 使用範例
async def setup_auto_recovery(scaling_manager=None, model_manager=None, service_manager=None):
    """Build an AutoRecoverySystem wired with the example strategies.

    Wiring:
        - ``'high_latency'`` -> ``AutoScalingStrategy(scaling_manager)``
        - ``'high_error_rate'`` -> ``ModelSwitchStrategy(model_manager)``
        - ``'connection_failure'`` -> ``ServiceRestartStrategy(service_manager)``

    The managers are passed in explicitly rather than read from undefined
    globals. A strategy is registered only when its manager is provided.

    Returns:
        The configured AutoRecoverySystem.
    """
    recovery_system = AutoRecoverySystem()

    wiring = (
        ('high_latency', AutoScalingStrategy, scaling_manager),
        ('high_error_rate', ModelSwitchStrategy, model_manager),
        ('connection_failure', ServiceRestartStrategy, service_manager),
    )

    for failure_type, strategy_cls, manager in wiring:
        if manager is not None:
            recovery_system.register_recovery_strategy(failure_type, strategy_cls(manager))

    return recovery_system
4. 成本最佳化與資源管理
智能成本控制
動態資源分配與成本監控:
1import asyncio
2from dataclasses import dataclass
3from typing import Dict, List, Optional, Tuple
4from enum import Enum
5import numpy as np
6from datetime import datetime, timedelta
7
class ResourceType(Enum):
    """Kinds of resource whose usage and cost are tracked.

    The member's ``value`` string is what the cost optimizer embeds in its
    usage-history keys and cost breakdowns.
    """

    GPU = "gpu"
    CPU = "cpu"
    MEMORY = "memory"
    STORAGE = "storage"
    NETWORK = "network"
14
@dataclass
class ResourceUsage:
    """One sampled measurement of a resource for a service/model pair.

    Attributes:
        resource_type: Which resource was measured.
        used: Amount in use when sampled.
        total: Capacity ``used`` is measured against (presumably the
            provisioned amount; not read by the visible code).
        cost_per_unit: Price per unit of ``used``. The cost optimizer
            multiplies ``used * cost_per_unit``.
        timestamp: When the sample was taken.
        model_name: Model the usage is attributed to.
        service_name: Service the usage is attributed to.
    """

    resource_type: ResourceType
    used: float
    total: float
    cost_per_unit: float
    timestamp: datetime
    model_name: str
    service_name: str
24
@dataclass
class CostOptimizationRecommendation:
    """A single cost-saving suggestion produced by the cost optimizer.

    Attributes:
        action_type: One of ``"scale_down"``, ``"switch_instance_type"`` or
            ``"schedule_shutdown"``.
        estimated_savings: Estimated savings per month.
        impact_assessment: Free-text assessment of the performance impact.
        confidence_score: Confidence in the recommendation.
        implementation_details: Action-specific parameters for applying it.
    """

    action_type: str
    estimated_savings: float
    impact_assessment: str
    confidence_score: float
    implementation_details: Dict
32
class IntelligentCostOptimizer:
    """Track resource usage, report costs and budgets, and suggest savings.

    Usage samples are kept per (service, model, resource type). A sample's
    cost is ``used * cost_per_unit``. Window totals additionally multiply by
    the window length in hours.

    NOTE(review): several helpers called below are not defined in this class:
    ``_analyze_weekly_cost_trend``, ``_analyze_resource_efficiency``,
    ``_identify_cost_drivers``, ``_optimize_scheduling``,
    ``_calculate_average_hourly_cost``, ``_get_current_resource_cost``,
    ``_execute_scale_down``, ``_execute_scheduled_shutdown`` and
    ``_execute_instance_switch``. They must be supplied (e.g. by a subclass)
    before ``generate_optimization_recommendations`` or
    ``implement_recommendation`` can do real work.
    """

    # An hour is "low usage" when its average cost is at most this fraction
    # of the mean hourly cost. Tunable assumption, not a measured constant.
    IDLE_HOUR_COST_RATIO = 0.5
    # Minimum number of low-usage hours before a scheduled shutdown is suggested.
    MIN_IDLE_HOURS = 6

    def __init__(self):
        self.resource_usage_history = {}
        self.cost_models = {}
        self.optimization_rules = []
        self.budget_limits = {}

    def register_cost_model(self, resource_type: 'ResourceType', cost_function: 'Callable'):
        """Register *cost_function* as the cost model for *resource_type*."""
        self.cost_models[resource_type] = cost_function

    def set_budget_limit(self, service_name: str, monthly_budget: float):
        """Set the monthly budget checked by ``monitor_budget_usage``."""
        self.budget_limits[service_name] = monthly_budget

    def track_resource_usage(self, usage: 'ResourceUsage'):
        """Record one usage sample, keeping the latest 1000 per key."""
        # Local import: this listing's import block does not provide deque.
        from collections import deque

        key = f"{usage.service_name}_{usage.model_name}_{usage.resource_type.value}"

        if key not in self.resource_usage_history:
            self.resource_usage_history[key] = deque(maxlen=1000)

        self.resource_usage_history[key].append(usage)

    def calculate_current_costs(self, time_window: timedelta = timedelta(hours=24)) -> Dict:
        """Estimate costs over the trailing *time_window*.

        Returns:
            ``{service: {model: {resource: {'usage', 'cost_per_unit',
            'total_cost'}}}}``, where:

            - ``usage`` is the mean sampled usage in the window;
            - ``cost_per_unit`` is the latest sample's rate;
            - ``total_cost = usage * cost_per_unit * hours_in_window``.
        """
        from collections import defaultdict

        cutoff_time = datetime.now() - time_window
        hours_in_window = time_window.total_seconds() / 3600

        # Group by the identifiers stored on each sample instead of splitting
        # the history key on "_": service/model names may contain "_"
        # themselves (which made the old 3-way unpacking raise ValueError).
        recent_by_group = defaultdict(list)
        for usage_history in self.resource_usage_history.values():
            for usage in usage_history:
                if usage.timestamp >= cutoff_time:
                    group = (usage.service_name, usage.model_name, usage.resource_type.value)
                    recent_by_group[group].append(usage)

        cost_breakdown = {}
        for (service_name, model_name, resource_type), recent_usage in recent_by_group.items():
            avg_usage = np.mean([usage.used for usage in recent_usage])
            cost_per_unit = recent_usage[-1].cost_per_unit
            total_cost = avg_usage * cost_per_unit * hours_in_window

            model_breakdown = cost_breakdown.setdefault(service_name, {}).setdefault(model_name, {})
            model_breakdown[resource_type] = {
                'usage': avg_usage,
                'cost_per_unit': cost_per_unit,
                'total_cost': total_cost
            }

        return cost_breakdown

    def analyze_cost_patterns(self, service_name: str) -> Dict:
        """Collect the analyses consumed by the recommendation rules."""
        patterns = {
            'hourly_distribution': self._analyze_hourly_cost_distribution(service_name),
            'weekly_trend': self._analyze_weekly_cost_trend(service_name),
            'resource_efficiency': self._analyze_resource_efficiency(service_name),
            'cost_drivers': self._identify_cost_drivers(service_name)
        }

        return patterns

    def generate_optimization_recommendations(self, service_name: str) -> List['CostOptimizationRecommendation']:
        """Run every recommendation rule; return results sorted by savings, highest first."""
        recommendations = []

        patterns = self.analyze_cost_patterns(service_name)

        # 1. Idle resources
        recommendations.extend(self._detect_idle_resources(service_name, patterns))
        # 2. Over-provisioning
        recommendations.extend(self._detect_overprovisioning(service_name, patterns))
        # 3. Scheduling
        recommendations.extend(self._optimize_scheduling(service_name, patterns))
        # 4. Instance types
        recommendations.extend(self._optimize_instance_types(service_name, patterns))

        recommendations.sort(key=lambda x: x.estimated_savings, reverse=True)

        return recommendations

    def _analyze_hourly_cost_distribution(self, service_name: str) -> Dict:
        """Average sample cost per hour of day for *service_name*.

        Returns:
            A dict with:

            - ``'hourly_average'``: maps hour to mean cost;
            - ``'peak_hours'``: the three costliest hours;
            - ``'low_usage_hours'``: the three cheapest hours.
        """
        from collections import defaultdict

        hourly_costs = defaultdict(list)

        for usage_history in self.resource_usage_history.values():
            for usage in usage_history:
                # Exact match: a key-prefix test also matched other services
                # whose names merely start with service_name.
                if usage.service_name != service_name:
                    continue
                hourly_costs[usage.timestamp.hour].append(usage.used * usage.cost_per_unit)

        hourly_avg = {
            hour: np.mean(costs) for hour, costs in hourly_costs.items()
        }

        return {
            'hourly_average': hourly_avg,
            'peak_hours': sorted(hourly_avg.keys(), key=lambda h: hourly_avg[h], reverse=True)[:3],
            'low_usage_hours': sorted(hourly_avg.keys(), key=lambda h: hourly_avg[h])[:3]
        }

    def _detect_idle_resources(self, service_name: str, patterns: Dict) -> List['CostOptimizationRecommendation']:
        """Suggest a scheduled shutdown when enough hours have low cost."""
        recommendations = []

        # 'low_usage_hours' in the distribution holds at most three hours, so
        # comparing its length with MIN_IDLE_HOURS could never succeed.
        # Derive the idle hours from the full hourly averages instead.
        hourly_avg = patterns['hourly_distribution']['hourly_average']
        if not hourly_avg:
            return recommendations

        idle_cost_ceiling = np.mean(list(hourly_avg.values())) * self.IDLE_HOUR_COST_RATIO
        low_usage_hours = sorted(
            hour for hour, cost in hourly_avg.items() if cost <= idle_cost_ceiling
        )

        if len(low_usage_hours) >= self.MIN_IDLE_HOURS:
            current_hourly_cost = self._calculate_average_hourly_cost(service_name)
            potential_savings = current_hourly_cost * len(low_usage_hours) * 30  # per month

            recommendation = CostOptimizationRecommendation(
                action_type="schedule_shutdown",
                estimated_savings=potential_savings,
                impact_assessment="Minimal impact during low-usage hours",
                confidence_score=0.8,
                implementation_details={
                    "shutdown_hours": low_usage_hours,
                    "automation_script": "schedule_auto_shutdown.py",
                    "wake_up_trigger": "demand_based"
                }
            )

            recommendations.append(recommendation)

        return recommendations

    def _detect_overprovisioning(self, service_name: str, patterns: Dict) -> List['CostOptimizationRecommendation']:
        """Suggest scaling down each resource whose efficiency is below 0.6.

        Estimated savings use ``current_cost * (1 - efficiency) * 0.8``.
        """
        recommendations = []
        efficiency = patterns['resource_efficiency']

        for resource_type, efficiency_score in efficiency.items():
            if efficiency_score < 0.6:  # efficiency below 60%
                current_cost = self._get_current_resource_cost(service_name, resource_type)
                potential_savings = current_cost * (1 - efficiency_score) * 0.8  # conservative estimate

                recommendation = CostOptimizationRecommendation(
                    action_type="scale_down",
                    estimated_savings=potential_savings,
                    impact_assessment=f"Low impact - {resource_type} utilization is only {efficiency_score*100:.1f}%",
                    confidence_score=0.9,
                    implementation_details={
                        "resource_type": resource_type,
                        "current_efficiency": efficiency_score,
                        "recommended_reduction": f"{(1-efficiency_score)*50:.1f}%",
                        "gradual_scaling": True
                    }
                )

                recommendations.append(recommendation)

        return recommendations

    def _optimize_instance_types(self, service_name: str, patterns: Dict) -> List['CostOptimizationRecommendation']:
        """Suggest instance-type changes from GPU and CPU/memory utilisation.

        Two rules:

        - 0 < GPU efficiency < 0.5: suggest a smaller GPU instance (40% savings).
        - CPU and memory efficiency differ by more than 0.3: suggest a
          CPU- or memory-optimised instance (15% of CPU cost).
        """
        recommendations = []

        resource_usage = patterns['resource_efficiency']

        gpu_usage = resource_usage.get('gpu', 0)
        cpu_usage = resource_usage.get('cpu', 0)
        memory_usage = resource_usage.get('memory', 0)

        # GPU is used, but lightly
        if gpu_usage < 0.5 and gpu_usage > 0:
            current_gpu_cost = self._get_current_resource_cost(service_name, 'gpu')
            potential_savings = current_gpu_cost * 0.4  # assumes 40% savings

            recommendation = CostOptimizationRecommendation(
                action_type="switch_instance_type",
                estimated_savings=potential_savings,
                impact_assessment="Moderate impact - may increase latency slightly",
                confidence_score=0.7,
                implementation_details={
                    "current_gpu_usage": gpu_usage,
                    "recommended_action": "Switch to smaller GPU instance",
                    "suggested_instance": "g4dn.xlarge instead of g4dn.2xlarge",
                    "rollback_plan": "Monitor latency and scale back if needed"
                }
            )

            recommendations.append(recommendation)

        # CPU and memory utilisation are badly balanced
        if abs(cpu_usage - memory_usage) > 0.3:
            current_compute_cost = self._get_current_resource_cost(service_name, 'cpu')
            potential_savings = current_compute_cost * 0.15

            if cpu_usage > memory_usage:
                instance_type = "CPU-optimized"
            else:
                instance_type = "Memory-optimized"

            recommendation = CostOptimizationRecommendation(
                action_type="switch_instance_type",
                estimated_savings=potential_savings,
                impact_assessment="Low impact - better resource alignment",
                confidence_score=0.8,
                implementation_details={
                    "cpu_usage": cpu_usage,
                    "memory_usage": memory_usage,
                    "recommended_instance_type": instance_type,
                    "migration_plan": "Blue-green deployment recommended"
                }
            )

            recommendations.append(recommendation)

        return recommendations

    async def implement_recommendation(self, recommendation: 'CostOptimizationRecommendation',
                                       service_name: str) -> Dict:
        """Dispatch *recommendation* to its executor and report the outcome.

        Exceptions from the executor are captured in the result's
        ``'error'`` entry rather than propagated.
        """
        implementation_result = {
            'success': False,
            'action_taken': recommendation.action_type,
            'estimated_savings': recommendation.estimated_savings,
            'actual_impact': None
        }

        try:
            if recommendation.action_type == "scale_down":
                result = await self._execute_scale_down(service_name, recommendation.implementation_details)
            elif recommendation.action_type == "schedule_shutdown":
                result = await self._execute_scheduled_shutdown(service_name, recommendation.implementation_details)
            elif recommendation.action_type == "switch_instance_type":
                result = await self._execute_instance_switch(service_name, recommendation.implementation_details)
            else:
                result = {'success': False, 'reason': 'Unsupported action type'}

            implementation_result.update(result)

        except Exception as e:
            implementation_result['error'] = str(e)

        return implementation_result

    def monitor_budget_usage(self) -> Dict:
        """Compare each budgeted service's trailing-30-day cost with its budget.

        Services without cost data in the window are omitted.
        """
        budget_status = {}
        # Trailing 30 days, used as an approximation of the monthly spend.
        current_month_costs = self.calculate_current_costs(timedelta(days=30))

        for service_name, budget_limit in self.budget_limits.items():
            service_costs = current_month_costs.get(service_name)
            if not service_costs:
                continue

            # Each model maps resource types to cost records. Add up their
            # 'total_cost' values; summing the record dicts themselves raised
            # TypeError.
            total_service_cost = sum(
                resource_cost['total_cost']
                for model_costs in service_costs.values()
                for resource_cost in model_costs.values()
            )

            if budget_limit > 0:
                usage_percentage = (total_service_cost / budget_limit) * 100
            else:
                # A zero budget is exceeded by any spend at all.
                usage_percentage = 0.0 if total_service_cost == 0 else float('inf')

            budget_status[service_name] = {
                'budget_limit': budget_limit,
                'current_spend': total_service_cost,
                'usage_percentage': usage_percentage,
                'remaining_budget': budget_limit - total_service_cost,
                'alert_level': self._get_budget_alert_level(usage_percentage)
            }

        return budget_status

    def _get_budget_alert_level(self, usage_percentage: float) -> str:
        """Map budget usage (%) to an alert level.

        Levels: >=95 critical, >=80 warning, >=70 watch, otherwise normal.
        """
        if usage_percentage >= 95:
            return 'critical'
        elif usage_percentage >= 80:
            return 'warning'
        elif usage_percentage >= 70:
            return 'watch'
        else:
            return 'normal'
351
352# 使用範例
async def setup_cost_optimization(check_interval_seconds: float = 3600):
    """Run the example cost-optimisation loop forever.

    Each cycle:

    1. Generates recommendations per service.
    2. Auto-applies those with confidence above 0.8 and estimated savings
       above $100.
    3. Prints budget alerts at 'warning' or 'critical' level.
    4. Sleeps for *check_interval_seconds* (default one hour).

    A failure while processing one service, or the budget check, is printed
    and does not stop the loop.
    """
    optimizer = IntelligentCostOptimizer()

    # Budget limits
    optimizer.set_budget_limit("ai-inference-service", 10000)  # $10,000/month
    optimizer.set_budget_limit("model-training-service", 5000)  # $5,000/month

    # Loop-invariant: the services to optimise
    services = ["ai-inference-service", "model-training-service"]

    while True:
        for service in services:
            try:
                recommendations = optimizer.generate_optimization_recommendations(service)
            except Exception as e:
                # Keep the long-running loop alive if one service's analysis fails.
                print(f"Failed to generate recommendations for {service}: {e}")
                continue

            # Auto-apply only low-risk recommendations
            for recommendation in recommendations:
                if (recommendation.confidence_score > 0.8 and
                        recommendation.estimated_savings > 100):  # saves more than $100

                    print(f"Implementing optimization for {service}: {recommendation.action_type}")
                    result = await optimizer.implement_recommendation(recommendation, service)
                    print(f"Implementation result: {result}")

        # Budget status
        try:
            budget_status = optimizer.monitor_budget_usage()
        except Exception as e:
            print(f"Failed to check budget status: {e}")
            budget_status = {}

        for service, status in budget_status.items():
            if status['alert_level'] in ['warning', 'critical']:
                print(f"Budget alert for {service}: {status['usage_percentage']:.1f}% used")

        await asyncio.sleep(check_interval_seconds)
總結
本文深入探討了生產環境 AI 系統監控與最佳化的核心技術:
- LLM-native 指標:延遲、品質、成本與可靠性的全方位評估體系
- 分散式監控:OpenTelemetry、Prometheus 與 ELK Stack 的企業整合
- 智能故障診斷:多層次異常檢測與自動恢復機制
- 成本最佳化:動態資源分配、預算控制與智能成本建議
下一篇是本系列的最終篇,將聚焦於客戶協作與問題解決實務,涵蓋需求分析、技術溝通與專案交付管理。
