前言
企業級 AI 整合與部署是 AI FDE 最具挑戰性的工作之一:工程師必須同時處理複雜的企業架構、安全合規要求、數據整合與系統可靠性等問題。本文將深入探討雲端平台部署策略、企業安全框架、RAG 架構設計與數據管道建構等核心技術。
1. 雲端平台部署策略
Google Cloud Platform (GCP) 深度整合
Vertex AI 生產部署:
1from google.cloud import aiplatform
2from google.cloud.aiplatform import gapic
3import yaml
4
class GCPAIDeploymentManager:
    """Upload, deploy and scale custom-container models on Vertex AI."""

    def __init__(self, project_id: str, region: str = "us-central1"):
        """Initialise the Vertex AI SDK for one project and region.

        Args:
            project_id: GCP project that owns the models and endpoints.
            region: Vertex AI location used for every resource.
        """
        self.project_id = project_id
        self.region = region

        # Initialise Vertex AI; the staging bucket name is derived from the project id.
        aiplatform.init(
            project=project_id,
            location=region,
            staging_bucket=f"gs://{project_id}-ml-staging",
        )

    @staticmethod
    def _build_container_env(model_config: dict) -> dict:
        """Return the environment variables passed to the serving container.

        MODEL_NAME, MODEL_VERSION and BATCH_SIZE are always set; entries in
        ``model_config["env_vars"]`` are layered on top and win on conflict.
        """
        env = {
            "MODEL_NAME": str(model_config["model_name"]),
            "MODEL_VERSION": str(model_config["version"]),
            "BATCH_SIZE": str(model_config.get("batch_size", 32)),
        }
        env.update(model_config.get("env_vars") or {})
        return env

    def _prediction_url(self, endpoint_resource_name: str) -> str:
        """Build the regional REST ``:predict`` URL for an endpoint resource name."""
        return (
            f"https://{self.region}-aiplatform.googleapis.com/v1/"
            f"{endpoint_resource_name}:predict"
        )

    def deploy_custom_model(self, model_config: dict):
        """Upload a custom-container model and deploy it to a new endpoint.

        Args:
            model_config: Requires ``display_name``, ``model_artifacts_uri``,
                ``container_image``, ``model_name`` and ``version``. Optional:
                ``batch_size``, ``env_vars``, ``machine_type``, ``min_replicas``,
                ``max_replicas``, ``accelerator_type``, ``accelerator_count``.

        Returns:
            dict with ``model_id``, ``endpoint_id`` and ``prediction_url``.
        """
        # The original built a container spec and then discarded it, so the
        # MODEL_* variables never reached the container; they are now merged
        # into the environment that is actually uploaded.
        model = aiplatform.Model.upload(
            display_name=model_config["display_name"],
            artifact_uri=model_config["model_artifacts_uri"],
            serving_container_image_uri=model_config["container_image"],
            serving_container_predict_route="/predict",
            serving_container_health_route="/health",
            serving_container_environment_variables=self._build_container_env(model_config),
            serving_container_ports=[8080],
            sync=True,
        )

        # Deploy to an endpoint and route all traffic to this model.
        endpoint = model.deploy(
            machine_type=model_config.get("machine_type", "n1-standard-4"),
            min_replica_count=model_config.get("min_replicas", 1),
            max_replica_count=model_config.get("max_replicas", 10),
            accelerator_type=model_config.get("accelerator_type"),
            accelerator_count=model_config.get("accelerator_count"),
            traffic_percentage=100,
            sync=True,
        )

        return {
            "model_id": model.resource_name,
            "endpoint_id": endpoint.resource_name,
            # Endpoint objects expose no ``predict_url`` attribute; derive the URL.
            "prediction_url": self._prediction_url(endpoint.resource_name),
        }

    def setup_auto_scaling(self, endpoint_name: str, scaling_config: dict):
        """Request new replica bounds for an endpoint.

        Args:
            endpoint_name: Full endpoint resource name.
            scaling_config: Requires ``min_replicas`` and ``max_replicas``.

        Returns:
            The service response (the result of the operation if one is returned).

        Raises:
            ValueError: If ``min_replicas`` exceeds ``max_replicas``.
        """
        min_replicas = scaling_config["min_replicas"]
        max_replicas = scaling_config["max_replicas"]
        if min_replicas > max_replicas:
            raise ValueError(
                f"min_replicas ({min_replicas}) must not exceed max_replicas ({max_replicas})"
            )

        # Regional resources must be addressed through the regional API host.
        client = gapic.EndpointServiceClient(
            client_options={"api_endpoint": f"{self.region}-aiplatform.googleapis.com"}
        )

        # NOTE(review): the request shape is kept from the original listing.
        # Confirm against the EndpointService reference whether replica bounds
        # of an already-deployed model can be changed through UpdateEndpoint
        # and whether an update_mask is required.
        update_request = gapic.UpdateEndpointRequest(
            endpoint={
                "name": endpoint_name,
                "traffic_split": {"0": 100},
                "deployed_models": [{
                    "automatic_resources": {
                        "min_replica_count": min_replicas,
                        "max_replica_count": max_replicas,
                    }
                }],
            }
        )

        response = client.update_endpoint(request=update_request)
        # Tolerate both a long-running operation and a direct Endpoint return.
        wait_for_result = getattr(response, "result", None)
        return wait_for_result() if callable(wait_for_result) else response
