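AI Forward Deployed Engineer Essential Skills Guide (Part 3): Enterprise AI Integration and Deployment Strategies

Introduction

Enterprise AI integration and deployment is among the most demanding parts of an AI Forward Deployed Engineer's (FDE) job. It means working within complex enterprise architectures, meeting security and compliance requirements, integrating data, and keeping systems reliable. This article covers the core techniques: cloud platform deployment strategies, enterprise security frameworks, RAG architecture design, and data pipeline construction.

1. Cloud Platform Deployment Strategies

Google Cloud Platform (GCP) Deep Integration

Vertex AI production deployment: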

from google.cloud import aiplatform
from google.cloud import aiplatform_v1
from google.protobuf import field_mask_pb2


class GCPAIDeploymentManager:
    def __init__(self, project_id: str, region: str = "us-central1"):
        self.project_id = project_id
        self.region = region

        # Initialize the Vertex AI SDK for this project and region
        aiplatform.init(
            project=project_id,
            location=region,
            staging_bucket=f"gs://{project_id}-ml-staging"
        )

    def deploy_custom_model(self, model_config: dict):
        """Upload a custom-container model to Vertex AI and deploy it to an endpoint."""

        # Environment variables for the serving container;
        # caller-supplied env_vars override the defaults below
        env_vars = {
            "MODEL_NAME": model_config["model_name"],
            "MODEL_VERSION": model_config["version"],
            "BATCH_SIZE": str(model_config.get("batch_size", 32)),
            **model_config.get("env_vars", {}),
        }

        # Register the model together with its serving container
        model = aiplatform.Model.upload(
            display_name=model_config["display_name"],
            artifact_uri=model_config["model_artifacts_uri"],
            serving_container_image_uri=model_config["container_image"],
            serving_container_predict_route="/predict",
            serving_container_health_route="/health",
            serving_container_ports=[8080],
            serving_container_environment_variables=env_vars,
            sync=True
        )

        # Deploy to an endpoint (a new endpoint is created when none is passed)
        endpoint = model.deploy(
            machine_type=model_config.get("machine_type", "n1-standard-4"),
            min_replica_count=model_config.get("min_replicas", 1),
            max_replica_count=model_config.get("max_replicas", 10),
            accelerator_type=model_config.get("accelerator_type"),
            accelerator_count=model_config.get("accelerator_count"),
            traffic_percentage=100,
            sync=True
        )

        return {
            "model_id": model.resource_name,
            "endpoint_id": endpoint.resource_name,
            # Build the REST predict URL from the endpoint's resource name
            "prediction_url": (
                f"https://{self.region}-aiplatform.googleapis.com/v1/"
                f"{endpoint.resource_name}:predict"
            )
        }

    def setup_auto_scaling(self, endpoint_name: str, scaling_config: dict):
        """Update the autoscaling bounds of a model already deployed to an endpoint."""

        # The low-level client must target the regional API endpoint
        client = aiplatform_v1.EndpointServiceClient(
            client_options={"api_endpoint": f"{self.region}-aiplatform.googleapis.com"}
        )

        # Replica counts belong to the deployed model, not to the endpoint,
        # so they are changed with MutateDeployedModel rather than UpdateEndpoint.
        # scaling_config["deployed_model_id"] identifies which deployed model to change.
        deployed_model = aiplatform_v1.DeployedModel(
            id=scaling_config["deployed_model_id"],
            dedicated_resources=aiplatform_v1.DedicatedResources(
                min_replica_count=scaling_config["min_replicas"],
                max_replica_count=scaling_config["max_replicas"],
                autoscaling_metric_specs=[
                    aiplatform_v1.AutoscalingMetricSpec(
                        metric_name="aiplatform.googleapis.com/prediction/online/cpu/utilization",
                        target=scaling_config.get("target_cpu_utilization", 70),
                    )
                ],
            ),
        )

        # Only the listed fields are modified; verify the mask paths
        # against the API reference for your SDK version
        update_mask = field_mask_pb2.FieldMask(paths=[
            "dedicated_resources.min_replica_count",
            "dedicated_resources.max_replica_count",
            "dedicated_resources.autoscaling_metric_specs",
        ])

        operation = client.mutate_deployed_model(
            endpoint=endpoint_name,
            deployed_model=deployed_model,
            update_mask=update_mask,
        )
        return operation.result()

# Example deployment configuration
deployment_config = {
    "model_name": "enterprise-llm-v1",
    "display_name": "Enterprise LLM Model",
    "version": "1.0.0",
    "container_image": "gcr.io/project-id/enterprise-llm:latest",
    "model_artifacts_uri": "gs://project-bucket/models/enterprise-llm-v1/",
    "machine_type": "n1-highmem-8",
    "min_replicas": 2,
    "max_replicas": 20,
    "accelerator_type": "NVIDIA_TESLA_T4",
    "accelerator_count": 1,
    "env_vars": {
        "MAX_SEQUENCE_LENGTH": "2048",
        "TEMPERATURE": "0.7",
        "TOP_P": "0.9"
    }
}
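
A configuration like the one above is easy to get subtly wrong, and catching mistakes before calling the cloud API saves a slow failed deployment. The following is a minimal sketch of a pre-flight check. The function name `validate_deployment_config` and the specific rules are assumptions made for illustration; they are not requirements stated by Vertex AI or by this article.

```python
from typing import Any, Dict, List

# Hypothetical helper: key names mirror deployment_config, but the rules
# below are assumptions for this sketch, not documented platform limits.
REQUIRED_KEYS = ("display_name", "container_image", "model_artifacts_uri")


def validate_deployment_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config looks sane."""
    problems: List[str] = []

    for key in REQUIRED_KEYS:
        if not config.get(key):
            problems.append(f"missing required key: {key}")

    min_replicas = config.get("min_replicas", 1)
    max_replicas = config.get("max_replicas", 10)
    if min_replicas < 1:
        problems.append("min_replicas must be at least 1")
    if max_replicas < min_replicas:
        problems.append("max_replicas must be >= min_replicas")

    # An accelerator type and an accelerator count only make sense together.
    has_type = bool(config.get("accelerator_type"))
    has_count = bool(config.get("accelerator_count"))
    if has_type != has_count:
        problems.append("accelerator_type and accelerator_count must be set together")

    uri = config.get("model_artifacts_uri", "")
    if uri and not uri.startswith("gs://"):
        problems.append("model_artifacts_uri must be a gs:// URI")

    return problems
```

Calling `validate_deployment_config(deployment_config)` before `deploy_custom_model` and refusing to proceed when the returned list is non-empty keeps configuration errors out of the deployment path.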

AWS Enterprise Deployment Architecture

Amazon SageMaker multi-model endpoints:

import boto3
import sagemaker
from sagemaker import image_uris
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.multidatamodel import MultiDataModel
from sagemaker.predictor import Predictor
from sagemaker.pytorch import PyTorchModel
from sagemaker.serverless import ServerlessInferenceConfig


class AWSAIDeploymentManager:
    def __init__(self, region: str, role_arn: str):
        self.region = region
        self.role_arn = role_arn
        # Bind every client to the requested region instead of the ambient default
        boto_session = boto3.Session(region_name=region)
        self.session = sagemaker.Session(boto_session=boto_session)
        self.s3_client = boto_session.client('s3')

    def deploy_multi_model_endpoint(self, models_config: list):
        """Deploy a multi-model endpoint so several models share instances and cost."""

        instance_type = 'ml.g4dn.xlarge'

        # Resolve the PyTorch inference image for this region rather than hard-coding
        # a us-west-2 URI. Check that the chosen container supports multi-model mode
        # on this instance type.
        image_uri = image_uris.retrieve(
            framework='pytorch',
            region=self.region,
            version='1.12.0',
            py_version='py38',
            instance_type=instance_type,
            image_scope='inference'
        )

        # All models served by the endpoint live under one S3 prefix
        multi_data_model = MultiDataModel(
            name="enterprise-multi-model",
            model_data_prefix="s3://my-bucket/models/",
            image_uri=image_uri,
            role=self.role_arn,
            sagemaker_session=self.session
        )

        # Capture every request and response for later monitoring
        data_capture_config = DataCaptureConfig(
            enable_capture=True,
            sampling_percentage=100,
            destination_s3_uri='s3://my-bucket/data-capture/',
            capture_options=['REQUEST', 'RESPONSE']
        )

        # Deploy the endpoint
        predictor = multi_data_model.deploy(
            initial_instance_count=2,
            instance_type=instance_type,
            endpoint_name='enterprise-multi-model-endpoint',
            data_capture_config=data_capture_config
        )

        # Copy each model artifact under the shared prefix (assumes every entry
        # carries 'name' and 'model_data_url', as in setup_serverless_inference)
        for model_config in models_config:
            multi_data_model.add_model(
                model_data_source=model_config['model_data_url'],
                model_data_path=f"{model_config['name']}.tar.gz"
            )

        return predictor

    def setup_serverless_inference(self, model_config: dict):
        """Create a serverless inference endpoint."""

        model = PyTorchModel(
            entry_point='inference.py',
            source_dir='code/',
            model_data=model_config['model_data_url'],
            role=self.role_arn,
            framework_version='1.12.0',
            py_version='py38',
            predictor_cls=Predictor,
            sagemaker_session=self.session
        )

        # Serverless configuration
        serverless_config = ServerlessInferenceConfig(
            memory_size_in_mb=6144,  # 6 GB
            max_concurrency=50,
            provisioned_concurrency=10
        )

        predictor = model.deploy(
            serverless_inference_config=serverless_config,
            endpoint_name=f"serverless-{model_config['name']}"
        )

        return predictor

    def configure_auto_scaling(self, endpoint_name: str, scaling_policy: dict):
        """Configure auto scaling for a SageMaker endpoint variant."""

        autoscaling_client = boto3.client(
            'application-autoscaling',
            region_name=self.region
        )

        # Register the variant as a scalable target. No RoleARN is passed:
        # the SageMaker execution role is not meant for Application Auto Scaling,
        # which manages its own service-linked role.
        autoscaling_client.register_scalable_target(
            ServiceNamespace='sagemaker',
            ResourceId=f'endpoint/{endpoint_name}/variant/AllTraffic',
            ScalableDimension='sagemaker:variant:DesiredInstanceCount',
            MinCapacity=scaling_policy['min_capacity'],
            MaxCapacity=scaling_policy['max_capacity']
        )

        # Attach a target-tracking policy on invocations per instance
        autoscaling_client.put_scaling_policy(
            PolicyName=f'{endpoint_name}-target-tracking-policy',
            ServiceNamespace='sagemaker',
            ResourceId=f'endpoint/{endpoint_name}/variant/AllTraffic',
            ScalableDimension='sagemaker:variant:DesiredInstanceCount',
            PolicyType='TargetTrackingScaling',
            TargetTrackingScalingPolicyConfiguration={
                'TargetValue': scaling_policy['target_invocations_per_instance'],
                'PredefinedMetricSpecification': {
                    'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'
                },
                'ScaleOutCooldown': 300,  # seconds to wait after scaling out
                'ScaleInCooldown': 600    # seconds to wait after scaling in
            }
        )
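
To choose a sensible `target_invocations_per_instance`, it helps to see the arithmetic a target-tracking policy steers toward. The sketch below is a deliberate simplification: it ignores cooldowns and alarm evaluation periods, and the function name `desired_instance_count` is a hypothetical helper, not part of boto3 or SageMaker.

```python
import math


def desired_instance_count(invocations_per_minute: float,
                           target_per_instance: float,
                           min_capacity: int,
                           max_capacity: int) -> int:
    """Estimate the instance count that keeps per-instance load at the target."""
    if target_per_instance <= 0:
        raise ValueError("target_per_instance must be positive")
    # Enough instances that each one handles at most the target load...
    raw = math.ceil(invocations_per_minute / target_per_instance)
    # ...clamped to the registered scalable-target bounds.
    return max(min_capacity, min(max_capacity, raw))


# 4,500 invocations per minute with a target of 1,000 per instance
print(desired_instance_count(4500, 1000, min_capacity=2, max_capacity=20))  # 5
```

With the same target, a quiet period of 100 invocations per minute settles at the minimum of 2 instances, and a spike of 50,000 is capped at the maximum of 20.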

Azure OpenAI Enterprise Integration

Azure Cognitive Services deployment:

from azure.identity import DefaultAzureCredential
from azure.mgmt.cognitiveservices import CognitiveServicesManagementClient
from azure.mgmt.network import NetworkManagementClient


class AzureAIDeploymentManager:
    def __init__(self, subscription_id: str, resource_group: str):
        self.subscription_id = subscription_id
        self.resource_group = resource_group
        self.credential = DefaultAzureCredential()

    def deploy_openai_service(self, deployment_config: dict):
        """Create an Azure OpenAI account and deploy models to it."""

        client = CognitiveServicesManagementClient(
            self.credential,
            self.subscription_id
        )

        # IP rules are objects with a 'value' field. When an allow-list is given,
        # deny everything else; otherwise the rules would have no effect.
        allowed_ips = deployment_config.get('allowed_ips', [])

        # Cognitive Services account definition
        account_info = {
            'location': deployment_config['location'],
            'sku': {'name': deployment_config['sku']},
            'kind': 'OpenAI',
            'properties': {
                'customSubDomainName': deployment_config['subdomain'],
                'publicNetworkAccess': 'Enabled',
                'networkAcls': {
                    'defaultAction': 'Deny' if allowed_ips else 'Allow',
                    'virtualNetworkRules': [],
                    'ipRules': [{'value': ip} for ip in allowed_ips]
                }
            }
        }

        operation = client.accounts.begin_create(
            resource_group_name=self.resource_group,
            account_name=deployment_config['account_name'],
            account=account_info
        )

        account = operation.result()

        # Deploy each requested model
        model_deployments = []
        for model_config in deployment_config['models']:
            deployment = client.deployments.begin_create_or_update(
                resource_group_name=self.resource_group,
                account_name=deployment_config['account_name'],
                deployment_name=model_config['deployment_name'],
                deployment={
                    # Capacity is set through the deployment SKU;
                    # the older scaleSettings property is deprecated
                    'sku': {
                        'name': 'Standard',
                        'capacity': model_config['capacity']
                    },
                    'properties': {
                        'model': {
                            'format': 'OpenAI',
                            'name': model_config['model_name'],
                            'version': model_config['version']
                        }
                    }
                }
            )
            model_deployments.append(deployment.result())

        return {
            'account': account,
            'deployments': model_deployments
        }

    def setup_private_endpoint(self, config: dict):
        """Create a private endpoint so traffic stays off the public internet."""

        network_client = NetworkManagementClient(
            self.credential,
            self.subscription_id
        )

        # Private endpoint definition
        private_endpoint_params = {
            'location': config['location'],
            'subnet': {'id': config['subnet_id']},
            'privateLinkServiceConnections': [{
                'name': f"{config['service_name']}-connection",
                'privateLinkServiceId': config['service_resource_id'],
                'groupIds': ['account']
            }]
        }

        operation = network_client.private_endpoints.begin_create_or_update(
            resource_group_name=self.resource_group,
            private_endpoint_name=config['endpoint_name'],
            parameters=private_endpoint_params
        )

        return operation.result()
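
The `networkAcls` section of the account definition is where an IP allow-list goes, and each `ipRules` entry is an object with a `value` field holding an address or CIDR range. The sketch below validates entries with the standard-library `ipaddress` module before they reach Azure. `build_network_acls` is a hypothetical helper, and switching `defaultAction` to `Deny` whenever rules are present is a design assumption of this sketch.

```python
import ipaddress
from typing import Dict, List


def build_network_acls(allowed_ips: List[str]) -> Dict:
    """Build a networkAcls payload from IP addresses or CIDR ranges."""
    ip_rules = []
    for entry in allowed_ips:
        # strict=False accepts a bare host address as a single-address network
        # and normalizes "1.1.1.5/24" to "1.1.1.0/24". Invalid input raises ValueError.
        network = ipaddress.ip_network(entry, strict=False)
        if network.prefixlen == network.max_prefixlen:
            value = str(network.network_address)  # single host, no suffix
        else:
            value = str(network)
        ip_rules.append({"value": value})

    return {
        # An allow-list only has an effect when everything else is denied.
        "defaultAction": "Deny" if ip_rules else "Allow",
        "virtualNetworkRules": [],
        "ipRules": ip_rules,
    }
```

For example, `build_network_acls(["8.8.8.8", "1.1.1.5/24"])` yields two rules, `8.8.8.8` and `1.1.1.0/24`, with `defaultAction` set to `Deny`, while a malformed entry fails fast with a `ValueError` instead of producing a rejected API call.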

2. Enterprise Security Framework and Compliance

Authentication and Authorization

Enterprise RBAC implementation:

from enum import Enum
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set
from datetime import datetime, timezone
import logging

import jwt  # PyJWT

logger = logging.getLogger(__name__)


class UserRole(Enum):
    ADMIN = "admin"
    DATA_SCIENTIST = "data_scientist"
    ANALYST = "analyst"
    VIEWER = "viewer"
    EXTERNAL_CLIENT = "external_client"


class ResourceType(Enum):
    MODEL = "model"
    DATASET = "dataset"
    PIPELINE = "pipeline"
    ENDPOINT = "endpoint"
    LOGS = "logs"


class Permission(Enum):
    CREATE = "create"
    READ = "read"
    UPDATE = "update"
    DELETE = "delete"
    EXECUTE = "execute"
    DEPLOY = "deploy"


@dataclass
class AccessControl:
    resource_type: ResourceType
    resource_id: str  # exact ID, "*" for all, or a prefix pattern such as "public_*"
    permissions: Set[Permission]
    conditions: Optional[Dict[str, Any]] = None


class EnterpriseSecurityManager:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.role_permissions = self._setup_role_permissions()
        self.audit_log = []

    def _setup_role_permissions(self) -> Dict[UserRole, List[AccessControl]]:
        """Define the role-to-permission matrix."""
        return {
            UserRole.ADMIN: [
                AccessControl(ResourceType.MODEL, "*", {Permission.CREATE, Permission.READ, Permission.UPDATE, Permission.DELETE, Permission.DEPLOY}),
                AccessControl(ResourceType.DATASET, "*", {Permission.CREATE, Permission.READ, Permission.UPDATE, Permission.DELETE}),
                AccessControl(ResourceType.PIPELINE, "*", {Permission.CREATE, Permission.READ, Permission.UPDATE, Permission.DELETE, Permission.EXECUTE}),
                AccessControl(ResourceType.LOGS, "*", {Permission.READ})
            ],
            UserRole.DATA_SCIENTIST: [
                AccessControl(ResourceType.MODEL, "*", {Permission.CREATE, Permission.READ, Permission.UPDATE}),
                AccessControl(ResourceType.DATASET, "*", {Permission.READ}),
                AccessControl(ResourceType.PIPELINE, "*", {Permission.CREATE, Permission.READ, Permission.EXECUTE})
            ],
            UserRole.ANALYST: [
                AccessControl(ResourceType.MODEL, "*", {Permission.READ}),
                AccessControl(ResourceType.DATASET, "*", {Permission.READ}),
                AccessControl(ResourceType.PIPELINE, "*", {Permission.READ, Permission.EXECUTE})
            ],
            UserRole.VIEWER: [
                AccessControl(ResourceType.MODEL, "*", {Permission.READ}),
                AccessControl(ResourceType.DATASET, "public_*", {Permission.READ})
            ],
            UserRole.EXTERNAL_CLIENT: [
                AccessControl(ResourceType.ENDPOINT, "client_*", {Permission.READ},
                              conditions={"time_limit": "business_hours", "rate_limit": 1000})
            ]
        }

    def authenticate_user(self, token: str) -> Optional[Dict]:
        """Validate a user's JWT and return the user context, or None if it is invalid."""
        try:
            # jwt.decode verifies the signature and the exp claim (an expired token
            # raises ExpiredSignatureError), so no manual expiry check is needed
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=['HS256'],
                options={'require': ['exp']}
            )

            return {
                'user_id': payload['user_id'],
                'role': UserRole(payload['role']),
                'permissions': payload.get('permissions', []),
                'organization': payload.get('organization'),
                'expires_at': payload['exp']
            }
        except (jwt.InvalidTokenError, KeyError, ValueError):
            # Bad signature, expired or malformed token, missing claim, or unknown role
            return None

    def authorize_action(self, user_context: Dict, resource_type: ResourceType,
                         resource_id: str, action: Permission) -> bool:
        """Check whether the user may perform the action on the resource."""
        user_role = user_context['role']
        user_permissions = self.role_permissions.get(user_role, [])

        for access_control in user_permissions:
            if (access_control.resource_type == resource_type and
                    self._match_resource_pattern(access_control.resource_id, resource_id) and
                    action in access_control.permissions):

                # Evaluate any additional conditions attached to the rule
                if access_control.conditions:
                    if not self._check_conditions(access_control.conditions, user_context):
                        continue

                # Record the granted access
                self._log_access(user_context, resource_type, resource_id, action, "GRANTED")
                return True

        self._log_access(user_context, resource_type, resource_id, action, "DENIED")
        return False

    def _match_resource_pattern(self, pattern: str, resource_id: str) -> bool:
        """Match a resource ID against an exact ID, "*", or a trailing-"*" prefix pattern."""
        if pattern == "*":
            return True
        if pattern.endswith("*"):
            return resource_id.startswith(pattern[:-1])
        return pattern == resource_id

    def _check_conditions(self, conditions: Dict, user_context: Dict) -> bool:
        """Evaluate additional access conditions."""
        # Time restriction: business hours are 09:00 to 16:59 server local time
        if "time_limit" in conditions:
            current_hour = datetime.now().hour
            if conditions["time_limit"] == "business_hours":
                if not (9 <= current_hour < 17):
                    return False

        # Rate limit check
        if "rate_limit" in conditions:
            # Placeholder: rate limiting is not implemented here
            pass

        return True

    def _log_access(self, user_context: Dict, resource_type: ResourceType,
                    resource_id: str, action: Permission, result: str):
        """Append an entry to the access audit log."""
        log_entry = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'user_id': user_context['user_id'],
            'organization': user_context.get('organization'),
            'resource_type': resource_type.value,
            'resource_id': resource_id,
            'action': action.value,
            'result': result,
            'ip_address': user_context.get('ip_address'),
            'user_agent': user_context.get('user_agent')
        }

        self.audit_log.append(log_entry)

        # In production, write to an external audit system instead
        logger.info("AUDIT: %s", log_entry)
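
The `rate_limit` condition in `_check_conditions` is left as a placeholder. One way to fill it in is a sliding-window counter per user, sketched below. The class name `SlidingWindowRateLimiter`, the one-hour default window, and the in-memory storage are all assumptions for illustration: the article does not say what period the limit of 1000 applies to, and a multi-process deployment would need a shared store instead of a local dictionary.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowRateLimiter:
    """Allow at most `limit` requests per user within a rolling time window."""

    def __init__(self, window_seconds: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic):
        self.window_seconds = window_seconds
        self.clock = clock  # injectable so tests can control time
        self._events: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, user_id: str, limit: int) -> bool:
        now = self.clock()
        events = self._events[user_id]

        # Drop timestamps that have fallen out of the window.
        while events and now - events[0] >= self.window_seconds:
            events.popleft()

        if len(events) >= limit:
            return False

        events.append(now)
        return True
```

Inside `_check_conditions`, the placeholder could then become a call such as `limiter.allow(user_context['user_id'], conditions['rate_limit'])`, returning `False` when the call does.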

Data Security and Encryption

End-to-end data protection:

from typing import Dict
import base64

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding

# UserRole is the enum defined in the RBAC example above


class DataSecurityManager:
    def __init__(self):
        self.symmetric_key = Fernet.generate_key()
        self.cipher_suite = Fernet(self.symmetric_key)

        # Generate an RSA key pair
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048
        )
        self.public_key = self.private_key.public_key()

    @staticmethod
    def _oaep_padding() -> padding.OAEP:
        """OAEP padding with SHA-256, shared by encryption and decryption."""
        return padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )

    def encrypt_sensitive_data(self, data: bytes, classification: str = "confidential") -> Dict:
        """Encrypt data according to its classification level."""

        if classification == "public":
            # Public data is encoded but not encrypted
            return {
                "data": base64.b64encode(data).decode(),
                "encrypted": False,
                "classification": classification
            }

        elif classification in ["internal", "confidential", "restricted"]:
            # Symmetric encryption with the shared Fernet key
            encrypted_data = self.cipher_suite.encrypt(data)

            return {
                "data": base64.b64encode(encrypted_data).decode(),
                "encrypted": True,
                "encryption_type": "symmetric",
                "classification": classification,
                "key_id": self._get_key_fingerprint(self.symmetric_key)
            }

        elif classification == "top_secret":
            # RSA-OAEP with a 2048-bit key can encrypt only about 190 bytes, so use
            # envelope encryption: a one-time Fernet key encrypts the data, and only
            # that key is wrapped with the RSA public key
            data_key = Fernet.generate_key()
            encrypted_data = Fernet(data_key).encrypt(data)
            wrapped_key = self.public_key.encrypt(data_key, self._oaep_padding())

            return {
                "data": base64.b64encode(encrypted_data).decode(),
                "wrapped_key": base64.b64encode(wrapped_key).decode(),
                "encrypted": True,
                "encryption_type": "asymmetric",
                "classification": classification
            }

        raise ValueError(f"Unknown classification: {classification}")

    def decrypt_sensitive_data(self, encrypted_package: Dict) -> bytes:
        """Decrypt a package produced by encrypt_sensitive_data."""

        if not encrypted_package["encrypted"]:
            return base64.b64decode(encrypted_package["data"])

        encrypted_data = base64.b64decode(encrypted_package["data"])
        encryption_type = encrypted_package["encryption_type"]

        if encryption_type == "symmetric":
            return self.cipher_suite.decrypt(encrypted_data)

        elif encryption_type == "asymmetric":
            # Unwrap the one-time key with the RSA private key, then decrypt the data
            wrapped_key = base64.b64decode(encrypted_package["wrapped_key"])
            data_key = self.private_key.decrypt(wrapped_key, self._oaep_padding())
            return Fernet(data_key).decrypt(encrypted_data)

        raise ValueError(f"Unknown encryption type: {encryption_type}")

    def _get_key_fingerprint(self, key: bytes) -> str:
        """Derive a short fingerprint that identifies a key without revealing it."""
        digest = hashes.Hash(hashes.SHA256())
        digest.update(key)
        return base64.b64encode(digest.finalize()).decode()[:16]

    def implement_data_masking(self, data: Dict, user_role: UserRole) -> Dict:
        """Mask fields according to the user's role."""

        masking_rules = {
            UserRole.EXTERNAL_CLIENT: {
                "personal_id": "***MASKED***",
                "email": lambda x: x.split('@')[0][:3] + "***@" + x.split('@')[1] if '@' in x else "***MASKED***",
                "phone": lambda x: x[:3] + "***" + x[-4:] if len(x) > 7 else "***MASKED***"
            },
            UserRole.ANALYST: {
                "personal_id": lambda x: x[:4] + "***" + x[-2:] if len(x) > 6 else "***MASKED***"
            },
            UserRole.DATA_SCIENTIST: {
                # Data scientists see more detail; add rules here for fields that
                # must still be masked
            },
            UserRole.ADMIN: {
                # Administrators see all data
            }
        }

        rules = masking_rules.get(user_role, {})
        masked_data = data.copy()

        for field, mask_func in rules.items():
            if field in masked_data:
                if callable(mask_func):
                    masked_data[field] = mask_func(str(masked_data[field]))
                else:
                    masked_data[field] = mask_func

        return masked_data
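
Inline lambdas keep the masking table compact, but they are hard to test and easy to break on malformed input such as an email without an `@`. The sketch below pulls the same masking patterns into two small named functions. `mask_email` and `mask_middle` are hypothetical helpers, not part of the class above, and the fallback string simply reuses the `***MASKED***` convention from the rules table.

```python
MASKED = "***MASKED***"


def mask_email(value: str, visible: int = 3) -> str:
    """Keep the first few characters of the local part and the full domain."""
    local, sep, domain = value.partition("@")
    if not sep or not local or not domain:
        return MASKED  # not a usable address, so hide it entirely
    return f"{local[:visible]}***@{domain}"


def mask_middle(value: str, head: int, tail: int) -> str:
    """Keep `head` leading and `tail` trailing characters, hiding the middle."""
    if len(value) <= head + tail:
        return MASKED  # too short to mask without revealing everything
    return f"{value[:head]}***{value[len(value) - tail:]}"


print(mask_email("alice.wang@example.com"))  # ali***@example.com
print(mask_middle("0912345678", 3, 4))       # 091***5678
```

Named functions like these can be dropped into the rules table in place of the lambdas, and each can be unit-tested against edge cases on its own.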

3. RAG (Retrieval-Augmented Generation) Architecture Design

Enterprise Vector Database Implementation

Vector search and semantic retrieval:

import hashlib
from typing import Any, Dict, List, Optional

import chromadb
from sentence_transformers import SentenceTransformer


class EnterpriseRAGSystem:
    def __init__(self, collection_name: str = "enterprise_knowledge"):
        # Initialize the vector database
        self.chroma_client = chromadb.PersistentClient(path="./vector_db")

        # Initialize the embedding model
        self.embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

        # Create or fetch the collection, using cosine distance
        self.collection = self.chroma_client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )

    def add_documents(self, documents: List[Dict[str, Any]], batch_size: int = 100):
        """Add documents to the vector database in batches."""

        for i in range(0, len(documents), batch_size):
            batch = documents[i:i+batch_size]

            # Prepare the batch
            texts = [doc['content'] for doc in batch]
            metadatas = []
            ids = []

            for doc in batch:
                # Generate a stable ID from the source and content
                doc_id = self._generate_doc_id(doc)
                ids.append(doc_id)

                # Prepare the metadata
                metadata = {
                    'source': doc.get('source', 'unknown'),
                    'title': doc.get('title', ''),
                    'category': doc.get('category', 'general'),
                    'access_level': doc.get('access_level', 'internal'),
                    'created_at': doc.get('created_at', ''),
                    'last_updated': doc.get('last_updated', ''),
                    'content_length': len(doc['content'])
                }
                metadatas.append(metadata)

            # Generate the embeddings
            embeddings = self.embedding_model.encode(texts).tolist()

            # Upsert so that re-ingesting the same document does not fail on its ID
            self.collection.upsert(
                embeddings=embeddings,
                documents=texts,
                metadatas=metadatas,
                ids=ids
            )

    def semantic_search(self, query: str, n_results: int = 5,
                        access_level: str = "internal",
                        filters: Optional[Dict] = None) -> List[Dict]:
        """Run a semantic search restricted to what the caller may access."""

        # Embed the query
        query_embedding = self.embedding_model.encode([query]).tolist()

        # Run the search
        results = self.collection.query(
            query_embeddings=query_embedding,
            n_results=n_results,
            where=self._build_where(access_level, filters),
            include=["documents", "metadatas", "distances"]
        )

        # Format the results
        formatted_results = []
        for i, doc in enumerate(results['documents'][0]):
            formatted_results.append({
                'id': results['ids'][0][i],
                'content': doc,
                'metadata': results['metadatas'][0][i],
                'similarity_score': 1 - results['distances'][0][i],  # cosine distance to similarity
                'relevance_rank': i + 1
            })

        return formatted_results

    def hybrid_search(self, query: str, keyword_weight: float = 0.3,
                      semantic_weight: float = 0.7,
                      access_level: str = "internal") -> List[Dict]:
        """Hybrid search: combine keyword and semantic results."""

        # Semantic search
        semantic_results = self.semantic_search(query, n_results=20, access_level=access_level)

        # Keyword search (simplified implementation)
        keyword_results = self._keyword_search(query, n_results=20, access_level=access_level)

        # Merge the two result sets, keyed by document ID
        combined_scores = {}

        for result in semantic_results:
            combined_scores[result['id']] = {
                'doc': result,
                'semantic_score': result['similarity_score'],
                'keyword_score': 0
            }

        for result in keyword_results:
            doc_id = result['id']
            if doc_id in combined_scores:
                combined_scores[doc_id]['keyword_score'] = result['score']
            else:
                combined_scores[doc_id] = {
                    'doc': result,
                    'semantic_score': 0,
                    'keyword_score': result['score']
                }

        # Compute the weighted final score
        final_results = []
        for doc_id, scores in combined_scores.items():
            final_score = (semantic_weight * scores['semantic_score'] +
                           keyword_weight * scores['keyword_score'])

            result_doc = scores['doc'].copy()
            result_doc['final_score'] = final_score
            final_results.append(result_doc)

        # Sort by score, highest first
        final_results.sort(key=lambda x: x['final_score'], reverse=True)

        return final_results[:10]

    def _build_where(self, access_level: str, filters: Optional[Dict] = None) -> Dict:
        """Build a Chroma where clause; several conditions must be joined with $and."""
        conditions = [{"access_level": {"$in": self._get_allowed_access_levels(access_level)}}]
        if filters:
            conditions.extend({key: value} for key, value in filters.items())
        return conditions[0] if len(conditions) == 1 else {"$and": conditions}

    def _generate_doc_id(self, doc: Dict) -> str:
        """Generate a document ID from its source and a content hash."""
        content_hash = hashlib.md5(doc['content'].encode()).hexdigest()
        source = doc.get('source', 'unknown')
        return f"{source}_{content_hash[:12]}"

    def _get_allowed_access_levels(self, user_access_level: str) -> List[str]:
        """Return the data levels a user may read, given the user's access level."""
        access_hierarchy = {
            "public": ["public"],
            "internal": ["public", "internal"],
            "confidential": ["public", "internal", "confidential"],
            "restricted": ["public", "internal", "confidential", "restricted"]
        }

        return access_hierarchy.get(user_access_level, ["public"])

    def _keyword_search(self, query: str, n_results: int,
                        access_level: str = "internal") -> List[Dict]:
        """Keyword search (simplified)."""
        # In production, use a full-text search engine such as Elasticsearch
        query_terms = query.lower().split()
        if not query_terms:
            return []

        # Fetch only the documents the caller may access
        all_docs = self.collection.get(
            where=self._build_where(access_level),
            include=["documents", "metadatas"]
        )

        scored_docs = []
        for i, doc in enumerate(all_docs['documents']):
            # Score is the fraction of query terms that appear in the document
            score = sum(term in doc.lower() for term in query_terms) / len(query_terms)
            if score > 0:
                scored_docs.append({
                    'id': all_docs['ids'][i],
                    'content': doc,
                    'metadata': all_docs['metadatas'][i],
                    'score': score
                })

        scored_docs.sort(key=lambda x: x['score'], reverse=True)
        return scored_docs[:n_results]
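
The weighted sum in `hybrid_search` adds a cosine similarity to a term-overlap fraction, two scores on different scales, so the weights can be hard to tune. A commonly used alternative is reciprocal rank fusion (RRF), which combines results by rank position only. The sketch below shows RRF as a substitute technique, not what the code above does; the function name and the conventional constant `k = 60` are assumptions of this example.

```python
from collections import defaultdict
from typing import Dict, List


def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse ranked lists of document IDs; each document scores sum(1 / (k + rank))."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties are broken by ID for deterministic output.
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


semantic_ids = ["a", "b", "c"]  # best match first
keyword_ids = ["b", "d", "a"]
print(reciprocal_rank_fusion([semantic_ids, keyword_ids]))  # ['b', 'a', 'd', 'c']
```

Document `b` wins because it ranks near the top of both lists, while `c` and `d` each appear in only one. Because only ranks matter, no score normalization is needed before fusing.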

Intelligent Document Processing and Chunking Strategy

Adaptive document chunking:

  1import re
  2from typing import List, Dict
  3from dataclasses import dataclass
  4
  5@dataclass
  6class DocumentChunk:
  7    content: str
  8    chunk_type: str
  9    metadata: Dict
 10    start_position: int
 11    end_position: int
 12    chunk_id: str
 13
 14class IntelligentDocumentChunker:
 15    def __init__(self, max_chunk_size: int = 512, overlap_size: int = 50):
 16        self.max_chunk_size = max_chunk_size
 17        self.overlap_size = overlap_size
 18        
 19    def process_document(self, document: Dict) -> List[DocumentChunk]:
 20        """智能文件分塊處理"""
 21        
 22        content = document['content']
 23        doc_type = self._detect_document_type(content)
 24        
 25        if doc_type == "code":
 26            return self._chunk_code_document(content, document)
 27        elif doc_type == "structured":
 28            return self._chunk_structured_document(content, document)
 29        elif doc_type == "academic":
 30            return self._chunk_academic_document(content, document)
 31        else:
 32            return self._chunk_general_document(content, document)
 33    
    def _detect_document_type(self, content: str) -> str:
        """Classify a document as code, academic, structured, or general.

        The checks are regex heuristics applied in priority order, so prose
        that happens to contain "import x" or "class y" is treated as code.
        """

        # Code detection
        code_patterns = [
            r'def\s+\w+\(',  # Python function
            r'function\s+\w+\(',  # JavaScript function
            r'class\s+\w+',  # class definition
            r'import\s+\w+',  # import statement
            r'\{\s*[\w\s:",]+\s*\}',  # flat JSON object, quoted keys included
        ]

        if any(re.search(pattern, content) for pattern in code_patterns):
            return "code"

        # Academic paper detection
        academic_patterns = [
            r'Abstract\s*:',
            r'Keywords\s*:',
            r'References\s*\n',
            r'Fig\.\s+\d+',
            r'Table\s+\d+',
        ]

        if any(re.search(pattern, content, re.IGNORECASE) for pattern in academic_patterns):
            return "academic"

        # Structured document detection
        structured_patterns = [
            r'^#+\s',  # Markdown heading
            r'^\d+\.\s',  # numbered list
            r'^\*\s',  # bullet item
        ]

        if any(re.search(pattern, content, re.MULTILINE) for pattern in structured_patterns):
            return "structured"

        return "general"

    def _chunk_code_document(self, content: str, document: Dict) -> List[DocumentChunk]:
        """Split source code into one chunk per function or class header.

        The patterns only recognise Python-style `def` and `class` keywords;
        text before the first match is not chunked.
        """
        chunks = []

        # Each block runs from a def/class keyword to the next one (or the end)
        function_pattern = r'def\s+\w+.*?(?=def\s+\w+|class\s+\w+|$)'
        class_pattern = r'class\s+\w+.*?(?=class\s+\w+|def\s+\w+|$)'
        block_pattern = re.compile(f'{class_pattern}|{function_pattern}', re.DOTALL)

        # finditer exposes the real character offsets of every block, which
        # summing block lengths cannot do when text precedes the first match
        for i, match in enumerate(block_pattern.finditer(content)):
            block = match.group(0)

            if block.strip():
                chunk = DocumentChunk(
                    content=block.strip(),
                    chunk_type="code_block",
                    metadata={
                        **document.get('metadata', {}),
                        'block_index': i,
                        'programming_language': self._detect_language(block)
                    },
                    start_position=match.start(),
                    end_position=match.end(),
                    chunk_id=f"{document.get('id', 'unknown')}_code_{i}"
                )
                chunks.append(chunk)

        return chunks

    def _chunk_structured_document(self, content: str, document: Dict) -> List[DocumentChunk]:
        """Split a structured document into one chunk per Markdown section."""
        chunks = []

        # The capturing group makes re.split keep the heading lines
        section_pattern = r'(^#+\s+.+$)'
        sections = re.split(section_pattern, content, flags=re.MULTILINE)

        def build_chunk(heading: str, body: str, end: int) -> DocumentChunk:
            # A chunk covers its heading plus the text up to the next heading.
            # Text before the first heading has an empty title and level 0.
            return DocumentChunk(
                content=f"{heading}\n{body}".strip(),
                chunk_type="section",
                metadata={
                    **document.get('metadata', {}),
                    'section_title': heading.strip('#').strip(),
                    'section_level': len(heading) - len(heading.lstrip('#'))
                },
                start_position=end - len(body) - len(heading),
                end_position=end,
                chunk_id=f"{document.get('id', 'unknown')}_section_{len(chunks)}"
            )

        current_section = ""
        section_content = ""
        position = 0

        for part in sections:
            if re.match(r'^#+\s+', part):
                # A heading starts a new section, so save the previous one
                if section_content.strip():
                    chunks.append(build_chunk(current_section, section_content, position))

                current_section = part
                section_content = ""
            else:
                section_content += part

            position += len(part)

        # Save the final section
        if section_content.strip():
            chunks.append(build_chunk(current_section, section_content, position))

        return chunks

    def _chunk_general_document(self, content: str, document: Dict) -> List[DocumentChunk]:
        """Split plain text into fixed-size word windows with overlap.

        Positions are word indices here, not character offsets.
        """
        chunks = []
        words = content.split()
        step = self.max_chunk_size - self.overlap_size

        for i in range(0, len(words), step):
            chunk_words = words[i:i + self.max_chunk_size]
            chunk_content = ' '.join(chunk_words)

            chunk = DocumentChunk(
                content=chunk_content,
                chunk_type="text_chunk",
                metadata={
                    **document.get('metadata', {}),
                    'chunk_index': i // step,
                    'word_count': len(chunk_words)
                },
                start_position=i,
                end_position=i + len(chunk_words),
                chunk_id=f"{document.get('id', 'unknown')}_chunk_{len(chunks)}"
            )
            chunks.append(chunk)

            # Stop once a window reaches the end of the text; a further
            # window would only repeat words that are already covered
            if i + self.max_chunk_size >= len(words):
                break

        return chunks

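    def _detect_language(self, code: str) -> str:
        """Guess the programming language from characteristic patterns."""
        # More specific languages come first so that ties favor them
        language_patterns = {
            'java': [r'public\s+class', r'import\s+java\.', r'public\s+static\s+void'],
            'cpp': [r'#include\s*<', r'int\s+main\s*\(', r'std::'],
            'javascript': [r'function\s+\w+', r'var\s+\w+', r'const\s+\w+', r'=>'],
            'python': [r'def\s+\w+', r'import\s+\w+', r'class\s+\w+', r'if\s+__name__'],
            'sql': [r'SELECT\s+', r'FROM\s+', r'WHERE\s+', r'INSERT\s+INTO']
        }

        # Score each language by the number of matching patterns. Returning
        # the first language with any hit would label Java as Python,
        # because `class Foo` and `import x` appear in both.
        scores = {
            language: sum(bool(re.search(pattern, code, re.IGNORECASE)) for pattern in patterns)
            for language, patterns in language_patterns.items()
        }
        best_language = max(scores, key=scores.get)

        return best_language if scores[best_language] > 0 else "unknown"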
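
In the general strategy the window advances by `max_chunk_size - overlap_size` words, so with the defaults (512 and 50) each chunk starts 462 words after the previous one. The sketch below computes only the word-index bounds of each window, which makes that arithmetic easy to check. `window_bounds` is a hypothetical helper, and stopping as soon as a window reaches the end of the text is a choice made in this sketch.

```python
from typing import List, Tuple


def window_bounds(n_words: int, max_chunk_size: int = 512,
                  overlap_size: int = 50) -> List[Tuple[int, int]]:
    """Return (start, end) word indices for overlapping fixed-size windows."""
    if max_chunk_size <= 0 or not 0 <= overlap_size < max_chunk_size:
        raise ValueError("require max_chunk_size > 0 and 0 <= overlap_size < max_chunk_size")

    step = max_chunk_size - overlap_size
    bounds = []
    start = 0
    while start < n_words:
        end = min(start + max_chunk_size, n_words)
        bounds.append((start, end))
        if end == n_words:
            # The text is fully covered; another window would be redundant
            break
        start += step
    return bounds
```

For a 1,000-word document with the default sizes this yields three windows: `(0, 512)`, `(462, 974)`, and `(924, 1000)`.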

4. Enterprise Data Pipelines and ETL Architecture

Real-Time Data Processing Pipeline

Apache Kafka + Apache Spark integration:

import math
from typing import Dict

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Expected structure of each JSON event
EVENT_SCHEMA = T.StructType([
    T.StructField("user_id", T.StringType(), True),
    T.StructField("event_type", T.StringType(), True),
    T.StructField("timestamp", T.TimestampType(), True),
    T.StructField("data", T.MapType(T.StringType(), T.StringType()), True)
])

# Columns that identify an event when checking for duplicates
EVENT_KEY_COLUMNS = ["user_id", "event_type", "timestamp"]

class EnterpriseDataPipeline:
    def __init__(self, kafka_config: Dict, spark_config: Dict):
        self.kafka_config = kafka_config

        # Initialize the Spark session
        builder = SparkSession.builder \
            .appName("EnterpriseAIPipeline") \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true")

        # Apply caller-supplied settings; this assumes spark_config maps
        # Spark property names to values
        for key, value in spark_config.items():
            builder = builder.config(key, value)

        self.spark = builder.getOrCreate()

        # Set the log level
        self.spark.sparkContext.setLogLevel("WARN")

    def create_streaming_pipeline(self, input_topic: str, output_topic: str):
        """Create the real-time data processing pipeline."""

        # Read the event stream from Kafka (requires the spark-sql-kafka connector)
        df = self.spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", self.kafka_config['bootstrap_servers']) \
            .option("subscribe", input_topic) \
            .option("startingOffsets", "latest") \
            .load()

        # Parse the JSON payload
        parsed_df = df.select(
            F.from_json(F.col("value").cast("string"), EVENT_SCHEMA).alias("parsed_data"),
            F.col("timestamp").alias("kafka_timestamp")
        ).select("parsed_data.*", "kafka_timestamp")

        # Data cleaning and transformation
        cleaned_df = self._clean_and_transform_data(parsed_df)

        # Feature engineering
        enriched_df = self._feature_engineering(cleaned_df)

        def score_and_publish(batch_df, batch_id):
            # collect() cannot run on a streaming DataFrame, so anomaly
            # statistics are computed per micro-batch, where batch_df is static
            self._detect_anomalies(batch_df) \
                .select(F.to_json(F.struct("*")).alias("value")) \
                .write \
                .format("kafka") \
                .option("kafka.bootstrap.servers", self.kafka_config['bootstrap_servers']) \
                .option("topic", output_topic) \
                .save()

        # Write to Kafka; use a durable checkpoint location in production
        query = enriched_df \
            .writeStream \
            .foreachBatch(score_and_publish) \
            .option("checkpointLocation", "/tmp/kafka-checkpoint") \
            .outputMode("append") \
            .start()

        return query

    def _clean_and_transform_data(self, df):
        """Clean and transform the data."""

        # dayofweek returns 1 for Sunday and 7 for Saturday
        return df \
            .filter(F.col("user_id").isNotNull()) \
            .filter(F.col("event_type").isin(["click", "view", "purchase", "search"])) \
            .withColumn("hour", F.hour(F.col("timestamp"))) \
            .withColumn("day_of_week", F.dayofweek(F.col("timestamp"))) \
            .withColumn("is_weekend", F.when(F.col("day_of_week").isin([1, 7]), 1).otherwise(0))

    def _feature_engineering(self, df):
        """Feature engineering."""

        # Time-window aggregation; only user_id, window, and the aggregates
        # survive the groupBy
        windowed_df = df \
            .withWatermark("timestamp", "10 minutes") \
            .groupBy(
                F.col("user_id"),
                F.window(F.col("timestamp"), "5 minutes")
            ) \
            .agg(
                F.count("*").alias("event_count"),
                # Exact distinct counts are not supported in streaming aggregations
                F.approx_count_distinct("event_type").alias("unique_event_types"),
                F.collect_list("event_type").alias("event_sequence")
            )

        # Add derived features
        enhanced_df = windowed_df \
            .withColumn("events_per_minute", F.col("event_count") / 5.0) \
            .withColumn("event_diversity", F.col("unique_event_types") / F.col("event_count"))

        return enhanced_df

    def _detect_anomalies(self, df):
        """Flag anomalies with a Z-score; expects a static (non-streaming) DataFrame."""

        # Simple statistical anomaly detection
        stats = df.select(
            F.mean("event_count").alias("mean_events"),
            F.stddev("event_count").alias("stddev_events")
        ).collect()[0]
        mean_events = stats["mean_events"]
        stddev_events = stats["stddev_events"]

        # Flag anomalies (Z-score)
        threshold = 2.0  # Z-score threshold

        if (mean_events is None or stddev_events is None
                or math.isnan(stddev_events) or stddev_events == 0):
            # Fewer than two rows or zero variance: nothing can be an outlier
            z_score = F.lit(0.0)
        else:
            z_score = F.abs(F.col("event_count") - F.lit(mean_events)) / F.lit(stddev_events)

        return df \
            .withColumn("z_score", z_score) \
            .withColumn("is_anomaly", F.when(F.col("z_score") > threshold, 1).otherwise(0)) \
            .withColumn("anomaly_score", F.col("z_score"))

    def setup_batch_processing(self, input_path: str, output_path: str):
        """Set up the batch processing job."""

        # Read the batch data with the explicit schema so that `timestamp`
        # is a real timestamp, which the window aggregation requires
        batch_df = self.spark.read \
            .schema(EVENT_SCHEMA) \
            .option("multiline", "true") \
            .json(input_path)

        # Data quality check
        quality_report = self._data_quality_check(batch_df)

        # Data processing; the aggregation drops day_of_week, so it is
        # re-derived from the window start for partitioning
        processed_df = batch_df \
            .transform(self._clean_and_transform_data) \
            .transform(self._feature_engineering) \
            .withColumn("day_of_week", F.dayofweek(F.col("window.start")))

        # Partition and write
        processed_df \
            .repartition(F.col("day_of_week")) \
            .write \
            .mode("overwrite") \
            .partitionBy("day_of_week") \
            .parquet(output_path)

        return quality_report

    def _data_quality_check(self, df):
        """Data quality check."""

        total_records = df.count()
        null_user_ids = df.filter(F.col("user_id").isNull()).count()
        # Map columns cannot be compared for de-duplication, so duplicates
        # are identified by the event key columns
        distinct_records = df.dropDuplicates(EVENT_KEY_COLUMNS).count()

        quality_metrics = {
            "total_records": total_records,
            "null_user_ids": null_user_ids,
            "null_timestamps": df.filter(F.col("timestamp").isNull()).count(),
            "duplicate_records": total_records - distinct_records,
            # Guard against an empty input instead of dividing by zero
            "completeness_rate": (1.0 - null_user_ids / total_records) if total_records else 0.0
        }

        return quality_metrics
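
The anomaly step above is a plain Z-score test: a window is flagged when its event count lies more than `threshold` sample standard deviations from the mean. The pure-Python sketch below mirrors that rule on a list of counts so it can be checked without a Spark cluster. `flag_anomalies` is a hypothetical helper, and treating zero variance or a single data point as "no anomalies" is an assumption of this sketch.

```python
import statistics
from typing import List, Tuple


def flag_anomalies(event_counts: List[float],
                   threshold: float = 2.0) -> List[Tuple[float, float, bool]]:
    """Return (count, z_score, is_anomaly) for every count."""
    if len(event_counts) < 2:
        # A sample standard deviation needs at least two points
        return [(count, 0.0, False) for count in event_counts]

    mean = statistics.mean(event_counts)
    stddev = statistics.stdev(event_counts)  # sample standard deviation
    if stddev == 0:
        # All counts are identical, so none of them is an outlier
        return [(count, 0.0, False) for count in event_counts]

    results = []
    for count in event_counts:
        z_score = abs(count - mean) / stddev
        results.append((count, z_score, z_score > threshold))
    return results
```

One property worth keeping in mind: with the sample standard deviation, no point in a sample of size n can have a Z-score above (n - 1) / sqrt(n). For five points that bound is about 1.79, so a threshold of 2.0 can never fire on very small batches.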

Machine Learning Pipeline Automation

MLOps workflow implementation:

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score
from typing import Dict

class MLOpsManager:
    def __init__(self, experiment_name: str):
        self.experiment_name = experiment_name
        mlflow.set_experiment(experiment_name)

    def create_training_pipeline(self, config: Dict):
        """Create the training pipeline."""

        with mlflow.start_run(run_name=f"training_{config['model_name']}") as run:

            # Log parameters (nested dictionaries are stored as strings)
            mlflow.log_params(config)

            # Data preparation; train_test_split returns
            # X_train, X_test, y_train, y_test in that order
            X_train, X_test, y_train, y_test = self._prepare_data(config['data_config'])

            # Model training
            model = self._train_model(X_train, y_train, config['model_config'])

            # Model evaluation
            metrics = self._evaluate_model(model, X_test, y_test)
            mlflow.log_metrics(metrics)

            # Log the model and register it as a new version
            mlflow.sklearn.log_model(
                model,
                "model",
                registered_model_name=config['model_name']
            )

            # Model validation
            validation_passed = self._validate_model(model, metrics, config['validation_criteria'])

            if validation_passed:
                # Promote to production
                self._promote_to_production(config['model_name'], run.info.run_id)

            return {
                "run_id": run.info.run_id,
                "metrics": metrics,
                "validation_passed": validation_passed
            }

    def _prepare_data(self, data_config: Dict):
        """Prepare the data; returns X_train, X_test, y_train, y_test."""
        # A real implementation would read from a data lake or warehouse;
        # synthetic data is used here
        from sklearn.datasets import make_classification
        from sklearn.model_selection import train_test_split

        X, y = make_classification(
            n_samples=data_config.get('n_samples', 10000),
            n_features=data_config.get('n_features', 20),
            n_informative=data_config.get('n_informative', 10),
            random_state=42
        )

        return train_test_split(X, y, test_size=0.2, random_state=42)

    def _train_model(self, X_train, y_train, model_config: Dict):
        """Train the model."""

        model = RandomForestClassifier(
            n_estimators=model_config.get('n_estimators', 100),
            max_depth=model_config.get('max_depth', 10),
            random_state=42
        )

        model.fit(X_train, y_train)
        return model

    def _evaluate_model(self, model, X_test, y_test):
        """Evaluate the model."""

        y_pred = model.predict(X_test)

        return {
            "accuracy": accuracy_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred, average='weighted'),
            "recall": recall_score(y_test, y_pred, average='weighted')
        }

    def _validate_model(self, model, metrics: Dict, criteria: Dict) -> bool:
        """Validate the model: every metric in criteria must reach its threshold."""

        for metric_name, threshold in criteria.items():
            if metrics.get(metric_name, 0) < threshold:
                print(f"Model failed validation: {metric_name} = {metrics.get(metric_name)} < {threshold}")
                return False

        return True

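    def _promote_to_production(self, model_name: str, run_id: str):
        """Promote the model version logged by this run to Production."""

        client = mlflow.tracking.MlflowClient()

        # log_model(registered_model_name=...) has already registered a
        # version for this run, so look it up rather than creating a duplicate
        run_versions = [
            version for version in client.search_model_versions(f"name='{model_name}'")
            if version.run_id == run_id
        ]
        if not run_versions:
            raise RuntimeError(f"No registered version of {model_name} found for run {run_id}")
        model_version = max(run_versions, key=lambda version: int(version.version))

        # Move it to the Production stage (recent MLflow releases deprecate
        # stage transitions in favor of model version aliases)
        client.transition_model_version_stage(
            name=model_name,
            version=model_version.version,
            stage="Production"
        )

        print(f"Model {model_name} version {model_version.version} promoted to Production")
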
    def setup_model_monitoring(self, model_name: str):
        """Build a monitoring callback; the caller is responsible for scheduling it."""

        # Model performance monitoring
        def monitor_model_performance():
            # Fetch the production model
            client = mlflow.tracking.MlflowClient()
            production_versions = client.get_latest_versions(model_name, stages=["Production"])
            if not production_versions:
                print(f"No Production version found for model: {model_name}")
                return
            model_version = production_versions[0]

            # Load the model
            model_uri = f"models:/{model_name}/{model_version.version}"
            model = mlflow.sklearn.load_model(model_uri)

            # Fetch new data and evaluate
            # A real implementation would pull new data from the monitoring system
            new_data = self._get_new_data()

            if new_data:
                predictions = model.predict(new_data['X'])

                # Compute performance metrics
                if 'y' in new_data:  # only when ground-truth labels exist
                    accuracy = accuracy_score(new_data['y'], predictions)

                    # Log monitoring metrics
                    with mlflow.start_run():
                        mlflow.log_metric("production_accuracy", accuracy)
                        mlflow.log_metric("data_drift_score", self._calculate_data_drift(new_data['X']))

                    # Check whether retraining is needed
                    if accuracy < 0.8:  # performance threshold
                        self._trigger_retraining(model_name)

        return monitor_model_performance

    def _get_new_data(self):
        """Fetch new data for monitoring."""
        # A real implementation would fetch data from the production environment
        return None

    def _calculate_data_drift(self, new_data):
        """Compute data drift."""
        # A real implementation would use a tool such as evidently
        return 0.0

    def _trigger_retraining(self, model_name: str):
        """Trigger model retraining."""
        print(f"Triggering retraining for model: {model_name}")
        # A real implementation would start the retraining workflow
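
`_calculate_data_drift` is left as a placeholder above. One simple statistic that could fill it for a single numeric feature is the Population Stability Index (PSI), sketched here with the standard library only. This is a swapped-in technique for illustration, not the method the article prescribes; the function names, the equal-width binning over the reference range, and the `eps` floor for empty bins are all assumptions.

```python
import math
from typing import List, Sequence


def _bin_proportions(values: Sequence[float], lo: float, hi: float,
                     n_bins: int, eps: float) -> List[float]:
    """Share of values per equal-width bin over [lo, hi], floored at eps."""
    counts = [0] * n_bins
    width = (hi - lo) / n_bins
    for value in values:
        index = 0 if width == 0 else int((value - lo) / width)
        # Clamp values outside the reference range into the edge bins
        index = max(0, min(index, n_bins - 1))
        counts[index] += 1
    return [max(count / len(values), eps) for count in counts]


def population_stability_index(reference: Sequence[float], current: Sequence[float],
                               n_bins: int = 10, eps: float = 1e-6) -> float:
    """PSI = sum((cur - ref) * ln(cur / ref)) over bins defined by the reference sample."""
    if not reference or not current:
        raise ValueError("both samples must be non-empty")
    lo, hi = min(reference), max(reference)
    ref = _bin_proportions(reference, lo, hi, n_bins, eps)
    cur = _bin_proportions(current, lo, hi, n_bins, eps)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref, cur))
```

Identical samples give a PSI of zero, and the value grows as the current distribution moves away from the reference. A commonly cited rule of thumb treats values above roughly 0.25 as a significant shift, but the cut-off is best calibrated per feature.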

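Summary

This article covered the key technologies for enterprise AI integration and deployment:

  1. Cloud platform deployment: enterprise use of GCP Vertex AI, AWS SageMaker, and Azure OpenAI
  2. Security framework: RBAC access control, data encryption, audit logging, and compliance
  3. RAG architecture: vector databases, intelligent document chunking, semantic search, and hybrid retrieval
  4. Data pipelines: real-time stream processing, batch jobs, MLOps automation, and model monitoring

The next article focuses on monitoring and optimizing AI systems in production, including performance tuning, fault diagnosis, and cost management.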

Yen
