端到端MLOps项目(训练、部署、监控)
一、MLOps概述
1.1 什么是MLOps?
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle, FancyBboxPatch
import warnings

warnings.filterwarnings('ignore')

print("=" * 60)
print("MLOps:机器学习运维")
print("=" * 60)

# Draw the MLOps lifecycle: five stages connected left-to-right,
# plus a red return arrow to show the continuous-iteration loop.
fig, ax = plt.subplots(figsize=(14, 8))
ax.axis('off')

# Lifecycle stages: (label, x, y) in axes coordinates.
stages = [
    ("数据管理", 0.1, 0.7),
    ("模型训练", 0.3, 0.7),
    ("模型验证", 0.5, 0.7),
    ("模型部署", 0.7, 0.7),
    ("监控运维", 0.9, 0.7),
]
for name, x, y in stages:
    circle = plt.Circle((x, y), 0.08, color='lightblue', ec='black')
    ax.add_patch(circle)
    ax.text(x, y, name, ha='center', va='center', fontsize=7)
    # Forward arrow to the next stage (skip after the last circle).
    if x < 0.85:
        ax.annotate('', xy=(x + 0.18, y), xytext=(x + 0.1, y),
                    arrowprops=dict(arrowstyle='->', lw=1))

# Curved feedback arrow: monitoring feeds back into data management.
ax.annotate('', xy=(0.9, 0.5), xytext=(0.1, 0.5),
            arrowprops=dict(arrowstyle='->', lw=2,
                            connectionstyle='arc3,rad=-0.3', color='red'))
ax.text(0.5, 0.45, '持续迭代', ha='center', fontsize=9, color='red')

ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.set_title('MLOps生命周期', fontsize=14)
plt.tight_layout()
plt.show()

print("\n💡 MLOps核心组件:")
print(" - 实验追踪 (MLflow)")
print(" - 数据版本控制 (DVC)")
print(" - 模型注册与部署")
print(" - 监控与告警")

# 二、实验追踪(MLflow)
2.1 MLflow配置
def mlflow_setup():
    """Print an example MLflow experiment-tracking setup (params, metrics, model logging)."""
    print("\n" + "=" * 60)
    print("MLflow实验追踪")
    print("=" * 60)

    # The example is shown as text, not executed, so readers can copy it.
    code = """
# 安装MLflow
pip install mlflow

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
import numpy as np

# 设置实验
mlflow.set_experiment("mlops_demo")

# 训练模型
def train_and_log(X, y, params):
    with mlflow.start_run(run_name="random_forest_experiment"):
        # 记录参数
        mlflow.log_params(params)

        # 训练模型
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)

        # 预测
        y_pred = model.predict(X_test)

        # 记录指标
        mlflow.log_metric("accuracy", accuracy_score(y_test, y_pred))
        mlflow.log_metric("precision", precision_score(y_test, y_pred, average='weighted'))
        mlflow.log_metric("recall", recall_score(y_test, y_pred, average='weighted'))
        mlflow.log_metric("f1", f1_score(y_test, y_pred, average='weighted'))

        # 记录模型
        mlflow.sklearn.log_model(model, "model")

        # 记录额外文件
        mlflow.log_artifact("confusion_matrix.png")

    return model

# 使用示例
params = {
    "n_estimators": 100,
    "max_depth": 10,
    "min_samples_split": 5,
    "random_state": 42
}
# model = train_and_log(X, y, params)

# 查看实验结果
# mlflow ui
"""
    print(code)


mlflow_setup()

# 三、数据版本控制(DVC)
3.1 DVC配置
def dvc_setup():
    """Print a DVC data-version-control command walkthrough (init, add, push/pull, checkout)."""
    print("\n" + "=" * 60)
    print("DVC数据版本控制")
    print("=" * 60)

    # Shell-command reference shown as text, not executed.
    code = """
# 安装DVC
pip install dvc

# 初始化DVC
dvc init

# 添加数据
dvc add data/raw/dataset.csv

# 配置远程存储(如S3、GCS)
dvc remote add -d myremote s3://mybucket/dvcstore

# 推送数据
dvc push

# 拉取数据
dvc pull

# 切换数据版本
git checkout v1.0
dvc checkout
"""
    print(code)