# Example deployment configuration consumed by GCPAIDeploymentManager.deploy_custom_model
deployment_config = dict(
    model_name="enterprise-llm-v1",
    display_name="Enterprise LLM Model",
    version="1.0.0",
    container_image="gcr.io/project-id/enterprise-llm:latest",
    model_artifacts_uri="gs://project-bucket/models/enterprise-llm-v1/",
    machine_type="n1-highmem-8",
    min_replicas=2,
    max_replicas=20,
    accelerator_type="NVIDIA_TESLA_T4",
    accelerator_count=1,
    # Extra environment variables handed to the serving container
    env_vars=dict(
        MAX_SEQUENCE_LENGTH="2048",
        TEMPERATURE="0.7",
        TOP_P="0.9",
    ),
)
AWS 企業級部署架構
Amazon SageMaker 多模型端點:
1import boto3
2import sagemaker
3from sagemaker.multidatamodel import MultiDataModel
4from sagemaker.pytorch import PyTorchModel
5
class AWSAIDeploymentManager:
    """Deploy SageMaker endpoints (multi-model, serverless) and their auto scaling."""

    def __init__(self, region: str, role_arn: str):
        """Create region-bound AWS sessions.

        Args:
            region: AWS region used for every client created by this manager.
            role_arn: IAM role passed to SageMaker and Application Auto Scaling.
        """
        self.region = region
        self.role_arn = role_arn
        # The original stored ``region`` but built default-region clients.
        self._boto_session = boto3.Session(region_name=region)
        self.session = sagemaker.Session(boto_session=self._boto_session)
        self.s3_client = self._boto_session.client('s3')

    def deploy_multi_model_endpoint(
        self,
        models_config: list,
        *,
        model_data_prefix: str = "s3://my-bucket/models/",
        endpoint_name: str = "enterprise-multi-model-endpoint",
        instance_type: str = "ml.g4dn.xlarge",
        initial_instance_count: int = 2,
        data_capture_uri: str = "s3://my-bucket/data-capture/",
    ):
        """Deploy a multi-model endpoint so several models share instances.

        Args:
            models_config: Model descriptors. Entries that carry a
                ``model_data_url`` are registered under the shared prefix
                (assumed schema, mirroring ``setup_serverless_inference`` —
                confirm against callers).
            model_data_prefix: S3 prefix that holds all model artifacts.
            endpoint_name: Name of the endpoint to create.
            instance_type: Instance type backing the endpoint.
            initial_instance_count: Number of instances at creation.
            data_capture_uri: S3 destination for captured requests/responses.

        Returns:
            The predictor returned by ``MultiDataModel.deploy``.
        """
        from sagemaker import image_uris
        from sagemaker.model_monitor import DataCaptureConfig

        # Resolve the inference image for *this* region instead of pinning the
        # us-west-2 registry.
        image_uri = image_uris.retrieve(
            framework="pytorch",
            region=self.region,
            version="1.12.0",
            py_version="py38",
            instance_type=instance_type,
            image_scope="inference",
        )

        # MultiDataModel takes ``image_uri``; it has no ``container_def`` argument.
        multi_data_model = MultiDataModel(
            name="enterprise-multi-model",
            model_data_prefix=model_data_prefix,
            image_uri=image_uri,
            role=self.role_arn,
            sagemaker_session=self.session,
        )

        # Previously ``models_config`` was accepted but never used.
        for model_cfg in models_config or []:
            source = model_cfg.get("model_data_url")
            if source:
                multi_data_model.add_model(
                    model_data_source=source,
                    model_data_path=model_cfg.get("model_data_path"),
                )

        # ``deploy`` expects a DataCaptureConfig object, not a raw API dict.
        capture_config = DataCaptureConfig(
            enable_capture=True,
            sampling_percentage=100,
            destination_s3_uri=data_capture_uri,
            capture_options=["REQUEST", "RESPONSE"],
        )

        return multi_data_model.deploy(
            initial_instance_count=initial_instance_count,
            instance_type=instance_type,
            endpoint_name=endpoint_name,
            data_capture_config=capture_config,
        )

    def setup_serverless_inference(self, model_config: dict):
        """Deploy a PyTorch model behind a serverless inference endpoint.

        Args:
            model_config: Requires ``model_data_url`` and ``name``.

        Returns:
            The predictor returned by ``PyTorchModel.deploy``.
        """
        from sagemaker.serverless import ServerlessInferenceConfig

        model = PyTorchModel(
            entry_point='inference.py',
            source_dir='code/',
            model_data=model_config['model_data_url'],
            role=self.role_arn,
            framework_version='1.12.0',
            py_version='py38',
            predictor_cls=sagemaker.predictor.Predictor,
            sagemaker_session=self.session,
        )

        # Serverless sizing: 6 GB memory, 50 concurrent invocations, 10 provisioned.
        serverless_config = ServerlessInferenceConfig(
            memory_size_in_mb=6144,
            max_concurrency=50,
            provisioned_concurrency=10,
        )

        return model.deploy(
            serverless_inference_config=serverless_config,
            endpoint_name=f"serverless-{model_config['name']}",
        )

    @staticmethod
    def _target_tracking_config(scaling_policy: dict) -> dict:
        """Build the target-tracking configuration for invocations per instance.

        Cooldowns default to 300 s (scale out) and 600 s (scale in) and may be
        overridden with ``scale_out_cooldown`` / ``scale_in_cooldown``.
        """
        return {
            'TargetValue': scaling_policy['target_invocations_per_instance'],
            'PredefinedMetricSpecification': {
                'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'
            },
            'ScaleOutCooldown': scaling_policy.get('scale_out_cooldown', 300),
            'ScaleInCooldown': scaling_policy.get('scale_in_cooldown', 600),
        }

    def configure_auto_scaling(self, endpoint_name: str, scaling_policy: dict):
        """Register the endpoint variant as scalable and attach a tracking policy.

        Args:
            endpoint_name: Endpoint whose ``AllTraffic`` variant is scaled.
            scaling_policy: Requires ``min_capacity``, ``max_capacity`` and
                ``target_invocations_per_instance``.

        Returns:
            The ``put_scaling_policy`` response (the original returned nothing).
        """
        autoscaling_client = self._boto_session.client('application-autoscaling')
        resource_id = f'endpoint/{endpoint_name}/variant/AllTraffic'
        dimension = 'sagemaker:variant:DesiredInstanceCount'

        # Register the scalable target
        autoscaling_client.register_scalable_target(
            ServiceNamespace='sagemaker',
            ResourceId=resource_id,
            ScalableDimension=dimension,
            MinCapacity=scaling_policy['min_capacity'],
            MaxCapacity=scaling_policy['max_capacity'],
            RoleARN=self.role_arn,
        )

        # Attach the target-tracking policy
        return autoscaling_client.put_scaling_policy(
            PolicyName=f'{endpoint_name}-target-tracking-policy',
            ServiceNamespace='sagemaker',
            ResourceId=resource_id,
            ScalableDimension=dimension,
            PolicyType='TargetTrackingScaling',
            TargetTrackingScalingPolicyConfiguration=self._target_tracking_config(scaling_policy),
        )
Azure OpenAI 企業整合
Azure 認知服務部署:
1from azure.identity import DefaultAzureCredential
2from azure.mgmt.cognitiveservices import CognitiveServicesManagementClient
3from azure.cognitiveservices.language.textanalytics import TextAnalyticsClient
4
class AzureAIDeploymentManager:
    """Provision Azure OpenAI accounts, model deployments and private endpoints."""

    def __init__(self, subscription_id: str, resource_group: str):
        """Store the target subscription/resource group and create a credential."""
        self.subscription_id = subscription_id
        self.resource_group = resource_group
        self.credential = DefaultAzureCredential()

    @staticmethod
    def _build_network_acls(allowed_ips) -> dict:
        """Return the ``networkAcls`` payload for an optional IP allow-list.

        With an allow-list the default action is ``Deny``; otherwise the rules
        would be ineffective because everything is allowed anyway. Plain
        address strings are wrapped as ``{'value': address}``; dict entries are
        passed through unchanged.
        """
        ip_rules = [
            entry if isinstance(entry, dict) else {'value': str(entry)}
            for entry in (allowed_ips or [])
        ]
        return {
            'defaultAction': 'Deny' if ip_rules else 'Allow',
            'virtualNetworkRules': [],
            'ipRules': ip_rules,
        }

    def deploy_openai_service(self, deployment_config: dict):
        """Create an Azure OpenAI account and deploy the configured models.

        Args:
            deployment_config: Requires ``location``, ``sku``, ``subdomain``,
                ``account_name`` and ``models`` (each with ``deployment_name``,
                ``model_name``, ``version``, ``capacity``). Optional
                ``allowed_ips`` restricts network access.

        Returns:
            dict with the created ``account`` and the list of ``deployments``.
        """
        client = CognitiveServicesManagementClient(
            self.credential,
            self.subscription_id,
        )

        # Create the Cognitive Services account
        account_info = {
            'location': deployment_config['location'],
            'sku': {'name': deployment_config['sku']},
            'kind': 'OpenAI',
            'properties': {
                'customSubDomainName': deployment_config['subdomain'],
                'publicNetworkAccess': 'Enabled',
                'networkAcls': self._build_network_acls(
                    deployment_config.get('allowed_ips', [])
                ),
            },
        }

        account = client.accounts.begin_create(
            resource_group_name=self.resource_group,
            account_name=deployment_config['account_name'],
            account=account_info,
        ).result()

        # Deploy each model sequentially, waiting for every operation to finish
        model_deployments = []
        for model_config in deployment_config['models']:
            poller = client.deployments.begin_create_or_update(
                resource_group_name=self.resource_group,
                account_name=deployment_config['account_name'],
                deployment_name=model_config['deployment_name'],
                deployment={
                    'properties': {
                        'model': {
                            'format': 'OpenAI',
                            'name': model_config['model_name'],
                            'version': model_config['version'],
                        },
                        'scaleSettings': {
                            'scaleType': 'Standard',
                            'capacity': model_config['capacity'],
                        },
                    }
                },
            )
            model_deployments.append(poller.result())

        return {
            'account': account,
            'deployments': model_deployments,
        }

    def setup_private_endpoint(self, config: dict):
        """Create a private endpoint for the account to keep traffic off the internet.

        Args:
            config: Requires ``location``, ``subnet_id``, ``service_name``,
                ``service_resource_id`` and ``endpoint_name``.

        Returns:
            The result of the private-endpoint create/update operation.
        """
        from azure.mgmt.network import NetworkManagementClient

        network_client = NetworkManagementClient(
            self.credential,
            self.subscription_id,
        )

        # Private endpoint definition
        private_endpoint_params = {
            'location': config['location'],
            'subnet': {'id': config['subnet_id']},
            'privateLinkServiceConnections': [{
                'name': f"{config['service_name']}-connection",
                'privateLinkServiceId': config['service_resource_id'],
                'groupIds': ['account'],
            }],
        }

        operation = network_client.private_endpoints.begin_create_or_update(
            resource_group_name=self.resource_group,
            private_endpoint_name=config['endpoint_name'],
            parameters=private_endpoint_params,
        )

        return operation.result()
