news 2026/5/6 7:35:45

07-项目实战与案例——端到端MLOps项目

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
07-项目实战与案例——端到端MLOps项目

端到端MLOps项目(训练、部署、监控)

一、MLOps概述

1.1 什么是MLOps?

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle, FancyBboxPatch
import warnings

warnings.filterwarnings('ignore')

print("=" * 60)
print("MLOps:机器学习运维")
print("=" * 60)

# Draw the MLOps lifecycle as five stages with a red feedback arrow
# (continuous iteration) looping back from monitoring to data management.
fig, ax = plt.subplots(figsize=(14, 8))
ax.axis('off')

# Lifecycle stages: (label, x, y) in axes coordinates.
stages = [
    ("数据管理", 0.1, 0.7),
    ("模型训练", 0.3, 0.7),
    ("模型验证", 0.5, 0.7),
    ("模型部署", 0.7, 0.7),
    ("监控运维", 0.9, 0.7),
]
for name, x, y in stages:
    circle = plt.Circle((x, y), 0.08, color='lightblue', ec='black')
    ax.add_patch(circle)
    ax.text(x, y, name, ha='center', va='center', fontsize=7)
    # Connect each stage to the next; the last stage (x == 0.9) gets no arrow.
    if x < 0.85:
        ax.annotate('', xy=(x + 0.18, y), xytext=(x + 0.1, y),
                    arrowprops=dict(arrowstyle='->', lw=1))

# Feedback arrow showing the iterative nature of the lifecycle.
ax.annotate('', xy=(0.9, 0.5), xytext=(0.1, 0.5),
            arrowprops=dict(arrowstyle='->', lw=2,
                            connectionstyle='arc3,rad=-0.3', color='red'))
ax.text(0.5, 0.45, '持续迭代', ha='center', fontsize=9, color='red')

ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.set_title('MLOps生命周期', fontsize=14)
plt.tight_layout()
plt.show()

print("\n💡 MLOps核心组件:")
print(" - 实验追踪 (MLflow)")
print(" - 数据版本控制 (DVC)")
print(" - 模型注册与部署")
print(" - 监控与告警")

二、实验追踪(MLflow)

2.1 MLflow配置

def mlflow_setup():
    """Print a reference snippet showing how to track experiments with MLflow.

    Display-only: the snippet below is a string that is printed, not executed.
    Returns None.
    """
    print("\n" + "=" * 60)
    print("MLflow实验追踪")
    print("=" * 60)
    code = """
# 安装MLflow
pip install mlflow

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
import numpy as np

# 设置实验
mlflow.set_experiment("mlops_demo")

# 训练模型
def train_and_log(X, y, params):
    with mlflow.start_run(run_name="random_forest_experiment"):
        # 记录参数
        mlflow.log_params(params)

        # 训练模型
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)

        # 预测
        y_pred = model.predict(X_test)

        # 记录指标
        mlflow.log_metric("accuracy", accuracy_score(y_test, y_pred))
        mlflow.log_metric("precision", precision_score(y_test, y_pred, average='weighted'))
        mlflow.log_metric("recall", recall_score(y_test, y_pred, average='weighted'))
        mlflow.log_metric("f1", f1_score(y_test, y_pred, average='weighted'))

        # 记录模型
        mlflow.sklearn.log_model(model, "model")

        # 记录额外文件
        mlflow.log_artifact("confusion_matrix.png")

    return model

# 使用示例
params = {
    "n_estimators": 100,
    "max_depth": 10,
    "min_samples_split": 5,
    "random_state": 42
}
# model = train_and_log(X, y, params)

# 查看实验结果
# mlflow ui
"""
    print(code)


mlflow_setup()

三、数据版本控制(DVC)

3.1 DVC配置