dvc_setup()

# 四、模型训练Pipeline
4.1 完整训练Pipeline
def training_pipeline():
    """Print a config-driven sklearn training-pipeline example (load, preprocess, train, evaluate, save)."""
    print("\n" + "=" * 60)
    print("训练Pipeline")
    print("=" * 60)

    # Outer string uses ''' so the example's inner \"\"\" docstrings survive.
    # NOTE: the original example used accuracy_score etc. in evaluate() without
    # importing them — the sklearn.metrics import below now includes them.
    code = '''
import mlflow
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, classification_report, confusion_matrix)
import joblib
import yaml

class TrainingPipeline:
    def __init__(self, config_path="config.yaml"):
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)
        self.model = None
        self.scaler = None

    def load_data(self):
        """加载数据"""
        data = pd.read_csv(self.config['data']['path'])
        X = data.drop(self.config['data']['target_column'], axis=1)
        y = data[self.config['data']['target_column']]
        return X, y

    def preprocess(self, X_train, X_test):
        """预处理"""
        self.scaler = StandardScaler()
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        return X_train_scaled, X_test_scaled

    def train(self, X_train, y_train):
        """训练模型"""
        self.model = RandomForestClassifier(
            n_estimators=self.config['model']['n_estimators'],
            max_depth=self.config['model']['max_depth'],
            random_state=42
        )
        self.model.fit(X_train, y_train)
        return self.model

    def evaluate(self, X_test, y_test):
        """评估模型"""
        y_pred = self.model.predict(X_test)
        metrics = {
            'accuracy': accuracy_score(y_test, y_pred),
            'precision': precision_score(y_test, y_pred, average='weighted'),
            'recall': recall_score(y_test, y_pred, average='weighted'),
            'f1': f1_score(y_test, y_pred, average='weighted')
        }
        return metrics, y_pred

    def save_artifacts(self):
        """保存模型和预处理器"""
        joblib.dump(self.model, "models/model.pkl")
        joblib.dump(self.scaler, "models/scaler.pkl")

    def run(self):
        """运行完整Pipeline"""
        # 加载数据
        X, y = self.load_data()

        # 划分数据
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # 预处理
        X_train_scaled, X_test_scaled = self.preprocess(X_train, X_test)

        # 训练
        self.train(X_train_scaled, y_train)

        # 评估
        metrics, y_pred = self.evaluate(X_test_scaled, y_test)

        # 保存
        self.save_artifacts()

        return metrics, y_pred

# 配置文件示例
config = {
    'data': {
        'path': 'data/processed/dataset.csv',
        'target_column': 'target'
    },
    'model': {
        'n_estimators': 100,
        'max_depth': 10
    }
}

# 运行Pipeline
# pipeline = TrainingPipeline()
# metrics, predictions = pipeline.run()
'''
    print(code)


training_pipeline()

# 五、模型部署
5.1 FastAPI服务
def fastapi_deployment():
    """Print a FastAPI model-serving example (single + batch prediction endpoints)."""
    print("\n" + "=" * 60)
    print("FastAPI模型服务")
    print("=" * 60)

    # Example service shown as text, not executed.
    code = """
# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
import pandas as pd
from typing import List, Dict, Any

app = FastAPI(title="ML Model API", version="1.0.0")

# 加载模型
model = joblib.load("models/model.pkl")
scaler = joblib.load("models/scaler.pkl")

class PredictionRequest(BaseModel):
    features: List[float]

class PredictionResponse(BaseModel):
    prediction: int
    probability: float
    confidence: float

class BatchPredictionRequest(BaseModel):
    features: List[List[float]]

class BatchPredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[float]

@app.get("/")
def root():
    return {"message": "ML Model API", "status": "running"}

@app.get("/health")
def health_check():
    return {"status": "healthy"}

@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    try:
        # 预处理
        features = np.array(request.features).reshape(1, -1)
        features_scaled = scaler.transform(features)

        # 预测
        prediction = model.predict(features_scaled)[0]
        probability = model.predict_proba(features_scaled)[0].max()
        confidence = float(probability)

        return PredictionResponse(
            prediction=int(prediction),
            probability=float(probability),
            confidence=confidence
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/predict/batch", response_model=BatchPredictionResponse)
def predict_batch(request: BatchPredictionRequest):
    try:
        features = np.array(request.features)
        features_scaled = scaler.transform(features)
        predictions = model.predict(features_scaled)
        probabilities = model.predict_proba(features_scaled).max(axis=1)
        return BatchPredictionResponse(
            predictions=predictions.tolist(),
            probabilities=probabilities.tolist()
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

# 运行服务
# uvicorn app:app --host 0.0.0.0 --port 8000 --reload
"""
    print(code)