2. 企業安全框架與合規性
身份驗證與授權系統
企業級 RBAC 實作:
1from enum import Enum
2from dataclasses import dataclass
3from typing import Set, List, Dict, Optional
4import jwt
5import hashlib
6from datetime import datetime, timedelta
7
class UserRole(Enum):
    """Roles a caller can hold; the value is the string carried in the token's role claim."""

    ADMIN = "admin"
    DATA_SCIENTIST = "data_scientist"
    ANALYST = "analyst"
    VIEWER = "viewer"
    EXTERNAL_CLIENT = "external_client"
14
class ResourceType(Enum):
    """Kinds of resources that access-control rules can target."""

    MODEL = "model"
    DATASET = "dataset"
    PIPELINE = "pipeline"
    ENDPOINT = "endpoint"
    LOGS = "logs"
21
class Permission(Enum):
    """Actions that can be granted on a resource."""

    CREATE = "create"
    READ = "read"
    UPDATE = "update"
    DELETE = "delete"
    EXECUTE = "execute"
    DEPLOY = "deploy"
29
@dataclass
class AccessControl:
    """One grant: a set of permissions on resources matching ``resource_id``.

    ``resource_id`` is an exact id, ``"*"`` or a prefix pattern ending in ``*``.
    ``conditions`` optionally holds extra constraints evaluated at authorisation
    time; ``None`` means unconditional.
    """

    resource_type: "ResourceType"
    resource_id: str
    permissions: "Set[Permission]"
    # The original annotated this with the builtin function ``any`` and a bare
    # ``None`` default; declare the optional mapping explicitly.
    conditions: Optional[Dict[str, object]] = None