def dvc_setup():
    """Print a reference snippet of common DVC (data version control) commands.

    Display-only: the snippet is shell commands printed as text, not executed.
    Returns None.
    """
    print("\n" + "=" * 60)
    print("DVC数据版本控制")
    print("=" * 60)
    code = """
# 安装DVC
pip install dvc

# 初始化DVC
dvc init

# 添加数据
dvc add data/raw/dataset.csv

# 配置远程存储(如S3、GCS)
dvc remote add -d myremote s3://mybucket/dvcstore

# 推送数据
dvc push

# 拉取数据
dvc pull

# 切换数据版本
git checkout v1.0
dvc checkout
"""
    print(code)


dvc_setup()

四、模型训练Pipeline

4.1 完整训练Pipeline

def training_pipeline():
    """Print a reference implementation of a config-driven training pipeline.

    Display-only: the ``TrainingPipeline`` class below lives inside a string
    that is printed, not executed. Returns None.

    FIX: the snippet's sklearn.metrics import previously omitted
    accuracy_score/precision_score/recall_score/f1_score, which evaluate()
    uses — added them so the printed example is self-consistent.
    """
    print("\n" + "=" * 60)
    print("训练Pipeline")
    print("=" * 60)
    code = """
import mlflow
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (classification_report, confusion_matrix,
                             accuracy_score, precision_score,
                             recall_score, f1_score)
import joblib
import yaml

class TrainingPipeline:
    def __init__(self, config_path="config.yaml"):
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)
        self.model = None
        self.scaler = None

    def load_data(self):
        \"\"\"加载数据\"\"\"
        data = pd.read_csv(self.config['data']['path'])
        X = data.drop(self.config['data']['target_column'], axis=1)
        y = data[self.config['data']['target_column']]
        return X, y

    def preprocess(self, X_train, X_test):
        \"\"\"预处理\"\"\"
        self.scaler = StandardScaler()
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        return X_train_scaled, X_test_scaled

    def train(self, X_train, y_train):
        \"\"\"训练模型\"\"\"
        self.model = RandomForestClassifier(
            n_estimators=self.config['model']['n_estimators'],
            max_depth=self.config['model']['max_depth'],
            random_state=42
        )
        self.model.fit(X_train, y_train)
        return self.model

    def evaluate(self, X_test, y_test):
        \"\"\"评估模型\"\"\"
        y_pred = self.model.predict(X_test)
        metrics = {
            'accuracy': accuracy_score(y_test, y_pred),
            'precision': precision_score(y_test, y_pred, average='weighted'),
            'recall': recall_score(y_test, y_pred, average='weighted'),
            'f1': f1_score(y_test, y_pred, average='weighted')
        }
        return metrics, y_pred

    def save_artifacts(self):
        \"\"\"保存模型和预处理器\"\"\"
        joblib.dump(self.model, "models/model.pkl")
        joblib.dump(self.scaler, "models/scaler.pkl")

    def run(self):
        \"\"\"运行完整Pipeline\"\"\"
        # 加载数据
        X, y = self.load_data()

        # 划分数据
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # 预处理
        X_train_scaled, X_test_scaled = self.preprocess(X_train, X_test)

        # 训练
        self.train(X_train_scaled, y_train)

        # 评估
        metrics, y_pred = self.evaluate(X_test_scaled, y_test)

        # 保存
        self.save_artifacts()

        return metrics, y_pred

# 配置文件示例
config = {
    'data': {
        'path': 'data/processed/dataset.csv',
        'target_column': 'target'
    },
    'model': {
        'n_estimators': 100,
        'max_depth': 10
    }
}

# 运行Pipeline
# pipeline = TrainingPipeline()
# metrics, predictions = pipeline.run()
"""
    print(code)


training_pipeline()

五、模型部署

5.1 FastAPI服务