fastapi_deployment()

# 5.2 Docker部署
def docker_deployment():
    """Print Docker deployment examples (Dockerfile, requirements, CLI, docker-compose)."""
    print("\n" + "=" * 60)
    print("Docker部署")
    print("=" * 60)

    # Deployment files and commands shown as text, not executed.
    code = """
# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码和模型
COPY app.py .
COPY models/ ./models/

# 暴露端口
EXPOSE 8000

# 启动服务
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

# requirements.txt
fastapi==0.104.0
uvicorn==0.24.0
scikit-learn==1.3.0
numpy==1.24.0
pandas==2.0.0
joblib==1.3.0
python-multipart==0.0.6

# 构建镜像
docker build -t ml-model-api .

# 运行容器
docker run -d -p 8000:8000 --name ml-api ml-model-api

# 查看日志
docker logs ml-api

# 停止容器
docker stop ml-api

# 删除容器
docker rm ml-api

# docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MODEL_PATH=/app/models/model.pkl
    restart: unless-stopped

  monitor:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
"""
    print(code)


docker_deployment()

# 六、模型监控
6.1 监控配置
def model_monitoring():
    """Print a model-monitoring example (Prometheus metrics, latency tracking, KS-test drift detection)."""
    print("\n" + "=" * 60)
    print("模型监控")
    print("=" * 60)

    # Outer string uses ''' so the example's inner \"\"\" docstrings survive.
    code = '''
import time
import numpy as np
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Prometheus指标
PREDICTION_COUNT = Counter('model_predictions_total', 'Total number of predictions')
ERROR_COUNT = Counter('model_errors_total', 'Total number of errors')
PREDICTION_LATENCY = Histogram('model_prediction_latency_seconds', 'Prediction latency')
MODEL_CONFIDENCE = Gauge('model_prediction_confidence', 'Model prediction confidence')
DATA_DRIFT_SCORE = Gauge('data_drift_score', 'Data drift score')

class ModelMonitor:
    def __init__(self, port=8001):
        self.port = port
        start_http_server(self.port)
        print(f"Metrics server started on port {self.port}")

    def record_prediction(self, latency, confidence, error=False):
        """记录预测指标"""
        PREDICTION_COUNT.inc()
        PREDICTION_LATENCY.observe(latency)
        MODEL_CONFIDENCE.set(confidence)
        if error:
            ERROR_COUNT.inc()

    def record_drift(self, drift_score):
        """记录数据漂移"""
        DATA_DRIFT_SCORE.set(drift_score)

# 在API中使用监控
from functools import wraps
import time

monitor = ModelMonitor()

def track_predictions(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            latency = time.time() - start_time
            monitor.record_prediction(latency, result.get('confidence', 0.5))
            return result
        except Exception as e:
            latency = time.time() - start_time
            monitor.record_prediction(latency, 0, error=True)
            raise e
    return wrapper

# 使用装饰器
@app.post("/predict")
@track_predictions
def predict(request: PredictionRequest):
    # ... 预测逻辑
    pass

# 数据漂移检测
from scipy.stats import ks_2samp

class DriftDetector:
    def __init__(self, reference_data, threshold=0.05):
        self.reference_data = reference_data
        self.threshold = threshold

    def detect_drift(self, current_data):
        drift_scores = []
        for col in self.reference_data.columns:
            stat, p_value = ks_2samp(
                self.reference_data[col].dropna(),
                current_data[col].dropna()
            )
            drift_scores.append(p_value)
            if p_value < self.threshold:
                print(f"Warning: Drift detected in column {col}")
        avg_drift = np.mean(drift_scores)
        monitor.record_drift(1 - avg_drift)
        return avg_drift < self.threshold
'''
    print(code)