36
class EnterpriseSecurityManager:
    """Token authentication, role-based authorisation and access auditing."""

    def __init__(self, secret_key: str):
        """Store the HS256 signing secret and build the role permission matrix."""
        self.secret_key = secret_key
        self.role_permissions = self._setup_role_permissions()
        self.audit_log = []

    def _setup_role_permissions(self) -> "Dict[UserRole, List[AccessControl]]":
        """Return the role -> grants matrix used by ``authorize_action``."""
        crud = {Permission.CREATE, Permission.READ, Permission.UPDATE, Permission.DELETE}
        read_only = {Permission.READ}
        return {
            UserRole.ADMIN: [
                AccessControl(ResourceType.MODEL, "*", crud | {Permission.DEPLOY}),
                AccessControl(ResourceType.DATASET, "*", set(crud)),
                AccessControl(ResourceType.PIPELINE, "*", crud | {Permission.EXECUTE}),
                AccessControl(ResourceType.LOGS, "*", set(read_only)),
            ],
            UserRole.DATA_SCIENTIST: [
                AccessControl(ResourceType.MODEL, "*",
                              {Permission.CREATE, Permission.READ, Permission.UPDATE}),
                AccessControl(ResourceType.DATASET, "*", set(read_only)),
                AccessControl(ResourceType.PIPELINE, "*",
                              {Permission.CREATE, Permission.READ, Permission.EXECUTE}),
            ],
            UserRole.ANALYST: [
                AccessControl(ResourceType.MODEL, "*", set(read_only)),
                AccessControl(ResourceType.DATASET, "*", set(read_only)),
                AccessControl(ResourceType.PIPELINE, "*", {Permission.READ, Permission.EXECUTE}),
            ],
            UserRole.VIEWER: [
                AccessControl(ResourceType.MODEL, "*", set(read_only)),
                AccessControl(ResourceType.DATASET, "public_*", set(read_only)),
            ],
            UserRole.EXTERNAL_CLIENT: [
                AccessControl(ResourceType.ENDPOINT, "client_*", set(read_only),
                              conditions={"time_limit": "business_hours", "rate_limit": 1000}),
            ],
        }

    def authenticate_user(self, token: str) -> Optional[Dict]:
        """Validate a JWT and return the user context, or ``None`` if invalid.

        ``jwt.decode`` already rejects expired tokens, so no manual expiry
        comparison is made (the original compared a naive UTC time against a
        local-time conversion of ``exp``). Tokens missing ``exp``, ``user_id``
        or ``role``, or carrying an unknown role, are rejected rather than
        raising.
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
            return {
                'user_id': payload['user_id'],
                'role': UserRole(payload['role']),
                'permissions': payload.get('permissions', []),
                'organization': payload.get('organization'),
                'expires_at': payload['exp'],
            }
        except (jwt.InvalidTokenError, KeyError, ValueError):
            return None

    def authorize_action(self, user_context: Dict, resource_type: "ResourceType",
                         resource_id: str, action: "Permission") -> bool:
        """Return whether the user's role permits ``action`` on the resource.

        Every decision is written to the audit log as GRANTED or DENIED.
        """
        grants = self.role_permissions.get(user_context['role'], [])

        for grant in grants:
            if grant.resource_type != resource_type:
                continue
            if not self._match_resource_pattern(grant.resource_id, resource_id):
                continue
            if action not in grant.permissions:
                continue
            # A grant whose extra conditions fail is skipped; a later grant may still match.
            if grant.conditions and not self._check_conditions(grant.conditions, user_context):
                continue

            self._log_access(user_context, resource_type, resource_id, action, "GRANTED")
            return True

        self._log_access(user_context, resource_type, resource_id, action, "DENIED")
        return False

    def _match_resource_pattern(self, pattern: str, resource_id: str) -> bool:
        """Match ``resource_id`` against ``*``, a trailing-``*`` prefix, or an exact id."""
        if pattern == "*":
            return True
        if pattern.endswith("*"):
            return resource_id.startswith(pattern[:-1])
        return pattern == resource_id

    def _check_conditions(self, conditions: Dict, user_context: Dict,
                          now: Optional[datetime] = None) -> bool:
        """Evaluate the extra conditions attached to a grant.

        Args:
            conditions: Condition mapping from the grant.
            user_context: Authenticated user context (currently unused here).
            now: Clock override for testing; defaults to local ``datetime.now()``.

        Returns:
            ``False`` when a condition fails, ``True`` otherwise.
        """
        if conditions.get("time_limit") == "business_hours":
            current = now if now is not None else datetime.now()
            # Business hours are 09:00 up to, but excluding, 17:00; the
            # original ``<= 17`` also admitted 17:00-17:59.
            if not (9 <= current.hour < 17):
                return False

        # NOTE(review): "rate_limit" is declared on grants but not enforced
        # here; enforcement must come from a gateway or a shared counter.

        return True

    def _log_access(self, user_context: Dict, resource_type: "ResourceType",
                    resource_id: str, action: "Permission", result: str):
        """Append an audit record for an authorisation decision and echo it."""
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': user_context['user_id'],
            'organization': user_context.get('organization'),
            'resource_type': resource_type.value,
            'resource_id': resource_id,
            'action': action.value,
            'result': result,
            'ip_address': user_context.get('ip_address'),
            'user_agent': user_context.get('user_agent'),
        }

        self.audit_log.append(log_entry)

        # In production this should be shipped to an external audit system.
        print(f"AUDIT: {log_entry}")
數據安全與加密
端到端數據保護:
1from cryptography.fernet import Fernet
2from cryptography.hazmat.primitives import hashes, serialization
3from cryptography.hazmat.primitives.asymmetric import rsa, padding
4import base64
5
class DataSecurityManager:
    """Classification-based encryption and role-based masking of records."""

    # Classifications protected with the shared symmetric (Fernet) key.
    _SYMMETRIC_CLASSIFICATIONS = ("internal", "confidential", "restricted")
    # Roles without an explicit masking entry get the strictest rule set.
    _STRICTEST_ROLE = "external_client"
    _MASKED = "***MASKED***"

    def __init__(self, symmetric_key: Optional[bytes] = None, private_key=None):
        """Set up the keys.

        Args:
            symmetric_key: Existing Fernet key; a fresh one is generated when omitted.
            private_key: Existing RSA private key; a 2048-bit key is generated
                when omitted. Generated keys live only in this instance, so
                data encrypted with them cannot be decrypted elsewhere.
        """
        self.symmetric_key = symmetric_key if symmetric_key is not None else Fernet.generate_key()
        self.cipher_suite = Fernet(self.symmetric_key)

        if private_key is None:
            private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        self.private_key = private_key
        self.public_key = self.private_key.public_key()

    @staticmethod
    def _oaep():
        """Return the OAEP/SHA-256 padding used for all RSA operations."""
        return padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )

    def encrypt_sensitive_data(self, data: bytes, classification: str = "confidential") -> Dict:
        """Encrypt ``data`` according to its classification.

        Returns:
            A package dict with base64 ``data``, ``encrypted`` and
            ``classification``, plus ``encryption_type`` when encrypted.

        Raises:
            ValueError: For an unknown classification (the original silently
                returned ``None``).
        """
        if classification == "public":
            # Public data is only encoded, not encrypted.
            return {
                "data": base64.b64encode(data).decode(),
                "encrypted": False,
                "classification": classification,
            }

        if classification in self._SYMMETRIC_CLASSIFICATIONS:
            ciphertext = self.cipher_suite.encrypt(data)
            return {
                "data": base64.b64encode(ciphertext).decode(),
                "encrypted": True,
                "encryption_type": "symmetric",
                "classification": classification,
                "key_id": self._get_key_fingerprint(self.symmetric_key),
            }

        if classification == "top_secret":
            # RSA-OAEP can only encrypt a payload smaller than the key, so
            # encrypt the data with a one-off Fernet key and wrap that key
            # with RSA instead of encrypting the data directly.
            session_key = Fernet.generate_key()
            ciphertext = Fernet(session_key).encrypt(data)
            wrapped_key = self.public_key.encrypt(session_key, self._oaep())
            return {
                "data": base64.b64encode(ciphertext).decode(),
                "wrapped_key": base64.b64encode(wrapped_key).decode(),
                "encrypted": True,
                "encryption_type": "asymmetric",
                "classification": classification,
            }

        raise ValueError(f"Unknown data classification: {classification!r}")

    def decrypt_sensitive_data(self, encrypted_package: Dict) -> bytes:
        """Reverse ``encrypt_sensitive_data``.

        Asymmetric packages without ``wrapped_key`` (the earlier direct-RSA
        format) are still accepted.

        Raises:
            ValueError: For an unknown ``encryption_type``.
        """
        payload = base64.b64decode(encrypted_package["data"])
        if not encrypted_package["encrypted"]:
            return payload

        encryption_type = encrypted_package["encryption_type"]
        if encryption_type == "symmetric":
            return self.cipher_suite.decrypt(payload)

        if encryption_type == "asymmetric":
            wrapped_key = encrypted_package.get("wrapped_key")
            if wrapped_key is None:
                return self.private_key.decrypt(payload, self._oaep())
            session_key = self.private_key.decrypt(base64.b64decode(wrapped_key), self._oaep())
            return Fernet(session_key).decrypt(payload)

        raise ValueError(f"Unknown encryption type: {encryption_type!r}")

    def _get_key_fingerprint(self, key: bytes) -> str:
        """Return the first 16 base64 characters of the key's SHA-256 digest."""
        digest = hashes.Hash(hashes.SHA256())
        digest.update(key)
        return base64.b64encode(digest.finalize()).decode()[:16]

    @classmethod
    def _mask_email(cls, value: str) -> str:
        """Keep at most three leading characters of the local part and the domain."""
        local, separator, domain = value.partition('@')
        if not separator:
            return cls._MASKED
        return local[:3] + "***@" + domain

    @classmethod
    def _mask_phone(cls, value: str) -> str:
        """Keep the first three and last four characters of longer numbers."""
        return value[:3] + "***" + value[-4:] if len(value) > 7 else cls._MASKED

    @classmethod
    def _mask_id_partial(cls, value: str) -> str:
        """Keep the first four and last two characters of longer identifiers."""
        return value[:4] + "***" + value[-2:] if len(value) > 6 else cls._MASKED

    def implement_data_masking(self, data: Dict, user_role: "UserRole") -> Dict:
        """Return a copy of ``data`` masked for ``user_role``.

        ``user_role`` may be a role enum member or its string value. Roles with
        no entry in the table receive the strictest masking; the original
        returned such records unmasked.
        """
        masking_rules = {
            "external_client": {
                "personal_id": self._MASKED,
                "email": self._mask_email,
                "phone": self._mask_phone,
            },
            "analyst": {
                "personal_id": self._mask_id_partial,
            },
            # Data scientists and admins currently see records unmasked.
            "data_scientist": {},
            "admin": {},
        }

        role_key = getattr(user_role, "value", user_role)
        rules = masking_rules.get(role_key)
        if rules is None:
            rules = masking_rules[self._STRICTEST_ROLE]

        masked_data = data.copy()
        for field, rule in rules.items():
            if field not in masked_data:
                continue
            masked_data[field] = rule(str(masked_data[field])) if callable(rule) else rule

        return masked_data