def fastapi_deployment():
    """Print a reference FastAPI model-serving app (single + batch predict).

    Display-only: the ``app.py`` below is a string that is printed, not
    executed. Returns None.
    """
    print("\n" + "=" * 60)
    print("FastAPI模型服务")
    print("=" * 60)
    code = """
# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
import pandas as pd
from typing import List, Dict, Any

app = FastAPI(title="ML Model API", version="1.0.0")

# 加载模型
model = joblib.load("models/model.pkl")
scaler = joblib.load("models/scaler.pkl")

class PredictionRequest(BaseModel):
    features: List[float]

class PredictionResponse(BaseModel):
    prediction: int
    probability: float
    confidence: float

class BatchPredictionRequest(BaseModel):
    features: List[List[float]]

class BatchPredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[float]

@app.get("/")
def root():
    return {"message": "ML Model API", "status": "running"}

@app.get("/health")
def health_check():
    return {"status": "healthy"}

@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    try:
        # 预处理
        features = np.array(request.features).reshape(1, -1)
        features_scaled = scaler.transform(features)

        # 预测
        prediction = model.predict(features_scaled)[0]
        probability = model.predict_proba(features_scaled)[0].max()
        confidence = float(probability)

        return PredictionResponse(
            prediction=int(prediction),
            probability=float(probability),
            confidence=confidence
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/predict/batch", response_model=BatchPredictionResponse)
def predict_batch(request: BatchPredictionRequest):
    try:
        features = np.array(request.features)
        features_scaled = scaler.transform(features)
        predictions = model.predict(features_scaled)
        probabilities = model.predict_proba(features_scaled).max(axis=1)
        return BatchPredictionResponse(
            predictions=predictions.tolist(),
            probabilities=probabilities.tolist()
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

# 运行服务
# uvicorn app:app --host 0.0.0.0 --port 8000 --reload
"""
    print(code)


fastapi_deployment()

5.2 Docker部署

def docker_deployment():
    """Print reference Docker artifacts: Dockerfile, requirements.txt,
    common docker commands, and a docker-compose stack with Prometheus/Grafana.

    Display-only: everything below is a string that is printed, not executed.
    Returns None. (YAML indentation in the compose section was reconstructed
    from a garbled source — conventional two-space nesting assumed.)
    """
    print("\n" + "=" * 60)
    print("Docker部署")
    print("=" * 60)
    code = """
# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码和模型
COPY app.py .
COPY models/ ./models/

# 暴露端口
EXPOSE 8000

# 启动服务
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

# requirements.txt
fastapi==0.104.0
uvicorn==0.24.0
scikit-learn==1.3.0
numpy==1.24.0
pandas==2.0.0
joblib==1.3.0
python-multipart==0.0.6

# 构建镜像
docker build -t ml-model-api .

# 运行容器
docker run -d -p 8000:8000 --name ml-api ml-model-api

# 查看日志
docker logs ml-api

# 停止容器
docker stop ml-api

# 删除容器
docker rm ml-api

# docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MODEL_PATH=/app/models/model.pkl
    restart: unless-stopped
  monitor:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
"""
    print(code)


docker_deployment()

六、模型监控

6.1 监控配置

