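AI Forward Deployed Engineer Essential Skills Guide (Part 4): Monitoring and Optimizing Production AI Systems

Introduction

Monitoring and optimization largely determine whether an enterprise AI application succeeds once it reaches production. An AI FDE needs an end-to-end observability stack that covers model performance tracking, infrastructure monitoring, and cost control. This article covers LLM-native metric design, distributed monitoring architecture, intelligent fault diagnosis, and enterprise-grade cost optimization strategies.

1. LLM-native Metrics and Evaluation Framework

Core Performance Metric Design

LLM-specific metrics framework: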

from dataclasses import dataclass
from typing import Dict, List, Optional
from collections import deque
from enum import Enum
import time

import numpy as np


class MetricType(Enum):
    LATENCY = "latency"
    THROUGHPUT = "throughput"
    QUALITY = "quality"
    COST = "cost"
    RELIABILITY = "reliability"


@dataclass
class LLMMetrics:
    timestamp: float
    request_id: str
    model_name: str

    # Performance metrics
    time_to_first_token: float  # TTFT: latency until the first token arrives
    time_per_output_token: float  # TPOT: average time per generated output token
    total_latency: float
    tokens_per_second: float

    # Quality metrics
    perplexity: Optional[float] = None
    bleu_score: Optional[float] = None
    rouge_score: Optional[Dict[str, float]] = None
    human_feedback_score: Optional[float] = None

    # Cost metrics
    input_tokens: int = 0
    output_tokens: int = 0
    compute_cost: float = 0.0

    # Reliability metrics
    success: bool = True
    error_type: Optional[str] = None
    retry_count: int = 0


class LLMMetricsCollector:
    def __init__(self, buffer_size: int = 10000):
        # Bounded buffer: the oldest records are dropped once it is full
        self.metrics_buffer = deque(maxlen=buffer_size)
        self.real_time_stats = {}

    def collect_metric(self, metric: LLMMetrics):
        """Collect a single metric record."""
        self.metrics_buffer.append(metric)
        self._update_real_time_stats(metric)

    def _update_real_time_stats(self, metric: LLMMetrics):
        """Update the per-model running statistics."""
        model_name = metric.model_name

        if model_name not in self.real_time_stats:
            self.real_time_stats[model_name] = {
                'total_requests': 0,
                'successful_requests': 0,
                'total_latency': 0.0,
                'total_cost': 0.0,
                'recent_latencies': deque(maxlen=100),
                'recent_quality_scores': deque(maxlen=100),
            }

        stats = self.real_time_stats[model_name]
        stats['total_requests'] += 1

        if metric.success:
            stats['successful_requests'] += 1
            stats['total_latency'] += metric.total_latency
            stats['recent_latencies'].append(metric.total_latency)

            if metric.human_feedback_score is not None:
                stats['recent_quality_scores'].append(metric.human_feedback_score)

        # Failed requests can still incur cost, so cost is always accumulated
        stats['total_cost'] += metric.compute_cost

    def get_sla_metrics(self, model_name: str, time_window: int = 3600) -> Dict:
        """Compute SLA-related metrics over the last `time_window` seconds."""
        cutoff_time = time.time() - time_window

        recent_metrics = [
            m for m in self.metrics_buffer
            if m.model_name == model_name and m.timestamp >= cutoff_time
        ]

        if not recent_metrics:
            return {}

        successful_metrics = [m for m in recent_metrics if m.success]
        success_ratio = len(successful_metrics) / len(recent_metrics)

        # Key SLA metrics
        sla = {
            'availability': success_ratio,
            'error_rate': 1 - success_ratio,
            'total_requests': len(recent_metrics),
            # Averaged over the whole window, not just the span with traffic
            'requests_per_second': len(recent_metrics) / time_window,
        }

        # Latency statistics are only defined if at least one request succeeded
        if successful_metrics:
            latencies = [m.total_latency for m in successful_metrics]
            sla.update({
                'p50_latency': float(np.percentile(latencies, 50)),
                'p95_latency': float(np.percentile(latencies, 95)),
                'p99_latency': float(np.percentile(latencies, 99)),
                'average_ttft': float(np.mean([m.time_to_first_token for m in successful_metrics])),
                'average_tpot': float(np.mean([m.time_per_output_token for m in successful_metrics])),
            })

        return sla

    def detect_performance_anomalies(self, model_name: str) -> List[Dict]:
        """Detect latency and quality anomalies for one model."""
        anomalies = []

        if model_name not in self.real_time_stats:
            return anomalies

        stats = self.real_time_stats[model_name]
        recent_latencies = list(stats['recent_latencies'])

        if len(recent_latencies) < 10:
            return anomalies

        # Statistical (3-sigma) anomaly detection. Note that the mean and std
        # include the points being tested, so a spike also raises the threshold.
        mean_latency = np.mean(recent_latencies)
        std_latency = np.std(recent_latencies)
        threshold = mean_latency + 3 * std_latency

        # Check the 10 most recent latencies
        for latency in recent_latencies[-10:]:
            if latency > threshold:
                anomalies.append({
                    'type': 'high_latency',
                    'value': latency,
                    'threshold': threshold,
                    'severity': 'high' if latency > mean_latency + 5 * std_latency else 'medium',
                })

        # Detect quality degradation: last 5 scores vs. the earlier ones
        recent_quality = list(stats['recent_quality_scores'])
        if len(recent_quality) > 5:
            recent_avg = np.mean(recent_quality[-5:])
            historical_avg = np.mean(recent_quality[:-5])

            if recent_avg < historical_avg * 0.9:  # quality dropped by more than 10%
                anomalies.append({
                    'type': 'quality_degradation',
                    'current_quality': recent_avg,
                    'historical_quality': historical_avg,
                    'severity': 'high',
                })

        return anomalies
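
`LLMMetrics` assumes TTFT and TPOT have already been measured. As a minimal sketch of how they might be derived from a streaming response, the hypothetical helper below takes the request start time and the arrival timestamp of each output token. It uses one common convention for TPOT: the time spent after the first token, divided by the remaining `n - 1` tokens, i.e. TPOT = (total latency − TTFT) / (n − 1).

```python
from typing import Dict, List


def streaming_latency_metrics(request_start: float, token_times: List[float]) -> Dict[str, float]:
    """Derive TTFT, TPOT, total latency and throughput from token arrival times.

    Hypothetical helper: `token_times` are absolute timestamps (seconds, same
    clock as `request_start`) at which each output token was received.
    """
    if not token_times:
        raise ValueError("at least one output token is required")

    ttft = token_times[0] - request_start            # time to first token
    total_latency = token_times[-1] - request_start  # end-to-end latency
    n_tokens = len(token_times)

    # TPOT: decode time after the first token, spread over the remaining tokens
    tpot = (total_latency - ttft) / (n_tokens - 1) if n_tokens > 1 else 0.0

    # Output tokens per second over the whole request
    tokens_per_second = n_tokens / total_latency if total_latency > 0 else 0.0

    return {
        "time_to_first_token": ttft,
        "time_per_output_token": tpot,
        "total_latency": total_latency,
        "tokens_per_second": tokens_per_second,
    }


# Request sent at t=0.0 s, first token at 0.5 s, then one token every 0.1 s
metrics = streaming_latency_metrics(0.0, [0.5, 0.6, 0.7, 0.8, 0.9])
```

The returned keys match the `LLMMetrics` field names, so the dictionary can be unpacked into a record together with the request ID, model name and timestamp.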

Advanced Quality Evaluation

Multi-dimensional quality checks (lightweight heuristics plus embedding similarity):

import re
from typing import Dict, List, Optional

import numpy as np
import torch
from sentence_transformers import SentenceTransformer


class LLMQualityEvaluator:
    def __init__(self):
        self.sentence_model = SentenceTransformer('all-MiniLM-L6-v2')
        self.toxicity_threshold = 0.7
        self.coherence_threshold = 0.6

    def evaluate_response_quality(self, prompt: str, response: str,
                                  expected_response: Optional[str] = None) -> Dict:
        """Evaluate response quality across several dimensions."""

        quality_scores = {}

        # 1. Fluency
        quality_scores['fluency'] = self._evaluate_fluency(response)

        # 2. Relevance to the prompt
        quality_scores['relevance'] = self._evaluate_relevance(prompt, response)

        # 3. Coherence between adjacent sentences
        quality_scores['coherence'] = self._evaluate_coherence(response)

        # 4. Safety
        quality_scores['safety'] = self._evaluate_safety(response)

        # 5. Factual accuracy (only when a reference answer is available)
        if expected_response:
            quality_scores['factual_accuracy'] = self._evaluate_factual_accuracy(
                response, expected_response
            )

        # 6. Hallucination check (1.0 = no suspected hallucination; higher is better)
        quality_scores['hallucination_score'] = self._detect_hallucination(prompt, response)

        # Weighted overall score; a metric that was not computed
        # (e.g. factual_accuracy without a reference) counts as a neutral 0.5
        weights = {
            'fluency': 0.15,
            'relevance': 0.25,
            'coherence': 0.20,
            'safety': 0.25,
            'factual_accuracy': 0.10,
            'hallucination_score': 0.05,
        }

        overall_score = sum(
            quality_scores.get(metric, 0.5) * weight
            for metric, weight in weights.items()
        )

        quality_scores['overall_quality'] = overall_score

        return quality_scores

    def _evaluate_fluency(self, response: str) -> float:
        """Estimate fluency with simple heuristics.

        Simplified: uses sentence length, word repetition and a few grammar
        patterns rather than language-model perplexity.
        """
        # Note: splitting on '.' only handles English-style punctuation
        sentences = [s for s in response.split('.') if s.strip()]

        fluency_indicators = {
            'avg_sentence_length': self._calculate_avg_sentence_length(sentences),
            'repetition_rate': self._calculate_repetition_rate(response),
            'grammar_errors': self._count_grammar_errors(response),
        }

        fluency_score = 1.0

        # Penalize sentences that are too short or too long (in words)
        avg_len = fluency_indicators['avg_sentence_length']
        if avg_len < 5 or avg_len > 50:
            fluency_score *= 0.8

        # Penalize a high repetition rate
        if fluency_indicators['repetition_rate'] > 0.3:
            fluency_score *= 0.7

        # Penalize grammar errors (10% per error, floored at 0.1)
        fluency_score *= max(0.1, 1.0 - fluency_indicators['grammar_errors'] * 0.1)

        return max(0.0, min(1.0, fluency_score))

    def _evaluate_relevance(self, prompt: str, response: str) -> float:
        """Score relevance as the cosine similarity of prompt and response embeddings."""
        embeddings = self.sentence_model.encode([prompt, response], convert_to_tensor=True)

        similarity = torch.nn.functional.cosine_similarity(
            embeddings[0:1], embeddings[1:2]
        ).item()

        # Clip to [0, 1] (cosine similarity can be negative)
        return max(0.0, min(1.0, similarity))

    def _evaluate_coherence(self, response: str) -> float:
        """Score coherence as the mean similarity of adjacent sentences."""
        sentences = [s.strip() for s in response.split('.') if s.strip()]

        if len(sentences) <= 1:
            return 1.0

        # Encode all sentences in one batch, then compare each sentence with the next
        embeddings = self.sentence_model.encode(sentences, convert_to_tensor=True)
        similarities = torch.nn.functional.cosine_similarity(embeddings[:-1], embeddings[1:])

        return float(similarities.mean().item())

    def _evaluate_safety(self, response: str) -> float:
        """Score content safety (1.0 = no issues found)."""

        # Naive keyword screen; a production system would use a trained
        # moderation or toxicity classifier instead
        harmful_keywords = [
            'violent', 'illegal', 'discriminatory', 'hateful',
            'toxic', 'offensive', 'inappropriate'
        ]

        safety_score = 1.0
        response_lower = response.lower()

        for keyword in harmful_keywords:
            if keyword in response_lower:
                safety_score *= 0.7

        # Penalize leaked personally identifiable information
        if self._contains_pii(response):
            safety_score *= 0.5

        return max(0.0, safety_score)

    def _evaluate_factual_accuracy(self, response: str, expected: str) -> float:
        """Score accuracy as the share of expected facts found in the response."""

        # Extract key facts (as sets, so duplicates do not skew the ratio)
        response_facts = set(self._extract_facts(response))
        expected_facts = set(self._extract_facts(expected))

        if not expected_facts:
            return 0.5  # Cannot be evaluated

        # Fact overlap, i.e. recall of the expected facts
        return len(response_facts & expected_facts) / len(expected_facts)

    def _detect_hallucination(self, prompt: str, response: str) -> float:
        """Heuristic hallucination check (1.0 = no suspected hallucination)."""

        # Simplified: numbers in the response that never appear in the prompt are suspect
        prompt_numbers = set(re.findall(r'\d+', prompt))
        response_numbers = set(re.findall(r'\d+', response))

        # Share of the response's numbers that are new
        new_numbers = response_numbers - prompt_numbers
        hallucination_indicator = len(new_numbers) / max(len(response_numbers), 1)

        return 1.0 - min(hallucination_indicator, 1.0)

    def _calculate_avg_sentence_length(self, sentences: List[str]) -> float:
        """Average sentence length in words."""
        if not sentences:
            return 0.0
        return sum(len(s.split()) for s in sentences) / len(sentences)

    def _calculate_repetition_rate(self, text: str) -> float:
        """Share of repeated words (1 - unique / total)."""
        words = text.lower().split()
        if len(words) <= 1:
            return 0.0
        return 1.0 - (len(set(words)) / len(words))

    def _count_grammar_errors(self, text: str) -> int:
        """Count grammar errors (simplified)."""
        # In production, use a tool such as LanguageTool
        error_patterns = [
            r'\b(a)\s+[aeiou]',  # "a" before a vowel
            r'\b(an)\s+[bcdfg-hj-np-tv-z]',  # "an" before a consonant
        ]
        return sum(len(re.findall(p, text, re.IGNORECASE)) for p in error_patterns)

    def _contains_pii(self, text: str) -> bool:
        """Detect personally identifiable information."""
        # Simplified PII patterns
        pii_patterns = [
            r'\b\d{4}-\d{4}-\d{4}-\d{4}\b',  # credit card number
            r'\b\d{3}-\d{2}-\d{4}\b',  # US SSN
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # email address
        ]
        return any(re.search(pattern, text) for pattern in pii_patterns)

    def _extract_facts(self, text: str) -> List[str]:
        """Extract "facts" from text (simplified)."""
        # In production, use NER and relation extraction
        facts = []

        # Numeric facts
        numbers = re.findall(r'\d+(?:\.\d+)?', text)
        facts.extend(f"number_{num}" for num in numbers)

        # Date facts
        dates = re.findall(r'\b\d{4}-\d{2}-\d{2}\b|\b\d{1,2}/\d{1,2}/\d{4}\b', text)
        facts.extend(f"date_{date}" for date in dates)

        return facts
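
In `evaluate_response_quality`, a metric that was not computed (for example `factual_accuracy` when no reference answer is given) contributes a neutral 0.5, which pulls strong responses toward the middle. One alternative, sketched here as an assumption rather than the article's method, is to renormalize the weights over only the metrics that are present: overall = Σ wᵢ·sᵢ / Σ wᵢ over the available metrics. The function name is hypothetical; the weights are copied from the evaluator.

```python
from typing import Dict

# Weights copied from LLMQualityEvaluator.evaluate_response_quality
QUALITY_WEIGHTS = {
    'fluency': 0.15,
    'relevance': 0.25,
    'coherence': 0.20,
    'safety': 0.25,
    'factual_accuracy': 0.10,
    'hallucination_score': 0.05,
}


def renormalized_quality_score(scores: Dict[str, float],
                               weights: Dict[str, float] = QUALITY_WEIGHTS) -> float:
    """Weighted mean over only the metrics present in `scores` (hypothetical helper)."""
    available = {name: w for name, w in weights.items() if name in scores}
    total_weight = sum(available.values())
    if total_weight == 0:
        raise ValueError("none of the weighted metrics is present in scores")
    return sum(scores[name] * w for name, w in available.items()) / total_weight


# No reference answer, so factual_accuracy is missing
scores = {'fluency': 1.0, 'relevance': 0.8, 'coherence': 0.6,
          'safety': 1.0, 'hallucination_score': 1.0}
score = renormalized_quality_score(scores)  # 0.77 / 0.90 ≈ 0.856
```

For the same scores, the 0.5 fallback yields 0.77 + 0.10 × 0.5 = 0.82. Renormalizing keeps the overall score on the scale of the metrics that were actually measured.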

2. Distributed Monitoring Architecture

Observability Infrastructure

OpenTelemetry tracing + Prometheus metrics integration:

import functools
import logging
import time
from typing import Callable, Dict

from opentelemetry import trace
from prometheus_client import Counter, Gauge, Histogram


class AISystemObservability:
    def __init__(self, service_name: str):
        self.service_name = service_name

        # Tracing: spans are no-ops until a TracerProvider is configured
        self.tracer = trace.get_tracer(__name__)

        # Custom metrics (registered in prometheus_client's default registry,
        # so create only one instance per process)
        self.setup_custom_metrics()

        # Structured logging
        self.setup_logging()

    def setup_custom_metrics(self):
        """Define the custom Prometheus metrics."""

        # Counters
        self.request_counter = Counter(
            'ai_requests_total',
            'Total AI model requests',
            ['model_name', 'endpoint', 'status']
        )

        self.token_counter = Counter(
            'ai_tokens_total',
            'Total tokens processed',
            ['model_name', 'token_type']  # token_type: input / output
        )

        # Histograms
        self.latency_histogram = Histogram(
            'ai_request_duration_seconds',
            'AI request latency in seconds',
            ['model_name', 'endpoint'],
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
        )

        self.ttft_histogram = Histogram(
            'ai_time_to_first_token_seconds',
            'Time to first token in seconds',
            ['model_name'],
            buckets=[0.01, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0]
        )

        # Gauges
        self.active_requests = Gauge(
            'ai_active_requests',
            'Number of active AI requests',
            ['model_name']
        )

        self.model_quality_score = Gauge(
            'ai_model_quality_score',
            'Model quality score (0-1)',
            ['model_name', 'metric_type']
        )

        self.gpu_utilization = Gauge(
            'ai_gpu_utilization_percent',
            'GPU utilization percentage',
            ['gpu_id', 'model_name']
        )

    def setup_logging(self):
        """Configure JSON-style structured logging."""

        self.logger = logging.getLogger(self.service_name)
        self.logger.setLevel(logging.INFO)

        # JSON-shaped format. `defaults` (Python 3.10+) fills trace_id/span_id for
        # records logged without them. Note: %(message)s is not JSON-escaped.
        formatter = logging.Formatter(
            '{"timestamp": "%(asctime)s", "service": "%(name)s", '
            '"level": "%(levelname)s", "message": "%(message)s", '
            '"trace_id": "%(trace_id)s", "span_id": "%(span_id)s"}',
            defaults={"trace_id": "", "span_id": ""},
        )

        # Avoid duplicate handlers if the logger is configured more than once
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def track_ai_request(self, model_name: str, endpoint: str) -> Callable:
        """Decorator factory that traces and meters each call to an AI function.

        The wrapped function is expected to return a dict that may contain
        'input_tokens' and 'output_tokens'.
        """

        def decorator(func: Callable) -> Callable:
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                # Count the request as in flight
                self.active_requests.labels(model_name=model_name).inc()

                with self.tracer.start_as_current_span(
                    f"{self.service_name}.{endpoint}",
                    attributes={
                        "ai.model.name": model_name,
                        "ai.endpoint": endpoint,
                        "ai.service": self.service_name,
                    },
                ) as span:
                    start_time = time.perf_counter()

                    try:
                        # Call the actual function
                        result = func(*args, **kwargs)

                        # Record success metrics
                        duration = time.perf_counter() - start_time

                        self.request_counter.labels(
                            model_name=model_name, endpoint=endpoint, status='success'
                        ).inc()

                        self.latency_histogram.labels(
                            model_name=model_name, endpoint=endpoint
                        ).observe(duration)

                        # Add span attributes
                        span.set_attributes({
                            "ai.response.success": True,
                            "ai.response.duration": duration,
                            "ai.tokens.input": result.get('input_tokens', 0),
                            "ai.tokens.output": result.get('output_tokens', 0),
                        })

                        # Record token usage
                        for token_type in ('input', 'output'):
                            key = f'{token_type}_tokens'
                            if key in result:
                                self.token_counter.labels(
                                    model_name=model_name, token_type=token_type
                                ).inc(result[key])

                        return result

                    except Exception as e:
                        # Record error metrics
                        self.request_counter.labels(
                            model_name=model_name, endpoint=endpoint, status='error'
                        ).inc()

                        span.set_attributes({
                            "ai.response.success": False,
                            "ai.error.type": type(e).__name__,
                            "ai.error.message": str(e),
                        })

                        # Log trace/span IDs as hex strings so they match the tracing backend
                        ctx = span.get_span_context()
                        self.logger.error(
                            f"AI request failed: {e}",
                            extra={
                                "trace_id": format(ctx.trace_id, '032x'),
                                "span_id": format(ctx.span_id, '016x'),
                                "model_name": model_name,
                                "endpoint": endpoint,
                            },
                        )

                        raise

                    finally:
                        # The request is no longer in flight
                        self.active_requests.labels(model_name=model_name).dec()

            return wrapper

        return decorator

    def update_quality_metrics(self, model_name: str, quality_scores: Dict):
        """Publish quality scores as gauges."""

        for metric_type, score in quality_scores.items():
            self.model_quality_score.labels(
                model_name=model_name,
                metric_type=metric_type
            ).set(score)

    def update_gpu_metrics(self, gpu_metrics: Dict):
        """Publish GPU utilization as gauges."""

        for gpu_id, gpu_stats in gpu_metrics.items():
            self.gpu_utilization.labels(
                gpu_id=gpu_id,
                model_name=gpu_stats.get('model_name', 'unknown')
            ).set(gpu_stats['utilization_percent'])


# Usage example
observability = AISystemObservability("ai-inference-service")


@observability.track_ai_request("gpt-4", "chat_completion")
def generate_response(prompt: str) -> Dict:
    # Simulated AI inference
    time.sleep(1.2)

    return {
        'response': 'Generated response',
        'input_tokens': 50,
        'output_tokens': 100,
        'model_version': '1.0.0',
    }
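
The bucket boundaries chosen for `ai_request_duration_seconds` limit how precisely latency percentiles can be recovered later. Prometheus's `histogram_quantile()` estimates a quantile by finding the bucket that contains the target rank and interpolating linearly inside it. The sketch below reproduces that idea in plain Python under simplifying assumptions (cumulative counts for sorted upper bounds ending in `+Inf`, and a lowest bucket that starts at 0); it is an illustration, not Prometheus's actual implementation, and the function name and counts are hypothetical.

```python
import math
from typing import List


def estimate_quantile(q: float, upper_bounds: List[float], cumulative_counts: List[int]) -> float:
    """Estimate the q-quantile from cumulative histogram buckets.

    `upper_bounds` are sorted bucket limits ending with math.inf; `cumulative_counts`
    are the matching cumulative observation counts (Prometheus `le` semantics).
    """
    if not 0.0 <= q <= 1.0:
        raise ValueError("q must be in [0, 1]")
    if not upper_bounds or len(upper_bounds) != len(cumulative_counts):
        raise ValueError("bounds and counts must be non-empty and the same length")

    total = cumulative_counts[-1]
    if total == 0:
        return math.nan

    rank = q * total
    lower_bound, lower_count = 0.0, 0
    for upper, count in zip(upper_bounds, cumulative_counts):
        if count >= rank:
            if math.isinf(upper):
                # Rank falls in the +Inf bucket: report the highest finite bound
                return lower_bound
            if count == lower_count:
                return lower_bound
            # Linear interpolation inside the bucket that contains the rank
            return lower_bound + (upper - lower_bound) * (rank - lower_count) / (count - lower_count)
        lower_bound, lower_count = upper, count
    return lower_bound


# Same boundaries as ai_request_duration_seconds, with made-up cumulative counts
buckets = [0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, math.inf]
counts = [10, 50, 80, 95, 99, 100, 100, 100, 100]
p90 = estimate_quantile(0.90, buckets, counts)  # 1.0 + 1.0 * (90 - 80) / (95 - 80) ≈ 1.667
```

With these counts, any true p90 between 1 s and 2 s is reported as about 1.67 s. Placing more bucket boundaries around the latency SLO is what makes the percentile estimate useful for alerting.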

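Distributed Tracing and Log Aggregation

Jaeger + ELK Stack integration (Jaeger for traces, Elasticsearch for log storage and search):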

import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional

import elasticsearch
from opentelemetry import trace
# Note: the Jaeger exporter has been deprecated in OpenTelemetry Python; recent
# Jaeger versions also accept OTLP, so an OTLP exporter is the longer-term option
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

logger = logging.getLogger(__name__)


class DistributedTraceManager:
    def __init__(self, jaeger_endpoint: str, elasticsearch_host: str):
        self.setup_jaeger_tracing(jaeger_endpoint)
        self.setup_elasticsearch_logging(elasticsearch_host)

    def setup_jaeger_tracing(self, endpoint: str):
        """Configure Jaeger distributed tracing."""

        # Jaeger exporter; `endpoint` is the collector URL,
        # e.g. "http://localhost:14268/api/traces"
        jaeger_exporter = JaegerExporter(
            agent_host_name="localhost",
            agent_port=6831,  # Jaeger agent UDP port (14268 is the collector's HTTP port)
            collector_endpoint=endpoint,
        )

        # Build the provider, attach a batching span processor, then register it
        # globally (set_tracer_provider only takes effect on the first call)
        provider = TracerProvider()
        provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
        trace.set_tracer_provider(provider)

        self.tracer = trace.get_tracer(__name__)

    def setup_elasticsearch_logging(self, host: str):
        """Configure Elasticsearch log aggregation."""

        # `host` should include the scheme, e.g. "http://localhost:9200"
        self.es_client = elasticsearch.Elasticsearch([host])

        # Index mapping
        index_mapping = {
            "mappings": {
                "properties": {
                    "timestamp": {"type": "date"},
                    "service_name": {"type": "keyword"},
                    "trace_id": {"type": "keyword"},
                    "span_id": {"type": "keyword"},
                    "level": {"type": "keyword"},
                    "message": {"type": "text"},
                    "model_name": {"type": "keyword"},
                    "endpoint": {"type": "keyword"},
                    "duration": {"type": "float"},
                    "error_type": {"type": "keyword"},
                    "user_id": {"type": "keyword"},
                    "request_size": {"type": "integer"},
                    "response_size": {"type": "integer"}
                }
            }
        }

        # Monthly index. Only the current month's index gets this mapping here;
        # an index template would apply it to future monthly indices automatically.
        index_name = f"ai-logs-{datetime.now(timezone.utc).strftime('%Y-%m')}"

        try:
            if not self.es_client.indices.exists(index=index_name):
                self.es_client.indices.create(index=index_name, body=index_mapping)
        except Exception as e:
            logger.warning("Failed to create Elasticsearch index: %s", e)

    def log_ai_operation(self, operation_type: str, details: Dict):
        """Write an AI operation record to Elasticsearch, linked to the current span."""

        span_context = trace.get_current_span().get_span_context()
        now = datetime.now(timezone.utc)

        log_entry = {
            "timestamp": now.isoformat(),
            "service_name": "ai-service",
            # 32/16-digit lowercase hex, as in the W3C trace-context format
            "trace_id": format(span_context.trace_id, '032x'),
            "span_id": format(span_context.span_id, '016x'),
            "operation_type": operation_type,
            **details
        }

        index_name = f"ai-logs-{now.strftime('%Y-%m')}"

        try:
            self.es_client.index(index=index_name, body=log_entry)
        except Exception as e:
            logger.warning("Failed to log to Elasticsearch: %s", e)

    def create_distributed_span(self, operation_name: str, model_name: str):
        """Create a span for a distributed operation (use as a context manager)."""

        return self.tracer.start_as_current_span(
            operation_name,
            attributes={
                "ai.model.name": model_name,
                "ai.operation": operation_name,
                "service.name": "ai-inference"
            }
        )

    def search_logs(self, query: Dict, time_range: Optional[Dict] = None) -> List[Dict]:
        """Search logs; `time_range` is a range clause such as {"gte": "now-1h"}."""

        must_clauses = [query]
        if time_range:
            must_clauses.append({"range": {"timestamp": time_range}})

        search_body = {
            "query": {"bool": {"must": must_clauses}},
            "sort": [{"timestamp": {"order": "desc"}}],
            "size": 100
        }

        try:
            response = self.es_client.search(index="ai-logs-*", body=search_body)
            return [hit["_source"] for hit in response["hits"]["hits"]]
        except Exception as e:
            logger.warning("Failed to search logs: %s", e)
            return []

    def analyze_error_patterns(self, time_window: str = "1h") -> Dict:
        """Aggregate errors by type (with model/endpoint breakdown) and over time."""

        aggregation_query = {
            "size": 0,
            "query": {
                "bool": {
                    "must": [
                        {"exists": {"field": "error_type"}},
                        {"range": {"timestamp": {"gte": f"now-{time_window}"}}}
                    ]
                }
            },
            "aggs": {
                "error_types": {
                    "terms": {"field": "error_type"},
                    "aggs": {
                        "models": {"terms": {"field": "model_name"}},
                        "endpoints": {"terms": {"field": "endpoint"}}
                    }
                },
                "error_timeline": {
                    "date_histogram": {
                        "field": "timestamp",
                        # `interval` is deprecated; fixed_interval is its replacement
                        "fixed_interval": "5m"
                    }
                }
            }
        }

        try:
            response = self.es_client.search(index="ai-logs-*", body=aggregation_query)

            return {
                "error_types": response["aggregations"]["error_types"]["buckets"],
                "timeline": response["aggregations"]["error_timeline"]["buckets"]
            }
        except Exception as e:
            logger.warning("Failed to analyze error patterns: %s", e)
            return {}
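
`analyze_error_patterns` returns raw Elasticsearch aggregation buckets: each `terms` bucket carries a `key` and a `doc_count`, and nested aggregations appear under their own names with their own `buckets` list. A small post-processing step can turn this into a ranked summary for an alert or dashboard. The helper `summarize_error_patterns` and the sample response below are hypothetical.

```python
from typing import Dict, List


def summarize_error_patterns(result: Dict) -> List[Dict]:
    """Rank error types and report the top model/endpoint for each (hypothetical helper)."""
    summary = []
    for bucket in result.get("error_types", []):
        models = bucket.get("models", {}).get("buckets", [])
        endpoints = bucket.get("endpoints", {}).get("buckets", [])
        summary.append({
            "error_type": bucket["key"],
            "count": bucket["doc_count"],
            # terms buckets are ordered by doc_count (descending) by default,
            # so the first entry is the largest contributor
            "top_model": models[0]["key"] if models else None,
            "top_endpoint": endpoints[0]["key"] if endpoints else None,
        })
    return sorted(summary, key=lambda item: item["count"], reverse=True)


# Made-up response shaped like the output of analyze_error_patterns
sample = {
    "error_types": [
        {"key": "TimeoutError", "doc_count": 42,
         "models": {"buckets": [{"key": "gpt-4", "doc_count": 40},
                                {"key": "llama-3", "doc_count": 2}]},
         "endpoints": {"buckets": [{"key": "chat_completion", "doc_count": 42}]}},
        {"key": "RateLimitError", "doc_count": 57,
         "models": {"buckets": [{"key": "gpt-4", "doc_count": 57}]},
         "endpoints": {"buckets": []}},
    ],
    "timeline": [],
}
ranked = summarize_error_patterns(sample)
```

Because `analyze_error_patterns` returns `{}` on failure, the helper treats a missing `error_types` key as "no errors" instead of raising.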

3. Intelligent Fault Diagnosis and Automatic Recovery

Fault Detection System

Multi-layer anomaly detection:

  1import numpy as np
  2from scipy import stats
  3from sklearn.ensemble import IsolationForest
  4from typing import Dict, List, Tuple
  5from collections import defaultdict, deque
  6import time
  7
  8class IntelligentFaultDetector:
  9    def __init__(self):
 10        self.metric_history = defaultdict(lambda: deque(maxlen=1000))
 11        self.anomaly_detectors = {}
 12        self.alert_rules = []
 13        self.incident_tracker = {}
 14        
 15    def register_metric_stream(self, metric_name: str, detection_method: str = "statistical"):
 16        """註冊指標流監控"""
 17        
 18        if detection_method == "statistical":
 19            self.anomaly_detectors[metric_name] = StatisticalAnomalyDetector()
 20        elif detection_method == "isolation_forest":
 21            self.anomaly_detectors[metric_name] = MLAnomalyDetector()
 22        elif detection_method == "threshold":
 23            self.anomaly_detectors[metric_name] = ThresholdAnomalyDetector()
 24    
 25    def add_alert_rule(self, rule: Dict):
 26        """添加告警規則"""
 27        self.alert_rules.append(rule)
 28    
 29    def process_metric(self, metric_name: str, value: float, timestamp: float, 
 30                      context: Dict = None) -> List[Dict]:
 31        """處理指標並檢測異常"""
 32        
 33        # 記錄歷史數據
 34        self.metric_history[metric_name].append({
 35            'value': value,
 36            'timestamp': timestamp,
 37            'context': context or {}
 38        })
 39        
 40        alerts = []
 41        
 42        # 執行異常檢測
 43        if metric_name in self.anomaly_detectors:
 44            detector = self.anomaly_detectors[metric_name]
 45            
 46            # 獲取歷史數據
 47            historical_values = [
 48                point['value'] for point in self.metric_history[metric_name]
 49            ]
 50            
 51            is_anomaly, anomaly_score, explanation = detector.detect_anomaly(
 52                current_value=value,
 53                historical_values=historical_values[-100:],  # 最近 100 個點
 54                context=context
 55            )
 56            
 57            if is_anomaly:
 58                alert = self._create_anomaly_alert(
 59                    metric_name=metric_name,
 60                    value=value,
 61                    timestamp=timestamp,
 62                    anomaly_score=anomaly_score,
 63                    explanation=explanation,
 64                    context=context
 65                )
 66                alerts.append(alert)
 67        
 68        # 檢查規則告警
 69        rule_alerts = self._check_alert_rules(metric_name, value, timestamp, context)
 70        alerts.extend(rule_alerts)
 71        
 72        # 更新事件追蹤
 73        self._update_incident_tracking(alerts)
 74        
 75        return alerts
 76    
    def _create_anomaly_alert(self, metric_name: str, value: float, timestamp: float,
                              anomaly_score: float, explanation: str, context: Dict) -> Dict:
        """Build an anomaly alert record."""

        severity = self._calculate_severity(anomaly_score, context)

        return {
            'type': 'anomaly',
            'metric_name': metric_name,
            'current_value': value,
            'anomaly_score': anomaly_score,
            'severity': severity,
            'timestamp': timestamp,
            'explanation': explanation,
            'context': context,
            'alert_id': f"anomaly_{metric_name}_{int(timestamp)}"
        }

    def _check_alert_rules(self, metric_name: str, value: float,
                           timestamp: float, context: Dict) -> List[Dict]:
        """Evaluate the configured alert rules for this metric."""

        alerts = []

        for rule in self.alert_rules:
            if rule['metric'] == metric_name:
                if self._evaluate_rule_condition(rule, value, context):
                    alert = {
                        'type': 'rule',
                        'rule_name': rule['name'],
                        'metric_name': metric_name,
                        'current_value': value,
                        'threshold': rule['threshold'],
                        'severity': rule['severity'],
                        'timestamp': timestamp,
                        'message': rule['message'].format(value=value),
                        'context': context,
                        'alert_id': f"rule_{rule['name']}_{int(timestamp)}"
                    }
                    alerts.append(alert)

        return alerts

    def _evaluate_rule_condition(self, rule: Dict, value: float, context: Dict) -> bool:
        """Evaluate a single rule condition."""

        condition_type = rule['condition']
        threshold = rule['threshold']

        if condition_type == 'greater_than':
            return value > threshold
        elif condition_type == 'less_than':
            return value < threshold
        elif condition_type == 'equals':
            return abs(value - threshold) < 0.001  # float tolerance
        elif condition_type == 'not_equals':
            return abs(value - threshold) >= 0.001
        elif condition_type == 'percentage_increase':
            # Compare against the historical average; threshold is a fraction (0.2 = +20%)
            historical_avg = self._get_historical_average(rule['metric'], window=rule.get('window', 300))
            if historical_avg:
                increase = (value - historical_avg) / historical_avg
                return increase > threshold

        return False

    def _calculate_severity(self, anomaly_score: float, context: Dict) -> str:
        """Map an anomaly score (0-1) to a severity level."""

        # Base severity from the anomaly score
        if anomaly_score > 0.9:
            base_severity = 'critical'
        elif anomaly_score > 0.7:
            base_severity = 'high'
        elif anomaly_score > 0.5:
            base_severity = 'medium'
        else:
            base_severity = 'low'

        # Escalate to at least 'high' when business impact is high
        # (context may be None when the caller passes no context)
        if (context or {}).get('business_impact') == 'high':
            if base_severity in ['medium', 'low']:
                base_severity = 'high'

        return base_severity

    def _update_incident_tracking(self, alerts: List[Dict]):
        """Open a new incident or update an existing one for each alert."""

        for alert in alerts:
            # alert_id embeds the timestamp in whole seconds, so repeats only merge
            # into one incident when they occur within the same second
            alert_id = alert['alert_id']

            if alert_id not in self.incident_tracker:
                self.incident_tracker[alert_id] = {
                    'first_occurrence': alert['timestamp'],
                    'last_occurrence': alert['timestamp'],
                    'count': 1,
                    'status': 'open',
                    'alert_data': alert
                }
            else:
                self.incident_tracker[alert_id]['last_occurrence'] = alert['timestamp']
                self.incident_tracker[alert_id]['count'] += 1

    def get_active_incidents(self) -> List[Dict]:
        """Return open incidents that recurred within the last hour."""

        active_incidents = []
        current_time = time.time()

        for incident_id, incident in self.incident_tracker.items():
            if (incident['status'] == 'open' and
                    current_time - incident['last_occurrence'] < 3600):  # within the last hour
                active_incidents.append(incident)

        return active_incidents

    def resolve_incident(self, incident_id: str, resolution_note: str = ""):
        """Mark an incident as resolved."""

        if incident_id in self.incident_tracker:
            self.incident_tracker[incident_id]['status'] = 'resolved'
            self.incident_tracker[incident_id]['resolution_time'] = time.time()
            self.incident_tracker[incident_id]['resolution_note'] = resolution_note

class StatisticalAnomalyDetector:
    def __init__(self, zscore_threshold: float = 3.0):
        self.zscore_threshold = zscore_threshold

    def detect_anomaly(self, current_value: float, historical_values: List[float],
                       context: Dict = None) -> Tuple[bool, float, str]:
        """Z-score based anomaly detection."""

        if len(historical_values) < 10:
            return False, 0.0, "Insufficient data for statistical analysis"

        # Z-score against the historical mean and (population) standard deviation
        mean = np.mean(historical_values)
        std = np.std(historical_values)

        if std == 0:
            return False, 0.0, "No variation in historical data"

        zscore = abs(current_value - mean) / std
        is_anomaly = bool(zscore > self.zscore_threshold)

        # Normalize to [0, 1]: z == threshold maps to 0.5, z >= 2 * threshold saturates at 1.0
        anomaly_score = float(min(zscore / (self.zscore_threshold * 2), 1.0))

        explanation = f"Z-score: {zscore:.2f}, Mean: {mean:.2f}, Std: {std:.2f}"

        return is_anomaly, anomaly_score, explanation

class MLAnomalyDetector:
    def __init__(self, retrain_interval: int = 100):
        self.isolation_forest = IsolationForest(contamination=0.1, random_state=42)
        self.trained = False
        # Refit every `retrain_interval` calls; the caller passes a window capped
        # at 100 points, so the window length cannot drive the refit schedule
        self.retrain_interval = retrain_interval
        self.calls_since_fit = 0

    def detect_anomaly(self, current_value: float, historical_values: List[float],
                       context: Dict = None) -> Tuple[bool, float, str]:
        """Isolation Forest based anomaly detection."""

        if len(historical_values) < 50:
            return False, 0.0, "Insufficient data for ML analysis"

        # Training data: a single feature, the metric value
        X = np.array(historical_values).reshape(-1, 1)

        # Fit on first use, then periodically refit on the latest window
        self.calls_since_fit += 1
        if not self.trained or self.calls_since_fit >= self.retrain_interval:
            self.isolation_forest.fit(X)
            self.trained = True
            self.calls_since_fit = 0

        # Score the current value
        current_X = np.array([[current_value]])
        anomaly_score = float(self.isolation_forest.decision_function(current_X)[0])
        prediction = self.isolation_forest.predict(current_X)[0]

        is_anomaly = bool(prediction == -1)

        # Map to 0-1: decision_function is negative for outliers, so
        # 1 / (1 + exp(score)) yields values above 0.5 for anomalies
        normalized_score = float(1.0 / (1.0 + np.exp(anomaly_score)))

        explanation = f"Isolation Forest score: {anomaly_score:.3f}"

        return is_anomaly, normalized_score, explanation

class ThresholdAnomalyDetector:
    def __init__(self, upper_threshold: Optional[float] = None,
                 lower_threshold: Optional[float] = None):
        self.upper_threshold = upper_threshold
        self.lower_threshold = lower_threshold

    def detect_anomaly(self, current_value: float, historical_values: List[float],
                       context: Dict = None) -> Tuple[bool, float, str]:
        """Static threshold anomaly detection."""

        is_anomaly = False
        explanation_parts = []

        if self.upper_threshold is not None and current_value > self.upper_threshold:
            is_anomaly = True
            explanation_parts.append(f"Above upper threshold: {current_value} > {self.upper_threshold}")

        if self.lower_threshold is not None and current_value < self.lower_threshold:
            is_anomaly = True
            explanation_parts.append(f"Below lower threshold: {current_value} < {self.lower_threshold}")

        # Anomaly score = relative distance past the breached threshold, capped at 1.0
        # (max(..., 1e-9) avoids division by zero when a threshold is 0)
        anomaly_score = 0.0
        if is_anomaly:
            if self.upper_threshold is not None and current_value > self.upper_threshold:
                anomaly_score = min((current_value - self.upper_threshold) / max(abs(self.upper_threshold), 1e-9), 1.0)
            elif self.lower_threshold is not None and current_value < self.lower_threshold:
                anomaly_score = min((self.lower_threshold - current_value) / max(abs(self.lower_threshold), 1e-9), 1.0)

        explanation = "; ".join(explanation_parts) if explanation_parts else "Within thresholds"

        return is_anomaly, anomaly_score, explanation
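This standalone sketch shows how `StatisticalAnomalyDetector` and `_calculate_severity` interact. It repeats the same Z-score arithmetic using only the standard library. `statistics.pstdev` is the population standard deviation, which matches `np.std`'s default `ddof=0`. The helper names `zscore_anomaly` and `severity` are illustrative, and so is the sample history.

```python
import statistics
from typing import List, Tuple

def zscore_anomaly(current: float, history: List[float],
                   threshold: float = 3.0) -> Tuple[bool, float]:
    """Same rule as StatisticalAnomalyDetector: flag when |z| > threshold."""
    if len(history) < 10:
        return False, 0.0             # not enough data
    mean = statistics.fmean(history)
    std = statistics.pstdev(history)  # population std, like np.std (ddof=0)
    if std == 0:
        return False, 0.0             # constant series: z is undefined
    z = abs(current - mean) / std
    # z == threshold -> 0.5, z >= 2 * threshold -> 1.0
    score = min(z / (threshold * 2), 1.0)
    return z > threshold, score

def severity(score: float) -> str:
    """Same score bands as _calculate_severity (ignoring business impact)."""
    if score > 0.9:
        return "critical"
    if score > 0.7:
        return "high"
    if score > 0.5:
        return "medium"
    return "low"

history = [100, 102, 98, 101, 99, 100, 103, 97, 100, 100]  # illustrative, mean 100
for value in (101, 106, 112):
    flagged, score = zscore_anomaly(value, history)
    print(value, flagged, round(score, 2), severity(score) if flagged else "-")
```

A point is flagged only when z > 3, and its score is then above 0.5. With these bands, statistical alerts therefore start at `medium`. Reaching `high` needs z > 4.2, and `critical` needs z > 5.4. If low-severity statistical alerts are wanted, the normalization or the bands must change.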

Automatic Recovery Mechanisms

Intelligent self-healing system:

import asyncio
import time
from abc import ABC, abstractmethod
from collections import deque
from enum import Enum
from typing import Callable, Dict, List, Optional

class RecoveryAction(Enum):
    RESTART_SERVICE = "restart_service"
    SCALE_UP = "scale_up"
    SCALE_DOWN = "scale_down"
    SWITCH_MODEL = "switch_model"
    CLEAR_CACHE = "clear_cache"
    ROLLBACK_DEPLOYMENT = "rollback_deployment"
    CIRCUIT_BREAK = "circuit_break"

class AutoRecoverySystem:
    def __init__(self):
        self.recovery_strategies = {}
        self.execution_history = deque(maxlen=100)
        self.circuit_breakers = {}
        # Keys of recoveries currently in progress (failure type + service)
        self.recovery_locks = {}

    def register_recovery_strategy(self, failure_pattern: str, strategy: 'RecoveryStrategy'):
        """Register a recovery strategy for a failure pattern."""
        self.recovery_strategies[failure_pattern] = strategy

    async def handle_failure(self, failure_context: Dict) -> Dict:
        """Classify a failure and run the matching recovery strategy."""

        failure_type = self._classify_failure(failure_context)

        # Look up a matching recovery strategy
        strategy = self._match_recovery_strategy(failure_type, failure_context)

        if not strategy:
            return {
                'success': False,
                'reason': 'No matching recovery strategy found',
                'failure_type': failure_type
            }

        # Recovery lock: avoid running the same kind of recovery concurrently for
        # one service. There is no `await` between the check and the set, so the
        # pair is atomic within a single asyncio event loop.
        lock_key = f"{failure_type}_{failure_context.get('service_name', 'unknown')}"

        if lock_key in self.recovery_locks:
            return {
                'success': False,
                'reason': 'Recovery already in progress',
                'failure_type': failure_type
            }

        # Acquire the recovery lock
        self.recovery_locks[lock_key] = True

        try:
            # Execute the recovery strategy
            recovery_result = await strategy.execute(failure_context)

            # Record the execution history
            self._record_recovery_execution(failure_context, strategy, recovery_result)

            return recovery_result

        except Exception as e:
            return {
                'success': False,
                'reason': f'Recovery execution failed: {str(e)}',
                'failure_type': failure_type
            }

        finally:
            # Release the recovery lock
            self.recovery_locks.pop(lock_key, None)

    def _classify_failure(self, failure_context: Dict) -> str:
        """Classify the failure type from the metric and error context."""

        # Assumed units: latency in seconds; error rate and utilization as fractions (0-1)
        if failure_context.get('metric_name') == 'latency' and failure_context.get('current_value', 0) > 10:
            return 'high_latency'
        elif failure_context.get('metric_name') == 'error_rate' and failure_context.get('current_value', 0) > 0.1:
            return 'high_error_rate'
        elif failure_context.get('metric_name') == 'memory_usage' and failure_context.get('current_value', 0) > 0.9:
            return 'memory_exhaustion'
        elif failure_context.get('metric_name') == 'cpu_usage' and failure_context.get('current_value', 0) > 0.9:
            return 'cpu_exhaustion'
        elif 'connection' in str(failure_context.get('error_message', '')).lower():
            return 'connection_failure'
        elif 'timeout' in str(failure_context.get('error_message', '')).lower():
            return 'timeout_failure'
        else:
            return 'unknown_failure'

    def _match_recovery_strategy(self, failure_type: str, failure_context: Dict) -> Optional['RecoveryStrategy']:
        """Find a recovery strategy for the failure type."""

        # Exact match
        if failure_type in self.recovery_strategies:
            return self.recovery_strategies[failure_type]

        # Substring match (the first registered pattern wins)
        for pattern, strategy in self.recovery_strategies.items():
            if pattern in failure_type or failure_type in pattern:
                return strategy

        return None

    def _record_recovery_execution(self, failure_context: Dict, strategy: 'RecoveryStrategy', result: Dict):
        """Append a record to the recovery execution history."""

        execution_record = {
            'timestamp': time.time(),
            'failure_context': failure_context,
            'strategy_name': strategy.__class__.__name__,
            'result': result,
            'success': result.get('success', False)
        }

        self.execution_history.append(execution_record)

class RecoveryStrategy(ABC):
    def __init__(self, name: str, max_retries: int = 3, retry_delay: float = 5.0):
        self.name = name
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    @abstractmethod
    async def execute(self, failure_context: Dict) -> Dict:
        """Perform the recovery action."""
        pass

    async def _retry_with_backoff(self, action: Callable, max_retries: int = None) -> bool:
        """Retry an async action with exponential backoff.

        An attempt fails if the action raises or returns False (e.g. a failed
        health check). Returns True on the first successful attempt and False if
        every attempt returned False; re-raises if the last attempt raised.
        """

        retries = max_retries if max_retries is not None else self.max_retries

        for attempt in range(retries):
            try:
                result = await action()
                if result is not False:
                    return True
            except Exception:
                if attempt == retries - 1:
                    raise
            if attempt < retries - 1:
                # Exponential backoff: retry_delay, 2 * retry_delay, 4 * retry_delay, ...
                await asyncio.sleep(self.retry_delay * (2 ** attempt))

        return False

class ServiceRestartStrategy(RecoveryStrategy):
    def __init__(self, service_manager):
        super().__init__("ServiceRestart")
        self.service_manager = service_manager

    async def execute(self, failure_context: Dict) -> Dict:
        """Recover by restarting the affected service."""

        service_name = failure_context.get('service_name')

        if not service_name:
            return {
                'success': False,
                'reason': 'Service name not provided in failure context'
            }

        try:
            # Stop the service
            await self.service_manager.stop_service(service_name)

            # Give it a moment to shut down
            await asyncio.sleep(2)

            # Start the service again
            await self.service_manager.start_service(service_name)

            # Verify health, retrying with backoff while the service warms up
            health_check_passed = await self._retry_with_backoff(
                lambda: self.service_manager.check_service_health(service_name)
            )

            if health_check_passed:
                return {
                    'success': True,
                    'action': RecoveryAction.RESTART_SERVICE.value,
                    'service_name': service_name,
                    'message': f'Successfully restarted service {service_name}'
                }
            else:
                return {
                    'success': False,
                    'reason': f'Service {service_name} restart failed health check'
                }

        except Exception as e:
            return {
                'success': False,
                'reason': f'Failed to restart service {service_name}: {str(e)}'
            }

class AutoScalingStrategy(RecoveryStrategy):
    def __init__(self, scaling_manager):
        super().__init__("AutoScaling")
        self.scaling_manager = scaling_manager

    async def execute(self, failure_context: Dict) -> Dict:
        """Recover by scaling the service out."""

        service_name = failure_context.get('service_name')
        metric_name = failure_context.get('metric_name')
        current_value = failure_context.get('current_value', 0)

        # Decide the scaling direction. Utilization metrics are fractions (0-1);
        # latency shares the same 0.8 cutoff, so any latency above 0.8 in its
        # reported unit triggers a scale-up.
        if metric_name in ['cpu_usage', 'memory_usage', 'latency'] and current_value > 0.8:
            scale_direction = 'up'
            scale_factor = 1.5
        elif metric_name == 'request_rate' and current_value > 1000:
            scale_direction = 'up'
            scale_factor = 2.0
        else:
            return {
                'success': False,
                'reason': 'No scaling decision could be made based on metrics'
            }

        try:
            # Only scale-up decisions are produced above; the scale-down branch
            # is kept so scale-in rules can reuse this code path
            if scale_direction == 'up':
                result = await self.scaling_manager.scale_up(service_name, scale_factor)
                action = RecoveryAction.SCALE_UP
            else:
                result = await self.scaling_manager.scale_down(service_name, scale_factor)
                action = RecoveryAction.SCALE_DOWN

            return {
                'success': True,
                'action': action.value,
                'service_name': service_name,
                'scale_factor': scale_factor,
                'new_instance_count': result.get('new_instance_count'),
                'message': f'Successfully scaled {scale_direction} service {service_name}'
            }

        except Exception as e:
            return {
                'success': False,
                'reason': f'Failed to scale service {service_name}: {str(e)}'
            }

class ModelSwitchStrategy(RecoveryStrategy):
    def __init__(self, model_manager):
        super().__init__("ModelSwitch")
        self.model_manager = model_manager

    async def execute(self, failure_context: Dict) -> Dict:
        """Recover by switching the service to a fallback model."""

        current_model = failure_context.get('model_name')
        service_name = failure_context.get('service_name')

        if not current_model:
            return {
                'success': False,
                'reason': 'Current model name not provided in failure context'
            }

        # Get the fallback models
        fallback_models = await self.model_manager.get_fallback_models(current_model)

        if not fallback_models:
            return {
                'success': False,
                'reason': f'No fallback models available for {current_model}'
            }

        # Try each fallback model in order
        for fallback_model in fallback_models:
            try:
                # Switch the service to the fallback model
                await self.model_manager.switch_model(service_name, fallback_model)

                # Verify the new model is healthy
                health_check_passed = await self._retry_with_backoff(
                    lambda: self.model_manager.check_model_health(fallback_model)
                )

                if health_check_passed:
                    return {
                        'success': True,
                        'action': RecoveryAction.SWITCH_MODEL.value,
                        'original_model': current_model,
                        'fallback_model': fallback_model,
                        'service_name': service_name,
                        'message': f'Successfully switched from {current_model} to {fallback_model}'
                    }

            except Exception:
                continue  # try the next fallback model

        return {
            'success': False,
            'reason': f'Failed to switch to any fallback model for {current_model}'
        }

# Usage example: scaling_manager, model_manager and service_manager are
# platform-specific objects exposing the methods the strategies call
async def setup_auto_recovery(scaling_manager, model_manager, service_manager):
    recovery_system = AutoRecoverySystem()

    # Register one recovery strategy per failure type
    recovery_system.register_recovery_strategy(
        'high_latency',
        AutoScalingStrategy(scaling_manager)
    )

    recovery_system.register_recovery_strategy(
        'high_error_rate',
        ModelSwitchStrategy(model_manager)
    )

    recovery_system.register_recovery_strategy(
        'connection_failure',
        ServiceRestartStrategy(service_manager)
    )

    return recovery_system
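`RecoveryStrategy._retry_with_backoff` waits `retry_delay * 2**attempt` between attempts. With the defaults (`max_retries=3`, `retry_delay=5.0`), a failing health check is retried after 5 s and again after 10 s. A strategy therefore gives up after about 15 s of backoff. The standalone sketch below reproduces that schedule and takes an injectable `sleep`, so the timing can be checked without real delays. The function name `retry_with_backoff`, the `sleep` parameter, and the demo health check are illustrative and not part of the original code.

```python
import asyncio
from typing import Awaitable, Callable

async def retry_with_backoff(check: Callable[[], Awaitable[bool]],
                             max_retries: int = 3,
                             retry_delay: float = 5.0,
                             sleep: Callable[[float], Awaitable[None]] = asyncio.sleep) -> bool:
    """Call `check` until it returns True, doubling the wait after each failure."""
    for attempt in range(max_retries):
        try:
            if await check():
                return True
        except Exception:
            if attempt == max_retries - 1:
                raise  # surface the error from the final attempt
        if attempt < max_retries - 1:
            await sleep(retry_delay * (2 ** attempt))  # 5.0, 10.0, 20.0, ...
    return False

async def demo():
    delays = []

    async def fake_sleep(seconds: float) -> None:
        delays.append(seconds)  # record the wait instead of sleeping

    attempts = 0

    async def flaky_health_check() -> bool:
        nonlocal attempts
        attempts += 1
        return attempts >= 3  # hypothetical service: healthy on the third try

    ok = await retry_with_backoff(flaky_health_check, sleep=fake_sleep)
    return ok, delays

print(asyncio.run(demo()))
```

No sleep follows the final attempt, so 3 attempts produce 2 waits. If restarts take longer to become healthy, raise `max_retries` or `retry_delay` rather than the fixed `asyncio.sleep(2)` in `ServiceRestartStrategy`.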

4. Cost Optimization and Resource Management

Intelligent Cost Control

Dynamic resource allocation and cost monitoring:

import asyncio
from collections import defaultdict, deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Callable, Dict, List, Optional, Tuple

import numpy as np

class ResourceType(Enum):
    GPU = "gpu"
    CPU = "cpu"
    MEMORY = "memory"
    STORAGE = "storage"
    NETWORK = "network"

@dataclass
class ResourceUsage:
    resource_type: ResourceType
    used: float           # units in use (e.g. GPUs, vCPUs, GB)
    total: float          # units provisioned
    cost_per_unit: float  # price per unit-hour
    timestamp: datetime
    model_name: str
    service_name: str

@dataclass
class CostOptimizationRecommendation:
    action_type: str  # scale_down, switch_instance_type, schedule_shutdown
    estimated_savings: float  # estimated monthly savings
    impact_assessment: str  # expected impact on performance
    confidence_score: float  # confidence in the recommendation (0-1)
    implementation_details: Dict

class IntelligentCostOptimizer:
    def __init__(self):
        self.resource_usage_history = {}
        self.cost_models = {}
        self.optimization_rules = []
        self.budget_limits = {}

    def register_cost_model(self, resource_type: ResourceType, cost_function: Callable):
        """Register a cost function for a resource type."""
        self.cost_models[resource_type] = cost_function

    def set_budget_limit(self, service_name: str, monthly_budget: float):
        """Set a monthly budget limit for a service."""
        self.budget_limits[service_name] = monthly_budget

    def track_resource_usage(self, usage: ResourceUsage):
        """Record a usage sample (the latest 1000 are kept per series)."""
        key = f"{usage.service_name}_{usage.model_name}_{usage.resource_type.value}"

        if key not in self.resource_usage_history:
            self.resource_usage_history[key] = deque(maxlen=1000)

        self.resource_usage_history[key].append(usage)

    def calculate_current_costs(self, time_window: timedelta = timedelta(hours=24)) -> Dict:
        """Estimate cost per service, model and resource type over a time window."""

        current_time = datetime.now()
        cutoff_time = current_time - time_window

        cost_breakdown = {}

        for usage_history in self.resource_usage_history.values():
            # Keep only samples inside the time window
            recent_usage = [
                usage for usage in usage_history
                if usage.timestamp >= cutoff_time
            ]

            if not recent_usage:
                continue

            # Read identifiers from the samples instead of splitting the key on '_',
            # which fails as soon as a service or model name contains an underscore
            latest = recent_usage[-1]
            service_name = latest.service_name
            model_name = latest.model_name
            resource_type = latest.resource_type.value

            # Average usage over the window
            avg_usage = np.mean([usage.used for usage in recent_usage])
            cost_per_unit = latest.cost_per_unit

            # Total cost = average usage x price per unit-hour x hours in the window
            hours_in_window = time_window.total_seconds() / 3600
            total_cost = avg_usage * cost_per_unit * hours_in_window

            # Build the nested breakdown: service -> model -> resource type
            if service_name not in cost_breakdown:
                cost_breakdown[service_name] = {}

            if model_name not in cost_breakdown[service_name]:
                cost_breakdown[service_name][model_name] = {}

            cost_breakdown[service_name][model_name][resource_type] = {
                'usage': avg_usage,
                'cost_per_unit': cost_per_unit,
                'total_cost': total_cost
            }

        return cost_breakdown

    def analyze_cost_patterns(self, service_name: str) -> Dict:
        """Analyze cost patterns for a service."""

        patterns = {
            'hourly_distribution': self._analyze_hourly_cost_distribution(service_name),
            'weekly_trend': self._analyze_weekly_cost_trend(service_name),
            'resource_efficiency': self._analyze_resource_efficiency(service_name),
            'cost_drivers': self._identify_cost_drivers(service_name)
        }

        return patterns

    def generate_optimization_recommendations(self, service_name: str) -> List[CostOptimizationRecommendation]:
        """Generate cost optimization recommendations for a service."""

        recommendations = []

        # Analyze resource usage patterns
        patterns = self.analyze_cost_patterns(service_name)

        # 1. Idle resource detection
        idle_recommendations = self._detect_idle_resources(service_name, patterns)
        recommendations.extend(idle_recommendations)

        # 2. Over-provisioning detection
        overprovisioning_recommendations = self._detect_overprovisioning(service_name, patterns)
        recommendations.extend(overprovisioning_recommendations)

        # 3. Schedule optimization
        scheduling_recommendations = self._optimize_scheduling(service_name, patterns)
        recommendations.extend(scheduling_recommendations)

        # 4. Instance type optimization
        instance_recommendations = self._optimize_instance_types(service_name, patterns)
        recommendations.extend(instance_recommendations)

        # Sort by estimated savings, largest first
        recommendations.sort(key=lambda x: x.estimated_savings, reverse=True)

        return recommendations

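    def _analyze_hourly_cost_distribution(self, service_name: str) -> Dict:
        """Average cost per hour of day for a service."""

        hourly_costs = defaultdict(list)

        for usage_history in self.resource_usage_history.values():
            for usage in usage_history:
                # Match on the sample's service name; a key-prefix check would also
                # match other services whose names start with the same text
                if usage.service_name != service_name:
                    continue
                hour = usage.timestamp.hour
                cost = usage.used * usage.cost_per_unit
                hourly_costs[hour].append(cost)

        # Average cost for each hour of day
        hourly_avg = {
            hour: float(np.mean(costs)) for hour, costs in hourly_costs.items()
        }

        # Low-usage hours: hours whose average cost is below half the mean hourly
        # cost (the 50% cutoff is an assumed heuristic). Returning only the 3 cheapest
        # hours would make the ">= 6 hours" check in _detect_idle_resources unreachable.
        overall_avg = float(np.mean(list(hourly_avg.values()))) if hourly_avg else 0.0

        return {
            'hourly_average': hourly_avg,
            'peak_hours': sorted(hourly_avg.keys(), key=lambda h: hourly_avg[h], reverse=True)[:3],
            'low_usage_hours': sorted(
                (h for h in hourly_avg if hourly_avg[h] < 0.5 * overall_avg),
                key=lambda h: hourly_avg[h]
            )
        }
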
    def _detect_idle_resources(self, service_name: str, patterns: Dict) -> List[CostOptimizationRecommendation]:
        """Detect resources that sit idle during low-usage hours."""

        recommendations = []

        # Low-usage hours of the day
        low_usage_hours = patterns['hourly_distribution']['low_usage_hours']

        if len(low_usage_hours) >= 6:  # 6 or more low-usage hours per day
            # Rough monthly savings: average hourly cost x low-usage hours x 30 days
            # (may overstate savings, since low-usage hours cost less than average)
            current_hourly_cost = self._calculate_average_hourly_cost(service_name)
            potential_savings = current_hourly_cost * len(low_usage_hours) * 30

            recommendation = CostOptimizationRecommendation(
                action_type="schedule_shutdown",
                estimated_savings=potential_savings,
                impact_assessment="Minimal impact during low-usage hours",
                confidence_score=0.8,
                implementation_details={
                    "shutdown_hours": low_usage_hours,
                    "automation_script": "schedule_auto_shutdown.py",
                    "wake_up_trigger": "demand_based"
                }
            )

            recommendations.append(recommendation)

        return recommendations

    def _detect_overprovisioning(self, service_name: str, patterns: Dict) -> List[CostOptimizationRecommendation]:
        """Detect over-provisioned resources."""

        recommendations = []
        efficiency = patterns['resource_efficiency']

        for resource_type, efficiency_score in efficiency.items():
            if efficiency_score < 0.6:  # utilization below 60%

                # Recommend scaling down; assumes 80% of the unused share of cost can be cut
                current_cost = self._get_current_resource_cost(service_name, resource_type)
                potential_savings = current_cost * (1 - efficiency_score) * 0.8

                recommendation = CostOptimizationRecommendation(
                    action_type="scale_down",
                    estimated_savings=potential_savings,
                    impact_assessment=f"Low impact - {resource_type} utilization is only {efficiency_score*100:.1f}%",
                    confidence_score=0.9,
                    implementation_details={
                        "resource_type": resource_type,
                        "current_efficiency": efficiency_score,
                        # Remove half of the unused headroom, e.g. 40% utilization -> 30% reduction
                        "recommended_reduction": f"{(1-efficiency_score)*50:.1f}%",
                        "gradual_scaling": True
                    }
                )

                recommendations.append(recommendation)

        return recommendations

225    def _optimize_instance_types(self, service_name: str, patterns: Dict) -> List[CostOptimizationRecommendation]:
226        """最佳化實例類型"""
227        
228        recommendations = []
229        
230        # 分析當前資源使用比例
231        resource_usage = patterns['resource_efficiency']
232        
233        gpu_usage = resource_usage.get('gpu', 0)
234        cpu_usage = resource_usage.get('cpu', 0)
235        memory_usage = resource_usage.get('memory', 0)
236        
237        # GPU 最佳化建議
238        if gpu_usage < 0.5 and gpu_usage > 0:  # GPU 使用率低但有使用
239            current_gpu_cost = self._get_current_resource_cost(service_name, 'gpu')
240            
241            # 建議使用較小的 GPU 實例
242            potential_savings = current_gpu_cost * 0.4  # 假設可節省40%
243            
244            recommendation = CostOptimizationRecommendation(
245                action_type="switch_instance_type",
246                estimated_savings=potential_savings,
247                impact_assessment="Moderate impact - may increase latency slightly",
248                confidence_score=0.7,
249                implementation_details={
250                    "current_gpu_usage": gpu_usage,
251                    "recommended_action": "Switch to smaller GPU instance",
252                    "suggested_instance": "g4dn.xlarge instead of g4dn.2xlarge",
253                    "rollback_plan": "Monitor latency and scale back if needed"
254                }
255            )
256            
257            recommendations.append(recommendation)
258        
259        # CPU/Memory 比例最佳化
260        if abs(cpu_usage - memory_usage) > 0.3:  # CPU 和記憶體使用率差異大
261            current_compute_cost = self._get_current_resource_cost(service_name, 'cpu')
262            potential_savings = current_compute_cost * 0.15
263            
264            if cpu_usage > memory_usage:
265                instance_type = "CPU-optimized"
266            else:
267                instance_type = "Memory-optimized"
268            
269            recommendation = CostOptimizationRecommendation(
270                action_type="switch_instance_type",
271                estimated_savings=potential_savings,
272                impact_assessment="Low impact - better resource alignment",
273                confidence_score=0.8,
274                implementation_details={
275                    "cpu_usage": cpu_usage,
276                    "memory_usage": memory_usage,
277                    "recommended_instance_type": instance_type,
278                    "migration_plan": "Blue-green deployment recommended"
279                }
280            )
281            
282            recommendations.append(recommendation)
283        
284        return recommendations

    async def implement_recommendation(self, recommendation: CostOptimizationRecommendation,
                                       service_name: str) -> Dict:
        """Apply an optimization recommendation and report the outcome."""

        implementation_result = {
            'success': False,
            'action_taken': recommendation.action_type,
            'estimated_savings': recommendation.estimated_savings,
            'actual_impact': None
        }

        try:
            # Dispatch to the executor that matches the recommended action
            if recommendation.action_type == "scale_down":
                result = await self._execute_scale_down(service_name, recommendation.implementation_details)
            elif recommendation.action_type == "schedule_shutdown":
                result = await self._execute_scheduled_shutdown(service_name, recommendation.implementation_details)
            elif recommendation.action_type == "switch_instance_type":
                result = await self._execute_instance_switch(service_name, recommendation.implementation_details)
            else:
                result = {'success': False, 'reason': 'Unsupported action type'}

            # The executor's result (including 'success') overrides the defaults
            implementation_result.update(result)

        except Exception as e:
            # Record the error instead of raising so a periodic caller keeps running
            implementation_result['error'] = str(e)

        return implementation_result

    def monitor_budget_usage(self) -> Dict:
        """Report each service's spend against its budget."""

        budget_status = {}
        # A rolling 30-day window approximates the monthly budget period
        current_month_costs = self.calculate_current_costs(timedelta(days=30))

        for service_name, budget_limit in self.budget_limits.items():
            if service_name in current_month_costs:
                # Per-model costs may be a single number or a dict of cost components
                total_service_cost = sum(
                    sum(model_costs.values()) if isinstance(model_costs, dict)
                    else model_costs
                    for model_costs in current_month_costs[service_name].values()
                )

                # Guard against a zero budget to avoid ZeroDivisionError
                usage_percentage = (
                    (total_service_cost / budget_limit) * 100 if budget_limit > 0 else float('inf')
                )

                budget_status[service_name] = {
                    'budget_limit': budget_limit,
                    'current_spend': total_service_cost,
                    'usage_percentage': usage_percentage,
                    'remaining_budget': budget_limit - total_service_cost,
                    'alert_level': self._get_budget_alert_level(usage_percentage)
                }

        return budget_status

    def _get_budget_alert_level(self, usage_percentage: float) -> str:
        """Map budget usage (in percent) to an alert level."""

        if usage_percentage >= 95:
            return 'critical'
        elif usage_percentage >= 80:
            return 'warning'
        elif usage_percentage >= 70:
            return 'watch'
        else:
            return 'normal'

# Usage example
async def setup_cost_optimization():
    optimizer = IntelligentCostOptimizer()

    # Set monthly budget limits
    optimizer.set_budget_limit("ai-inference-service", 10000)  # $10,000/month
    optimizer.set_budget_limit("model-training-service", 5000)  # $5,000/month

    # Periodically generate optimization recommendations
    while True:
        services = ["ai-inference-service", "model-training-service"]

        for service in services:
            recommendations = optimizer.generate_optimization_recommendations(service)

            # Automatically apply low-risk recommendations only
            for recommendation in recommendations:
                if (recommendation.confidence_score > 0.8 and
                        recommendation.estimated_savings > 100):  # Saves more than $100

                    print(f"Implementing optimization for {service}: {recommendation.action_type}")
                    result = await optimizer.implement_recommendation(recommendation, service)
                    print(f"Implementation result: {result}")

        # Check budget status
        budget_status = optimizer.monitor_budget_usage()

        for service, status in budget_status.items():
            if status['alert_level'] in ['warning', 'critical']:
                print(f"Budget alert for {service}: {status['usage_percentage']:.1f}% used")

        # Run the check once per hour
        await asyncio.sleep(3600)
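
`monitor_budget_usage` depends on `calculate_current_costs()` and `self.budget_limits`, both defined elsewhere in the class, so it cannot be run on its own. The following standalone sketch reproduces only the aggregation and alert-threshold logic so it can be exercised with sample data. The cost shape (service -> model -> a number or a dict of cost components) is an assumption inferred from how the method reads it, and `service_spend`, `budget_status`, and `budget_alert_level` are hypothetical helper names.

```python
from collections.abc import Mapping
from typing import Dict, Union

# Assumed cost shape: service -> model -> float, or dict of cost components
ModelCosts = Mapping[str, Union[float, Mapping[str, float]]]


def budget_alert_level(usage_percentage: float) -> str:
    """Same thresholds as _get_budget_alert_level: 95 critical, 80 warning, 70 watch."""
    if usage_percentage >= 95:
        return "critical"
    if usage_percentage >= 80:
        return "warning"
    if usage_percentage >= 70:
        return "watch"
    return "normal"


def service_spend(model_costs: ModelCosts) -> float:
    """Sum one service's spend, flattening per-model component dicts."""
    return sum(
        sum(cost.values()) if isinstance(cost, Mapping) else cost
        for cost in model_costs.values()
    )


def budget_status(costs: Mapping[str, ModelCosts],
                  budget_limits: Mapping[str, float]) -> Dict[str, dict]:
    """Per-service budget report; a service with no recorded cost counts as $0."""
    report = {}
    for service, limit in budget_limits.items():
        spend = service_spend(costs.get(service, {}))
        # A zero budget is treated as fully exhausted instead of dividing by zero
        usage = spend / limit * 100 if limit > 0 else float("inf")
        report[service] = {
            "budget_limit": limit,
            "current_spend": spend,
            "usage_percentage": usage,
            "remaining_budget": limit - spend,
            "alert_level": budget_alert_level(usage),
        }
    return report


if __name__ == "__main__":
    sample_costs = {
        "ai-inference-service": {
            "model-a": {"compute": 6000.0, "tokens": 2500.0},
            "model-b": 300.0,
        },
        "model-training-service": {"model-c": 3600.0},
    }
    limits = {"ai-inference-service": 10000, "model-training-service": 5000}
    for name, status in budget_status(sample_costs, limits).items():
        print(f"{name}: {status['usage_percentage']:.1f}% -> {status['alert_level']}")
```

Running it prints `ai-inference-service: 88.0% -> warning` and `model-training-service: 72.0% -> watch`. One deliberate difference from the class method: a service that has a budget but no recorded cost is reported at $0 instead of being skipped, so an idle service still appears in the report.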

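Summary

This article covered the core techniques for monitoring and optimizing production AI systems:

  1. LLM-native metrics: a comprehensive evaluation framework covering latency, quality, cost, and reliability
  2. Distributed monitoring: enterprise integration of OpenTelemetry, Prometheus, and the ELK Stack
  3. Intelligent fault diagnosis: multi-layer anomaly detection and automated recovery
  4. Cost optimization: dynamic resource allocation, budget control, and intelligent cost recommendations

The next and final installment of the series focuses on customer collaboration and practical problem solving, covering requirements analysis, technical communication, and project delivery management.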

Yen