3. RAG (Retrieval-Augmented Generation) 架構設計
企業級向量數據庫實作
向量搜尋與語意檢索:
1import chromadb
2from sentence_transformers import SentenceTransformer
3import numpy as np
4from typing import List, Dict, Optional
5import hashlib
6
class EnterpriseRAGSystem:
    """Vector-store backed retrieval with access-level filtering."""

    def __init__(self, collection_name: str = "enterprise_knowledge"):
        """Open the persistent vector store, load the embedder and get the collection."""
        # Vector database
        self.chroma_client = chromadb.PersistentClient(path="./vector_db")

        # Embedding model
        self.embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

        # Create or fetch the collection (cosine distance)
        self.collection = self.chroma_client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )

    def add_documents(self, documents: List[Dict[str, object]], batch_size: int = 100):
        """Embed and store documents in batches.

        Ids are derived from source and content, so re-adding a document
        updates it in place. Duplicates inside a batch are collapsed (last one
        wins) because the store rejects repeated ids in one call.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        for start in range(0, len(documents), batch_size):
            unique_docs = {}
            for doc in documents[start:start + batch_size]:
                unique_docs[self._generate_doc_id(doc)] = doc
            if not unique_docs:
                continue

            ids = list(unique_docs)
            texts = [doc['content'] for doc in unique_docs.values()]
            metadatas = [
                {
                    'source': doc.get('source', 'unknown'),
                    'title': doc.get('title', ''),
                    'category': doc.get('category', 'general'),
                    'access_level': doc.get('access_level', 'internal'),
                    'created_at': doc.get('created_at', ''),
                    'last_updated': doc.get('last_updated', ''),
                    'content_length': len(doc['content']),
                }
                for doc in unique_docs.values()
            ]

            embeddings = self.embedding_model.encode(texts).tolist()

            self.collection.upsert(
                embeddings=embeddings,
                documents=texts,
                metadatas=metadatas,
                ids=ids,
            )

    def _build_where(self, access_level: str, filters: Optional[Dict]) -> Dict:
        """Combine the mandatory access-level clause with caller filters.

        A caller-supplied ``access_level`` filter is ignored so it cannot widen
        access, and multiple conditions are joined with ``$and`` because the
        store accepts a single top-level operator.
        """
        access_clause = {
            "access_level": {"$in": self._get_allowed_access_levels(access_level)}
        }
        extra_clauses = [
            {key: value}
            for key, value in (filters or {}).items()
            if key != "access_level"
        ]
        if not extra_clauses:
            return access_clause
        return {"$and": [access_clause, *extra_clauses]}

    def semantic_search(self, query: str, n_results: int = 5,
                        access_level: str = "internal",
                        filters: Optional[Dict] = None) -> List[Dict]:
        """Return the closest documents the caller is allowed to see.

        Each result carries ``id``, ``content``, ``metadata``,
        ``similarity_score`` (1 - cosine distance) and ``relevance_rank``.
        """
        query_embedding = self.embedding_model.encode([query]).tolist()

        results = self.collection.query(
            query_embeddings=query_embedding,
            n_results=n_results,
            where=self._build_where(access_level, filters),
            include=["documents", "metadatas", "distances"]
        )

        rows = zip(
            results['ids'][0],
            results['documents'][0],
            results['metadatas'][0],
            results['distances'][0],
        )
        return [
            {
                'id': doc_id,
                'content': content,
                'metadata': metadata,
                'similarity_score': 1 - distance,
                'relevance_rank': rank,
            }
            for rank, (doc_id, content, metadata, distance) in enumerate(rows, start=1)
        ]

    def hybrid_search(self, query: str, keyword_weight: float = 0.3,
                      semantic_weight: float = 0.7,
                      access_level: str = "internal") -> List[Dict]:
        """Blend semantic and keyword scores and return the top ten documents.

        Both sub-searches are restricted to ``access_level``; previously the
        keyword path scanned every stored document regardless of level.
        """
        semantic_results = self.semantic_search(query, n_results=20, access_level=access_level)
        keyword_results = self._keyword_search(query, n_results=20, access_level=access_level)

        def key_of(result: Dict):
            # Prefer the stored id; fall back to a content hash for id-less rows.
            doc_id = result.get('id')
            return doc_id if doc_id is not None else hash(result['content'])

        combined_scores = {}
        for result in semantic_results:
            combined_scores[key_of(result)] = {
                'doc': result,
                'semantic_score': result['similarity_score'],
                'keyword_score': 0,
            }

        for result in keyword_results:
            entry = combined_scores.get(key_of(result))
            if entry is not None:
                entry['keyword_score'] = result['score']
            else:
                combined_scores[key_of(result)] = {
                    'doc': result,
                    'semantic_score': 0,
                    'keyword_score': result['score'],
                }

        final_results = []
        for scores in combined_scores.values():
            result_doc = scores['doc'].copy()
            result_doc['final_score'] = (semantic_weight * scores['semantic_score'] +
                                         keyword_weight * scores['keyword_score'])
            final_results.append(result_doc)

        final_results.sort(key=lambda item: item['final_score'], reverse=True)
        return final_results[:10]

    def _generate_doc_id(self, doc: Dict) -> str:
        """Derive a stable id from the source and an MD5 prefix of the content."""
        content_hash = hashlib.md5(doc['content'].encode()).hexdigest()
        source = doc.get('source', 'unknown')
        return f"{source}_{content_hash[:12]}"

    def _get_allowed_access_levels(self, user_access_level: str) -> List[str]:
        """Return the data levels visible at ``user_access_level`` (unknown -> public only)."""
        access_hierarchy = {
            "public": ["public"],
            "internal": ["public", "internal"],
            "confidential": ["public", "internal", "confidential"],
            "restricted": ["public", "internal", "confidential", "restricted"]
        }

        return access_hierarchy.get(user_access_level, ["public"])

    def _keyword_search(self, query: str, n_results: int,
                        access_level: str = "internal") -> List[Dict]:
        """Score documents by the fraction of query terms they contain (simplified).

        A production system should use a full-text engine instead of scanning
        the collection. An empty query returns no results.
        """
        query_terms = query.lower().split()
        if not query_terms:
            return []

        # Only fetch documents the caller may see.
        all_docs = self.collection.get(where=self._build_where(access_level, None))

        scored_docs = []
        rows = zip(all_docs['ids'], all_docs['documents'], all_docs['metadatas'])
        for doc_id, content, metadata in rows:
            lowered = content.lower()
            score = sum(term in lowered for term in query_terms) / len(query_terms)
            if score > 0:
                scored_docs.append({
                    'id': doc_id,
                    'content': content,
                    'metadata': metadata,
                    'score': score,
                })

        scored_docs.sort(key=lambda item: item['score'], reverse=True)
        return scored_docs[:n_results]
智能文件處理與分塊策略
適應性文件分塊:
1import re
2from typing import List, Dict
3from dataclasses import dataclass
4
5@dataclass
6class DocumentChunk:
7 content: str
8 chunk_type: str
9 metadata: Dict
10 start_position: int
11 end_position: int
12 chunk_id: str
13
14class IntelligentDocumentChunker:
15 def __init__(self, max_chunk_size: int = 512, overlap_size: int = 50):
16 self.max_chunk_size = max_chunk_size
17 self.overlap_size = overlap_size
18
19 def process_document(self, document: Dict) -> List[DocumentChunk]:
20 """智能文件分塊處理"""
21
22 content = document['content']
23 doc_type = self._detect_document_type(content)
24
25 if doc_type == "code":
26 return self._chunk_code_document(content, document)
27 elif doc_type == "structured":
28 return self._chunk_structured_document(content, document)
29 elif doc_type == "academic":
30 return self._chunk_academic_document(content, document)
31 else:
32 return self._chunk_general_document(content, document)
33
34 def _detect_document_type(self, content: str) -> str:
35 """檢測文件類型"""
36
37 # 程式碼檢測
38 code_patterns = [
39 r'def\s+\w+\(', # Python 函數
40 r'function\s+\w+\(', # JavaScript 函數
41 r'class\s+\w+', # 類別定義
42 r'import\s+\w+', # 導入語句
43 r'\{\s*[\w\s:]+\s*\}', # JSON 物件
44 ]
45
46 if any(re.search(pattern, content) for pattern in code_patterns):
47 return "code"
48
49 # 學術論文檢測
50 academic_patterns = [
51 r'Abstract\s*:',
52 r'Keywords\s*:',
53 r'References\s*\n',
54 r'Fig\.\s+\d+',
55 r'Table\s+\d+',
56 ]
57
58 if any(re.search(pattern, content, re.IGNORECASE) for pattern in academic_patterns):
59 return "academic"
60
61 # 結構化文件檢測
62 structured_patterns = [
63 r'^#+\s', # Markdown 標題
64 r'^\d+\.\s', # 編號列表
65 r'^\*\s', # 項目符號
66 ]
67
68 if any(re.search(pattern, content, re.MULTILINE) for pattern in structured_patterns):
69 return "structured"
70
71 return "general"
72
73 def _chunk_code_document(self, content: str, document: Dict) -> List[DocumentChunk]:
74 """程式碼文件分塊"""
75 chunks = []
76
77 # 按函數/類別分塊
78 function_pattern = r'(def\s+\w+.*?(?=def\s+\w+|class\s+\w+|$))'
79 class_pattern = r'(class\s+\w+.*?(?=class\s+\w+|def\s+\w+|$))'
80
81 code_blocks = re.findall(f'{class_pattern}|{function_pattern}', content, re.DOTALL)
82
83 position = 0
84 for i, block_match in enumerate(code_blocks):
85 block = block_match[0] or block_match[1] # 取非空的匹配
86
87 if len(block.strip()) > 0:
88 chunk = DocumentChunk(
89 content=block.strip(),
90 chunk_type="code_block",
91 metadata={
92 **document.get('metadata', {}),
93 'block_index': i,
94 'programming_language': self._detect_language(block)
95 },
96 start_position=position,
97 end_position=position + len(block),
98 chunk_id=f"{document.get('id', 'unknown')}_code_{i}"
99 )
100 chunks.append(chunk)
101
102 position += len(block)
103
104 return chunks
105
106 def _chunk_structured_document(self, content: str, document: Dict) -> List[DocumentChunk]:
107 """結構化文件分塊(按章節)"""
108 chunks = []
109
110 # 按標題分塊
111 section_pattern = r'(^#+\s+.+$)'
112 sections = re.split(section_pattern, content, flags=re.MULTILINE)
113
114 current_section = ""
115 section_content = ""
116 position = 0
117
118 for i, part in enumerate(sections):
119 if re.match(r'^#+\s+', part):
120 # 這是一個標題
121 if section_content.strip():
122 # 保存前一個章節
123 chunk = DocumentChunk(
124 content=f"{current_section}\n{section_content}".strip(),
125 chunk_type="section",
126 metadata={
127 **document.get('metadata', {}),
128 'section_title': current_section.strip('#').strip(),
129 'section_level': len(current_section) - len(current_section.lstrip('#'))
130 },
131 start_position=position - len(section_content),
132 end_position=position,
133 chunk_id=f"{document.get('id', 'unknown')}_section_{len(chunks)}"
134 )
135 chunks.append(chunk)
136
137 current_section = part
138 section_content = ""
139 else:
140 section_content += part
141
142 position += len(part)
143
144 # 添加最後一個章節
145 if section_content.strip():
146 chunk = DocumentChunk(
147 content=f"{current_section}\n{section_content}".strip(),
148 chunk_type="section",
149 metadata={
150 **document.get('metadata', {}),
151 'section_title': current_section.strip('#').strip() if current_section else "Final Section"
152 },
153 start_position=position - len(section_content),
154 end_position=position,
155 chunk_id=f"{document.get('id', 'unknown')}_section_{len(chunks)}"
156 )
157 chunks.append(chunk)
158
159 return chunks
160
161 def _chunk_general_document(self, content: str, document: Dict) -> List[DocumentChunk]:
162 """一般文件分塊(固定大小 + 重疊)"""
163 chunks = []
164 words = content.split()
165
166 for i in range(0, len(words), self.max_chunk_size - self.overlap_size):
167 chunk_words = words[i:i + self.max_chunk_size]
168 chunk_content = ' '.join(chunk_words)
169
170 chunk = DocumentChunk(
171 content=chunk_content,
172 chunk_type="text_chunk",
173 metadata={
174 **document.get('metadata', {}),
175 'chunk_index': i // (self.max_chunk_size - self.overlap_size),
176 'word_count': len(chunk_words)
177 },
178 start_position=i,
179 end_position=i + len(chunk_words),
180 chunk_id=f"{document.get('id', 'unknown')}_chunk_{len(chunks)}"
181 )
182 chunks.append(chunk)
183
184 return chunks
185
186 def _detect_language(self, code: str) -> str:
187 """檢測程式語言"""
188 language_patterns = {
189 'python': [r'def\s+\w+', r'import\s+\w+', r'class\s+\w+', r'if\s+__name__'],
190 'javascript': [r'function\s+\w+', r'var\s+\w+', r'const\s+\w+', r'=>'],
191 'java': [r'public\s+class', r'import\s+java\.', r'public\s+static\s+void'],
192 'cpp': [r'#include\s*<', r'int\s+main\s*\(', r'std::'],
193 'sql': [r'SELECT\s+', r'FROM\s+', r'WHERE\s+', r'INSERT\s+INTO']
194 }
195
196 for language, patterns in language_patterns.items():
197 if any(re.search(pattern, code, re.IGNORECASE) for pattern in patterns):
198 return language
199
200 return "unknown"
4. 企業數據管道與 ETL 架構
即時數據處理管道
Apache Kafka + Apache Spark 整合:
1from kafka import KafkaProducer, KafkaConsumer
2from pyspark.sql import SparkSession
3from pyspark.sql.functions import *
4from pyspark.sql.types import *
5import json
6
7class EnterpriseDataPipeline:
8 def __init__(self, kafka_config: Dict, spark_config: Dict):
9 self.kafka_config = kafka_config
10
11 # 初始化 Spark Session
12 self.spark = SparkSession.builder \
13 .appName("EnterpriseAIPipeline") \
14 .config("spark.sql.adaptive.enabled", "true") \
15 .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
16 .getOrCreate()
17
18 # 設定日誌級別
19 self.spark.sparkContext.setLogLevel("WARN")
20
21 def create_streaming_pipeline(self, input_topic: str, output_topic: str):
22 """創建即時數據處理管道"""
23
24 # 從 Kafka 讀取串流數據
25 df = self.spark \
26 .readStream \
27 .format("kafka") \
28 .option("kafka.bootstrap.servers", self.kafka_config['bootstrap_servers']) \
29 .option("subscribe", input_topic) \
30 .option("startingOffsets", "latest") \
31 .load()
32
33 # 定義數據結構
34 schema = StructType([
35 StructField("user_id", StringType(), True),
36 StructField("event_type", StringType(), True),
37 StructField("timestamp", TimestampType(), True),
38 StructField("data", MapType(StringType(), StringType()), True)
39 ])
40
41 # 解析 JSON 數據
42 parsed_df = df.select(
43 from_json(col("value").cast("string"), schema).alias("parsed_data"),
44 col("timestamp").alias("kafka_timestamp")
45 ).select("parsed_data.*", "kafka_timestamp")
46
47 # 數據清洗與轉換
48 cleaned_df = self._clean_and_transform_data(parsed_df)
49
50 # 特徵工程
51 enriched_df = self._feature_engineering(cleaned_df)
52
53 # 異常檢測
54 anomaly_df = self._detect_anomalies(enriched_df)
55
56 # 輸出到 Kafka
57 query = anomaly_df \
58 .select(to_json(struct("*")).alias("value")) \
59 .writeStream \
60 .format("kafka") \
61 .option("kafka.bootstrap.servers", self.kafka_config['bootstrap_servers']) \
62 .option("topic", output_topic) \
63 .option("checkpointLocation", "/tmp/kafka-checkpoint") \
64 .outputMode("append") \
65 .start()
66
67 return query
68
69 def _clean_and_transform_data(self, df):
70 """數據清洗與轉換"""
71
72 return df \
73 .filter(col("user_id").isNotNull()) \
74 .filter(col("event_type").isin(["click", "view", "purchase", "search"])) \
75 .withColumn("hour", hour(col("timestamp"))) \
76 .withColumn("day_of_week", dayofweek(col("timestamp"))) \
77 .withColumn("is_weekend", when(col("day_of_week").isin([1, 7]), 1).otherwise(0))
78
79 def _feature_engineering(self, df):
80 """特徵工程"""
81
82 # 時間窗口聚合
83 windowed_df = df \
84 .withWatermark("timestamp", "10 minutes") \
85 .groupBy(
86 col("user_id"),
87 window(col("timestamp"), "5 minutes")
88 ) \
89 .agg(
90 count("*").alias("event_count"),
91 countDistinct("event_type").alias("unique_event_types"),
92 collect_list("event_type").alias("event_sequence")
93 )
94
95 # 添加衍生特徵
96 enhanced_df = windowed_df \
97 .withColumn("events_per_minute", col("event_count") / 5.0) \
98 .withColumn("event_diversity", col("unique_event_types") / col("event_count"))
99
100 return enhanced_df
101
def _detect_anomalies(self, df):
    """Flag rows whose ``event_count`` is a statistical outlier.

    Computes the mean and standard deviation of ``event_count`` over the
    whole of ``df``, then adds three columns:

    - ``z_score``: ``|event_count - mean| / stddev``
    - ``is_anomaly``: 1 when ``z_score`` exceeds 2.0, otherwise 0
    - ``anomaly_score``: copy of ``z_score``

    When the standard deviation is undefined or zero (empty input, a single
    row, or all counts equal) no row can be an outlier, so ``z_score`` is
    set to 0.0 instead of dividing by null/zero.

    NOTE(review): the statistics are gathered with ``collect()``, which is a
    batch action. Calling this on a streaming DataFrame is expected to fail;
    a streaming caller would presumably need ``foreachBatch`` or precomputed
    statistics. Confirm against the streaming pipeline.
    """
    import math

    # Explicitly Spark's abs(): the builtin abs() does not build a Column
    # expression, and the module-level imports are not visible from here.
    from pyspark.sql.functions import abs as spark_abs

    # Simple statistical anomaly detection: global mean / stddev in one pass.
    stats = df \
        .select(
            mean("event_count").alias("mean_events"),
            stddev("event_count").alias("stddev_events")
        ) \
        .collect()[0]
    mean_events = stats["mean_events"]
    stddev_events = stats["stddev_events"]

    threshold = 2.0  # Z-score cut-off

    stddev_unusable = (
        mean_events is None
        or stddev_events is None
        or math.isnan(stddev_events)
        or stddev_events == 0
    )
    if stddev_unusable:
        z_score_expr = lit(0.0)
    else:
        z_score_expr = spark_abs(col("event_count") - lit(mean_events)) / lit(stddev_events)

    anomaly_df = df \
        .withColumn("z_score", z_score_expr) \
        .withColumn(
            "is_anomaly",
            when(col("z_score") > threshold, 1).otherwise(0)
        ) \
        .withColumn("anomaly_score", col("z_score"))

    return anomaly_df
132
def setup_batch_processing(self, input_path: str, output_path: str):
    """Run the batch job: read JSON, check quality, aggregate, write Parquet.

    Args:
        input_path: Location of the multi-line JSON input.
        output_path: Destination for the Parquet output; existing data at
            this path is overwritten.

    Returns:
        The quality-metrics dict produced by ``_data_quality_check`` for the
        raw input (computed before any cleaning).
    """
    # Read the batch data.
    batch_df = self.spark.read \
        .option("multiline", "true") \
        .json(input_path)

    # Data quality check on the raw input.
    quality_report = self._data_quality_check(batch_df)

    # Clean, then aggregate into per-user windows.
    # _feature_engineering groups by user_id and window, so the per-event
    # "day_of_week" column no longer exists afterwards. Re-derive it from the
    # window start so the partitioning below has a column to work with.
    processed_df = batch_df \
        .transform(self._clean_and_transform_data) \
        .transform(self._feature_engineering) \
        .withColumn("day_of_week", dayofweek(col("window.start")))

    # Repartition by the partition column, then write one directory per day.
    processed_df \
        .repartition(col("day_of_week")) \
        .write \
        .mode("overwrite") \
        .partitionBy("day_of_week") \
        .parquet(output_path)

    return quality_report
158
def _data_quality_check(self, df):
    """Compute basic quality metrics for ``df``.

    Returns:
        A dict with ``total_records``, ``null_user_ids``, ``null_timestamps``,
        ``duplicate_records`` (total minus distinct rows) and
        ``completeness_rate`` (share of rows with a non-null ``user_id``).
        For an empty DataFrame ``completeness_rate`` is 1.0, since there are
        no rows with a missing ``user_id``; ``total_records`` of 0 signals
        the empty input.
    """
    total_records = df.count()

    # Each count() is a separate Spark job, so compute every figure once and
    # reuse it rather than re-running the same filter.
    null_user_ids = df.filter(col("user_id").isNull()).count()
    null_timestamps = df.filter(col("timestamp").isNull()).count()
    distinct_records = df.dropDuplicates().count()

    # Guard the ratio: an empty input would otherwise raise ZeroDivisionError.
    if total_records:
        completeness_rate = 1.0 - (null_user_ids / total_records)
    else:
        completeness_rate = 1.0

    quality_metrics = {
        "total_records": total_records,
        "null_user_ids": null_user_ids,
        "null_timestamps": null_timestamps,
        "duplicate_records": total_records - distinct_records,
        "completeness_rate": completeness_rate
    }

    return quality_metrics
機器學習管道自動化
MLOps 流程實作:
1from mlflow import mlflow
2import mlflow.sklearn
3from sklearn.ensemble import RandomForestClassifier
4from sklearn.metrics import accuracy_score, precision_score, recall_score
5import joblib
6from typing import Dict, Any
7
class MLOpsManager:
    """Train, validate, register and monitor a scikit-learn model with MLflow."""

    def __init__(self, experiment_name: str):
        """Select (or create) the MLflow experiment all runs are logged under."""
        self.experiment_name = experiment_name
        mlflow.set_experiment(experiment_name)

    def create_training_pipeline(self, config: Dict):
        """Run one train / evaluate / register / validate cycle.

        Args:
            config: Must contain ``model_name``, ``data_config``,
                ``model_config`` and ``validation_criteria``.

        Returns:
            A dict with the MLflow ``run_id``, the evaluation ``metrics`` and
            ``validation_passed``.
        """
        with mlflow.start_run(run_name=f"training_{config['model_name']}") as run:
            run_id = run.info.run_id

            # Log parameters.
            mlflow.log_params(config)

            # Data preparation.
            # _prepare_data returns train_test_split's result, whose order is
            # (X_train, X_test, y_train, y_test); unpack in that order.
            X_train, X_test, y_train, y_test = self._prepare_data(config['data_config'])

            # Model training.
            model = self._train_model(X_train, y_train, config['model_config'])

            # Model evaluation.
            metrics = self._evaluate_model(model, X_test, y_test)
            mlflow.log_metrics(metrics)

            # Log (and register) the model.
            mlflow.sklearn.log_model(
                model,
                "model",
                registered_model_name=config['model_name']
            )

            # Model validation.
            validation_passed = self._validate_model(model, metrics, config['validation_criteria'])

            if validation_passed:
                # Promote to production.
                self._promote_to_production(config['model_name'], run_id)

            return {
                "run_id": run_id,
                "metrics": metrics,
                "validation_passed": validation_passed
            }

    def _prepare_data(self, data_config: Dict):
        """Build a train/test split.

        A real implementation would read from a data lake or warehouse; this
        one generates synthetic classification data.

        Returns:
            ``(X_train, X_test, y_train, y_test)`` as returned by
            ``train_test_split`` with a 20% test share.
        """
        from sklearn.datasets import make_classification
        from sklearn.model_selection import train_test_split

        X, y = make_classification(
            n_samples=data_config.get('n_samples', 10000),
            n_features=data_config.get('n_features', 20),
            n_informative=data_config.get('n_informative', 10),
            random_state=42
        )

        return train_test_split(X, y, test_size=0.2, random_state=42)

    def _train_model(self, X_train, y_train, model_config: Dict):
        """Fit a random forest using ``n_estimators`` / ``max_depth`` from config."""
        model = RandomForestClassifier(
            n_estimators=model_config.get('n_estimators', 100),
            max_depth=model_config.get('max_depth', 10),
            random_state=42
        )

        model.fit(X_train, y_train)
        return model

    def _evaluate_model(self, model, X_test, y_test):
        """Return accuracy and weighted precision / recall on the test split."""
        y_pred = model.predict(X_test)

        return {
            "accuracy": accuracy_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred, average='weighted'),
            "recall": recall_score(y_test, y_pred, average='weighted')
        }

    def _validate_model(self, model, metrics: Dict, criteria: Dict) -> bool:
        """Check every metric against its minimum threshold.

        A metric named in ``criteria`` but missing from ``metrics`` is treated
        as 0. Returns False (after printing the reason) on the first metric
        below its threshold, True otherwise.
        """
        for metric_name, threshold in criteria.items():
            if metrics.get(metric_name, 0) < threshold:
                print(f"Model failed validation: {metric_name} = {metrics.get(metric_name)} < {threshold}")
                return False

        return True

    def _promote_to_production(self, model_name: str, run_id: str):
        """Create a model version from the run's artifact and move it to Production.

        NOTE(review): ``log_model(..., registered_model_name=...)`` in
        ``create_training_pipeline`` appears to register a version already, so
        this call would create a second version for the same run. Confirm
        whether that duplication is intended.
        """
        client = mlflow.tracking.MlflowClient()

        # Create the model version.
        model_version = client.create_model_version(
            name=model_name,
            source=f"runs:/{run_id}/model",
            run_id=run_id
        )

        # Move it to the Production stage.
        client.transition_model_version_stage(
            name=model_name,
            version=model_version.version,
            stage="Production"
        )

        print(f"Model {model_name} version {model_version.version} promoted to Production")

    def setup_model_monitoring(self, model_name: str, accuracy_threshold: float = 0.8):
        """Build a callable that evaluates the Production model on fresh data.

        Args:
            model_name: Registered model to monitor.
            accuracy_threshold: Retraining is triggered when production
                accuracy falls below this value.

        Returns:
            A zero-argument function. Each call loads the current Production
            version, scores new data (if any), logs ``production_accuracy``
            and ``data_drift_score`` to MLflow when labels are available, and
            triggers retraining when accuracy is below the threshold.
        """

        def monitor_model_performance():
            # Look up the production model.
            client = mlflow.tracking.MlflowClient()
            production_versions = client.get_latest_versions(model_name, stages=["Production"])
            if not production_versions:
                # Nothing has been promoted yet, so there is nothing to monitor.
                print(f"No Production version found for model: {model_name}")
                return
            model_version = production_versions[0]

            # Load the model.
            model_uri = f"models:/{model_name}/{model_version.version}"
            model = mlflow.sklearn.load_model(model_uri)

            # Fetch new data; a real implementation would pull it from the
            # monitoring system.
            new_data = self._get_new_data()
            if not new_data:
                return

            predictions = model.predict(new_data['X'])

            # Metrics can only be computed when ground-truth labels exist.
            if 'y' not in new_data:
                return

            accuracy = accuracy_score(new_data['y'], predictions)

            # Log monitoring metrics.
            with mlflow.start_run():
                mlflow.log_metric("production_accuracy", accuracy)
                mlflow.log_metric("data_drift_score", self._calculate_data_drift(new_data['X']))

            # Decide whether retraining is needed.
            if accuracy < accuracy_threshold:
                self._trigger_retraining(model_name)

        return monitor_model_performance

    def _get_new_data(self):
        """Return new data for monitoring; placeholder that returns None."""
        # A real implementation would fetch data from production.
        return None

    def _calculate_data_drift(self, new_data):
        """Return a data-drift score; placeholder that returns 0.0."""
        # A real implementation would use a tool such as evidently.
        return 0.0

    def _trigger_retraining(self, model_name: str):
        """Announce retraining; placeholder for kicking off a real pipeline."""
        print(f"Triggering retraining for model: {model_name}")
        # A real implementation would start the retraining workflow.
總結
本文深入探討了企業級 AI 整合與部署的關鍵技術:
- 雲端平台部署:GCP Vertex AI、AWS SageMaker、Azure OpenAI 的企業級應用
- 安全框架:RBAC 權限管理、數據加密、審計日誌與合規性
- RAG 架構:向量數據庫、智能文件分塊、語意搜尋與混合檢索
- 數據管道:即時串流處理、批次作業、MLOps 自動化與模型監控
下一篇將專注於生產環境 AI 系統的監控與最佳化,包含效能調優、故障診斷與成本管理。