def model_monitoring():
    """Print a reference snippet for model monitoring: Prometheus metrics,
    a latency/confidence-tracking decorator, and KS-test data-drift detection.

    Display-only: the snippet is a string that is printed, not executed.
    Returns None.
    """
    print("\n" + "=" * 60)
    print("模型监控")
    print("=" * 60)
    code = """
import time
import numpy as np
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Prometheus指标
PREDICTION_COUNT = Counter('model_predictions_total', 'Total number of predictions')
ERROR_COUNT = Counter('model_errors_total', 'Total number of errors')
PREDICTION_LATENCY = Histogram('model_prediction_latency_seconds', 'Prediction latency')
MODEL_CONFIDENCE = Gauge('model_prediction_confidence', 'Model prediction confidence')
DATA_DRIFT_SCORE = Gauge('data_drift_score', 'Data drift score')

class ModelMonitor:
    def __init__(self, port=8001):
        self.port = port
        start_http_server(self.port)
        print(f"Metrics server started on port {self.port}")

    def record_prediction(self, latency, confidence, error=False):
        \"\"\"记录预测指标\"\"\"
        PREDICTION_COUNT.inc()
        PREDICTION_LATENCY.observe(latency)
        MODEL_CONFIDENCE.set(confidence)
        if error:
            ERROR_COUNT.inc()

    def record_drift(self, drift_score):
        \"\"\"记录数据漂移\"\"\"
        DATA_DRIFT_SCORE.set(drift_score)

# 在API中使用监控
from functools import wraps
import time

monitor = ModelMonitor()

def track_predictions(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            latency = time.time() - start_time
            monitor.record_prediction(latency, result.get('confidence', 0.5))
            return result
        except Exception as e:
            latency = time.time() - start_time
            monitor.record_prediction(latency, 0, error=True)
            raise e
    return wrapper

# 使用装饰器
@app.post("/predict")
@track_predictions
def predict(request: PredictionRequest):
    # ... 预测逻辑
    pass

# 数据漂移检测
from scipy.stats import ks_2samp

class DriftDetector:
    def __init__(self, reference_data, threshold=0.05):
        self.reference_data = reference_data
        self.threshold = threshold

    def detect_drift(self, current_data):
        drift_scores = []
        for col in self.reference_data.columns:
            stat, p_value = ks_2samp(
                self.reference_data[col].dropna(),
                current_data[col].dropna()
            )
            drift_scores.append(p_value)
            if p_value < self.threshold:
                print(f"Warning: Drift detected in column {col}")
        avg_drift = np.mean(drift_scores)
        monitor.record_drift(1 - avg_drift)
        return avg_drift < self.threshold
"""
    print(code)


model_monitoring()

七、CI/CD Pipeline

7.1 GitHub Actions配置

def cicd_pipeline():
    """Print a reference GitHub Actions workflow (test -> train -> deploy).

    Display-only: the workflow YAML is a string that is printed, not executed.
    Returns None. (YAML indentation reconstructed from a garbled source —
    conventional two-space nesting assumed.)
    """
    print("\n" + "=" * 60)
    print("CI/CD Pipeline")
    print("=" * 60)
    code = """
# .github/workflows/mlops.yml
name: MLOps Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Setup Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run tests
        run: |
          pytest tests/ --cov=src --cov-report=xml
      - name: Upload coverage
        uses: codecov/codecov-action@v2

  train:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Setup Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Train model
        run: python scripts/train.py
      - name: Upload model
        uses: actions/upload-artifact@v2
        with:
          name: model
          path: models/

  deploy:
    needs: train
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v2
      - name: Download model
        uses: actions/download-artifact@v2
        with:
          name: model
          path: models/
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1
      - name: Deploy to ECS
        run: |
          aws ecs update-service --cluster ml-cluster --service ml-service --force-new-deployment
"""
    print(code)


cicd_pipeline()

八、完整MLOps架构

8.1 架构总览

def mlops_architecture():
    """Render a four-layer MLOps architecture overview diagram.

    Draws data / experiment / deployment / monitoring layer bands with
    representative tools listed under each. Returns None.
    """
    # Local imports so the function is self-contained (the original relied on
    # module-level matplotlib imports from the first code cell).
    import matplotlib.pyplot as plt
    from matplotlib.patches import FancyBboxPatch

    print("\n" + "=" * 60)
    print("完整MLOps架构")
    print("=" * 60)

    fig, ax = plt.subplots(figsize=(14, 10))
    ax.axis('off')

    # Layer bands: (label, x, y, fill color) in axes coordinates.
    layers = [
        ("数据层\n(Data Layer)", 0.1, 0.8, 'lightblue'),
        ("实验层\n(Experiment Layer)", 0.1, 0.6, 'lightgreen'),
        ("部署层\n(Deployment Layer)", 0.1, 0.4, 'lightcoral'),
        ("监控层\n(Monitoring Layer)", 0.1, 0.2, 'lightyellow'),
    ]
    for name, x, y, color in layers:
        box = FancyBboxPatch((x, y - 0.05), 0.8, 0.1,
                             boxstyle="round,pad=0.02",
                             facecolor=color, ec='black')
        ax.add_patch(box)
        ax.text(x + 0.4, y, name, ha='center', va='center',
                fontsize=9, fontweight='bold')

    # Representative tools listed beneath each layer band.
    components = {
        '数据层': ['数据仓库', '数据版本(DVC)', '特征存储'],
        '实验层': ['MLflow', 'Jupyter', '超参数优化'],
        '部署层': ['FastAPI', 'Docker', 'Kubernetes'],
        '监控层': ['Prometheus', 'Grafana', '日志'],
    }
    y_pos = [0.73, 0.53, 0.33, 0.13]  # text anchor per layer, top to bottom
    for i, (layer, comps) in enumerate(components.items()):
        for j, comp in enumerate(comps):
            ax.text(0.25, y_pos[i] - j * 0.04, f"•{comp}", fontsize=7)

    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.set_title('完整MLOps架构', fontsize=14)
    plt.tight_layout()
    plt.show()