model_monitoring()

# 七、CI/CD Pipeline
7.1 GitHub Actions配置
def cicd_pipeline():
    """Print a GitHub Actions CI/CD workflow example (test -> train -> deploy)."""
    print("\n" + "=" * 60)
    print("CI/CD Pipeline")
    print("=" * 60)

    # Workflow YAML shown as text, not executed.
    code = """
# .github/workflows/mlops.yml
name: MLOps Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2

      - name: Setup Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov

      - name: Run tests
        run: |
          pytest tests/ --cov=src --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v2

  train:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2

      - name: Setup Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Train model
        run: python scripts/train.py

      - name: Upload model
        uses: actions/upload-artifact@v2
        with:
          name: model
          path: models/

  deploy:
    needs: train
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v2

      - name: Download model
        uses: actions/download-artifact@v2
        with:
          name: model
          path: models/

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1

      - name: Deploy to ECS
        run: |
          aws ecs update-service --cluster ml-cluster --service ml-service --force-new-deployment
"""
    print(code)


cicd_pipeline()

# 八、完整MLOps架构
8.1 架构总览
def mlops_architecture():
    """Draw an overview diagram of the full MLOps stack (data, experiment, deployment, monitoring layers)."""
    print("\n" + "=" * 60)
    print("完整MLOps架构")
    print("=" * 60)

    fig, ax = plt.subplots(figsize=(14, 10))
    ax.axis('off')

    # Layers: (label, x, y, facecolor) in axes coordinates, stacked top to bottom.
    layers = [
        ("数据层\n(Data Layer)", 0.1, 0.8, 'lightblue'),
        ("实验层\n(Experiment Layer)", 0.1, 0.6, 'lightgreen'),
        ("部署层\n(Deployment Layer)", 0.1, 0.4, 'lightcoral'),
        ("监控层\n(Monitoring Layer)", 0.1, 0.2, 'lightyellow'),
    ]
    for name, x, y, color in layers:
        box = FancyBboxPatch((x, y - 0.05), 0.8, 0.1,
                             boxstyle="round,pad=0.02",
                             facecolor=color, ec='black')
        ax.add_patch(box)
        ax.text(x + 0.4, y, name, ha='center', va='center',
                fontsize=9, fontweight='bold')

    # Representative tools listed under each layer.
    components = {
        '数据层': ['数据仓库', '数据版本(DVC)', '特征存储'],
        '实验层': ['MLflow', 'Jupyter', '超参数优化'],
        '部署层': ['FastAPI', 'Docker', 'Kubernetes'],
        '监控层': ['Prometheus', 'Grafana', '日志']
    }
    # Vertical anchors just below each layer box.
    y_pos = [0.73, 0.53, 0.33, 0.13]
    for i, (layer, comps) in enumerate(components.items()):
        for j, comp in enumerate(comps):
            ax.text(0.25, y_pos[i] - j * 0.04, f"•{comp}", fontsize=7)

    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.set_title('完整MLOps架构', fontsize=14)
    plt.tight_layout()
    plt.show()


mlops_architecture()

# 九、总结
| 组件 | 工具 | 作用 |
|---|---|---|
| 实验追踪 | MLflow | 记录参数、指标、模型 |
| 数据版本 | DVC | 版本控制大型数据 |
| API服务 | FastAPI | 模型推理接口 |
| 容器化 | Docker | 环境隔离和部署 |
| 监控 | Prometheus + Grafana | 性能监控和告警 |
MLOps最佳实践:
- 版本控制代码、数据、模型
- 自动化测试和部署
- 监控模型性能和漂移
- 建立回滚机制