mlops_architecture()

九、总结

组件工具作用
实验追踪MLflow记录参数、指标、模型
数据版本DVC版本控制大型数据
API服务FastAPI模型推理接口
容器化Docker环境隔离和部署
监控Prometheus + Grafana性能监控和告警

MLOps最佳实践:

  1. 版本控制代码、数据、模型
  2. 自动化测试和部署
  3. 监控模型性能和漂移
  4. 建立回滚机制
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/6 7:35:42

告别重复劳动:用快马智能组合mc指令,实现游戏管理效率倍增

告别重复劳动：用快马智能组合mc指令，实现游戏管理效率倍增 作为一个《我的世界》的老玩家和服务器管理员，我深知游戏指令的强大和复杂。mc指令大全虽然涵盖了各种功能，但手动查找、组合和调试这些指令往往耗时耗力。特别是当需要…

作者头像 李华
网站建设 2026/5/6 7:34:49

终极音乐解锁指南:5步搞定QQ音乐、网易云音乐加密文件

终极音乐解锁指南：5步搞定QQ音乐、网易云音乐加密文件 【免费下载链接】unlock-music-electron Unlock Music Project - Electron Edition 在Electron构建的桌面应用中解锁各种加密的音乐文件 项目地址: https://gitcode.com/gh_mirrors/un/unlock-music-electron…

作者头像 李华
网站建设 2026/5/6 7:27:36

借助 Taotoken 用量看板优化团队月度大模型 API 预算分配

借助 Taotoken 用量看板优化团队月度大模型 API 预算分配 1. 用量看板的核心价值 对于使用大模型 API 的团队而言，成本控制与预算分配是持续面临的挑战。Taotoken 用量看板通过聚合各项目、各模型的调用数据，将原本分散的 token 消耗转化为可视化报表。…

作者头像 李华
网站建设 2026/5/6 7:23:35

如何打造纯净动画观影体验:智能广告拦截插件完全指南

如何打造纯净动画观影体验：智能广告拦截插件完全指南 【免费下载链接】Hanime1Plugin Android插件(https://hanime1.me) (NSFW) 项目地址: https://gitcode.com/gh_mirrors/ha/Hanime1Plugin 你是否厌倦了在观看心爱动画时被各种广告频繁打断？Han…

作者头像 李华
网站建设 2026/5/6 7:22:47

Kagantic-Codebase:AI协作代码库治理框架的设计与实践

1. 项目概述：为AI协作而生的代码库治理框架 如果你正在尝试将AI助手（比如Claude Code、Cursor、GitHub Copilot）深度集成到你的开发工作流中，并且已经受够了每次都要在聊天框里重复解释项目结构、编码规范和操作边界的麻烦…

作者头像 李华
网站建设 2026/5/6 7:20:36

文明越复杂,伪装就越精致,人性就越容易迷失在符号之中

你说得非常透彻，而且带着一种历史穿透力的清醒。“看最原始的东西就行了”——这其实是一种政治经济学的底层思维：剥开制度、话语、技术、法律的外衣，直视权力与资源分配的本质。一、你说的“原始的东西”是什么？其实就是人类组织…

作者头像 李华