news 2026/4/23 16:29:20

Python实现跨平台商品价格追踪:构建智能比价爬虫系统

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python实现跨平台商品价格追踪:构建智能比价爬虫系统

引言:电商时代的价格监控革命

在当今的电商时代,商品价格波动频繁且迅速。对于精明的消费者、电商从业者、数据分析师和市场研究人员来说,实时监控亚马逊、淘宝、京东等主要电商平台的商品价格变化已成为一项重要技能。通过构建智能价格追踪爬虫系统,我们不仅可以捕捉最佳购买时机,还能进行市场趋势分析和竞争对手监控。本文将深入探讨如何使用Python最新技术构建一个高效、稳定的跨平台商品价格追踪系统。

技术栈概览

我们将在本项目中采用以下现代Python技术栈:

  • Playwright:微软开发的现代化浏览器自动化工具,支持无头浏览器操作

  • Asyncio:Python原生异步IO框架,实现高并发爬取

  • BeautifulSoup4/Selectolax:高效的HTML解析库

  • Pydantic:数据验证与设置管理

  • SQLAlchemy + SQLite:轻量级数据存储方案

  • FastAPI:可选的可视化API后端

  • 代理池支持:处理反爬机制

  • 机器学习预警:基于历史价格的智能预测

系统架构设计

1. 项目结构规划

text

price-tracker/ ├── crawlers/ # 各平台爬虫模块 │ ├── base.py # 基础爬虫类 │ ├── amazon.py # 亚马逊爬虫 │ ├── taobao.py # 淘宝爬虫 │ └── jd.py # 京东爬虫 ├── models/ # 数据模型 │ ├── product.py # 商品模型 │ └── price.py # 价格记录模型 ├── database/ # 数据库操作 ├── config/ # 配置文件 ├── utils/ # 工具函数 ├── alerts/ # 预警系统 └── main.py # 主程序入口

2. 核心功能模块

  • 多平台适配器:统一接口支持不同电商平台

  • 智能请求调度:自适应请求频率控制

  • 反爬虫对抗:自动切换代理、用户代理和浏览器指纹

  • 数据清洗管道:标准化不同平台的数据格式

  • 实时监控与预警:基于规则和机器学习的价格异常检测

完整代码实现

1. 环境配置与依赖安装

首先创建并激活虚拟环境,然后安装所需依赖:

bash

# 创建虚拟环境 python -m venv venv source venv/bin/activate # Linux/Mac # 或 venv\Scripts\activate # Windows # 安装核心依赖 pip install playwright selectolax pydantic sqlalchemy aiohttp pip install asyncio nest-asyncio pandas numpy pip install fastapi uvicorn jinja2 # 可选,用于Web界面 # 安装Playwright浏览器 playwright install chromium

2. 配置管理模块

python

# config/settings.py from pydantic import BaseSettings from typing import List, Optional import os class Settings(BaseSettings): # 数据库配置 DATABASE_URL: str = "sqlite:///./price_tracker.db" # 爬虫配置 REQUEST_TIMEOUT: int = 30 MAX_CONCURRENT_REQUESTS: int = 5 DEFAULT_USER_AGENT: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" # 代理配置 PROXY_ENABLED: bool = False PROXY_POOL: List[str] = [] # 平台特定配置 AMAZON_DOMAINS: dict = { 'US': 'amazon.com', 'UK': 'amazon.co.uk', 'DE': 'amazon.de', 'JP': 'amazon.co.jp' } # 价格监控阈值 PRICE_DROP_THRESHOLD: float = 0.15 # 价格下降15%时触发通知 CHECK_INTERVAL_HOURS: int = 6 class Config: env_file = ".env" settings = Settings()

3. 数据模型定义

python

# models/product.py from pydantic import BaseModel, HttpUrl from typing import Optional, List from datetime import datetime from enum import Enum class Platform(str, Enum): AMAZON = "amazon" TAOBAO = "taobao" JD = "jd" EBAY = "ebay" class Product(BaseModel): id: Optional[int] = None platform: Platform product_id: str url: HttpUrl title: str description: Optional[str] = None current_price: Optional[float] = None original_price: Optional[float] = None currency: str = "USD" availability: bool = True image_url: Optional[str] = None category: Optional[str] = None last_updated: datetime = datetime.now() class Config: orm_mode = True class PriceHistory(BaseModel): id: Optional[int] = None product_id: int price: float currency: str timestamp: datetime = datetime.now() availability: bool = True special_offer: Optional[str] = None class Config: orm_mode = True

4. 数据库管理

python

# database/database.py from sqlalchemy import create_engine, Column, Integer, String, Float, Boolean, DateTime, Text, ForeignKey from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship from datetime import datetime import os from config.settings import settings Base = declarative_base() class ProductORM(Base): __tablename__ = "products" id = Column(Integer, primary_key=True, index=True) platform = Column(String(50), nullable=False) product_id = Column(String(100), unique=True, nullable=False) url = Column(String(500), nullable=False) title = Column(String(500), nullable=False) description = Column(Text) current_price = Column(Float) original_price = Column(Float) currency = Column(String(10), default="USD") availability = Column(Boolean, default=True) image_url = Column(String(500)) category = Column(String(200)) last_updated = Column(DateTime, default=datetime.now) price_history = relationship("PriceHistoryORM", back_populates="product", cascade="all, delete-orphan") class PriceHistoryORM(Base): __tablename__ = "price_history" id = Column(Integer, primary_key=True, index=True) product_id = Column(Integer, ForeignKey("products.id")) price = Column(Float, nullable=False) currency = Column(String(10), default="USD") timestamp = Column(DateTime, default=datetime.now) availability = Column(Boolean, default=True) special_offer = Column(String(200)) product = relationship("ProductORM", back_populates="price_history") # 数据库初始化 engine = create_engine(settings.DATABASE_URL, connect_args={"check_same_thread": False} if "sqlite" in settings.DATABASE_URL else {}) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) def init_db(): Base.metadata.create_all(bind=engine) def get_db(): db = SessionLocal() try: yield db finally: db.close()

5. 基础爬虫类

python

# crawlers/base.py import asyncio import aiohttp from typing import Optional, Dict, Any from selectolax.parser import HTMLParser import random import time from dataclasses import dataclass from urllib.parse import urlparse import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class CrawlerConfig: user_agents: list = None proxy: Optional[str] = None timeout: int = 30 max_retries: int = 3 delay_range: tuple = (1, 3) class BaseCrawler: def __init__(self, config: CrawlerConfig = None): self.config = config or CrawlerConfig() if not self.config.user_agents: self.config.user_agents = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36" ] def get_random_user_agent(self) -> str: return random.choice(self.config.user_agents) async def fetch(self, url: str, session: aiohttp.ClientSession) -> Optional[str]: headers = { "User-Agent": self.get_random_user_agent(), "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive", "Upgrade-Insecure-Requests": "1", } for attempt in range(self.config.max_retries): try: async with session.get( url, headers=headers, proxy=self.config.proxy, timeout=aiohttp.ClientTimeout(total=self.config.timeout) ) as response: if response.status == 200: return await response.text() elif response.status == 429: # Too Many Requests wait_time = 2 ** attempt # Exponential backoff logger.warning(f"Rate limited. Waiting {wait_time} seconds...") await asyncio.sleep(wait_time) else: logger.error(f"Failed to fetch {url}: Status {response.status}") return None except Exception as e: logger.error(f"Attempt {attempt + 1} failed for {url}: {str(e)}") if attempt < self.config.max_retries - 1: await asyncio.sleep(2 ** attempt) else: return None # 随机延迟,避免请求过于频繁 await asyncio.sleep(random.uniform(*self.config.delay_range)) return None def parse_html(self, html: str) -> HTMLParser: return HTMLParser(html)

6. 亚马逊爬虫实现

python

# crawlers/amazon.py from crawlers.base import BaseCrawler, CrawlerConfig from models.product import Product from typing import Optional import re from urllib.parse import urlparse, parse_qs import json import logging logger = logging.getLogger(__name__) class AmazonCrawler(BaseCrawler): def __init__(self, country: str = "US", config: CrawlerConfig = None): super().__init__(config) self.country = country.upper() self.base_domain = f"amazon.{self._get_domain_suffix()}" def _get_domain_suffix(self) -> str: domains = { "US": "com", "UK": "co.uk", "DE": "de", "JP": "co.jp", "FR": "fr", "IT": "it", "ES": "es", "CA": "ca", "AU": "com.au" } return domains.get(self.country, "com") def extract_product_id(self, url: str) -> Optional[str]: """提取亚马逊商品ID""" patterns = [ r'/dp/([A-Z0-9]{10})', r'/gp/product/([A-Z0-9]{10})', r'/product/([A-Z0-9]{10})', r'/dp/([A-Z0-9]{10})/' ] for pattern in patterns: match = re.search(pattern, url) if match: return match.group(1) # 尝试从查询参数中提取 parsed = urlparse(url) query_params = parse_qs(parsed.query) if 'asin' in query_params: return query_params['asin'][0] return None async def get_product_info(self, url: str, session) -> Optional[Product]: html = await self.fetch(url, session) if not html: return None tree = self.parse_html(html) # 提取商品信息 product_data = self._parse_product_page(tree) if product_data: return Product( platform="amazon", product_id=product_data["product_id"], url=url, title=product_data["title"], current_price=product_data["current_price"], original_price=product_data["original_price"], currency=product_data["currency"], availability=product_data["availability"], image_url=product_data.get("image_url"), description=product_data.get("description") ) return None def _parse_product_page(self, tree) -> Optional[dict]: """解析亚马逊商品页面""" try: # 尝试从JSON-LD中提取数据 script_tags = tree.css('script[type="application/ld+json"]') for script in script_tags: try: data = json.loads(script.text()) if data.get("@type") == "Product": product_info = { "product_id": data.get("sku") or data.get("productID", ""), "title": data.get("name", ""), "description": data.get("description", ""), "image_url": data.get("image", ""), "currency": "USD", "availability": False, "current_price": None, "original_price": None } # 提取价格信息 if "offers" in data: offers = data["offers"] if isinstance(offers, dict): offers = [offers] for offer in offers: if offer.get("availability") == "https://schema.org/InStock": product_info["availability"] = True price = offer.get("price") if price: product_info["current_price"] = float(price) product_info["currency"] = offer.get("priceCurrency", "USD") return product_info except json.JSONDecodeError: continue # 备用解析方法:直接解析HTML元素 title_elem = tree.css_first('#productTitle') title = title_elem.text(strip=True) if title_elem else "" # 价格解析 price_selectors = [ '.a-price .a-offscreen', '#price_inside_buybox', '#priceblock_ourprice', '#priceblock_dealprice', '.a-color-price' ] current_price = None for selector in price_selectors: price_elem = tree.css_first(selector) if price_elem: price_text = price_elem.text(strip=True) price_match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', '')) if price_match: current_price = float(price_match.group()) break # 提取商品ID product_id = None asin_elem = tree.css_first('input[name="asin"]') if asin_elem and asin_elem.attributes.get('value'): product_id = asin_elem.attributes['value'] return { "product_id": product_id or "", "title": title, "current_price": current_price, "original_price": None, "currency": "USD", "availability": current_price is not None, "image_url": None, "description": "" } except Exception as e: logger.error(f"Error parsing Amazon page: {str(e)}") return None

7. 淘宝爬虫实现

python

# crawlers/taobao.py from crawlers.base import BaseCrawler, CrawlerConfig from models.product import Product from typing import Optional import re import json import logging logger = logging.getLogger(__name__) class TaobaoCrawler(BaseCrawler): def __init__(self, config: CrawlerConfig = None): super().__init__(config) self.base_url = "https://item.taobao.com" def extract_product_id(self, url: str) -> Optional[str]: """提取淘宝商品ID""" # 匹配淘宝商品URL模式 patterns = [ r'item\.taobao\.com/item\.htm\?id=(\d+)', r'taobao\.com/item\.htm\?id=(\d+)', r'/item/(\d+)\.html' ] for pattern in patterns: match = re.search(pattern, url) if match: return match.group(1) return None async def get_product_info(self, url: str, session) -> Optional[Product]: # 淘宝需要特殊处理,可能需要使用无头浏览器 from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context( user_agent=self.get_random_user_agent(), viewport={'width': 1920, 'height': 1080} ) page = await context.new_page() try: await page.goto(url, timeout=30000) # 等待页面加载完成 await page.wait_for_load_state('networkidle', timeout=10000) # 提取商品信息 title = await self._extract_title(page) price = await self._extract_price(page) product_id = self.extract_product_id(url) if title and product_id: return Product( platform="taobao", product_id=product_id, url=url, title=title, current_price=float(price) if price else None, currency="CNY", availability=price is not None ) except Exception as e: logger.error(f"Error crawling Taobao: {str(e)}") finally: await browser.close() return None async def _extract_title(self, page) -> Optional[str]: """提取商品标题""" try: # 尝试多种选择器 selectors = [ '.tb-detail-hd h1', '.tb-main-title', '[data-title]', '.title-text' ] for selector in selectors: element = await page.query_selector(selector) if element: title = await element.text_content() if title and len(title.strip()) > 0: return title.strip() return None except Exception as e: logger.error(f"Error extracting title: {str(e)}") return None async def _extract_price(self, page) -> Optional[str]: """提取商品价格""" try: # 价格选择器 selectors = [ '.tb-rmb-num', '.tm-price', '.price', '[property="og:product:price"]' ] for selector in selectors: element = await page.query_selector(selector) if element: price_text = await element.text_content() if price_text: # 提取数字 match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', '')) if match: return match.group() return None except Exception as e: logger.error(f"Error extracting price: {str(e)}") return None

8. 异步爬虫调度器

python

# crawlers/scheduler.py import asyncio from typing import List, Dict, Any, Optional import aiohttp from datetime import datetime import logging from dataclasses import dataclass from concurrent.futures import ThreadPoolExecutor from crawlers.amazon import AmazonCrawler from crawlers.taobao import TaobaoCrawler from models.product import Product, Platform from database.database import SessionLocal, ProductORM, PriceHistoryORM logger = logging.getLogger(__name__) @dataclass class TrackingTask: url: str platform: Platform check_interval: int = 6 # 检查间隔(小时) last_checked: Optional[datetime] = None class PriceTrackerScheduler: def __init__(self, max_concurrent: int = 5): self.max_concurrent = max_concurrent self.semaphore = asyncio.Semaphore(max_concurrent) self.crawlers = { Platform.AMAZON: AmazonCrawler(), Platform.TAOBAO: TaobaoCrawler(), } self.tracking_tasks: List[TrackingTask] = [] def add_tracking_task(self, url: str, platform: Platform, check_interval: int = 6): """添加追踪任务""" task = TrackingTask(url=url, platform=platform, check_interval=check_interval) self.tracking_tasks.append(task) logger.info(f"Added tracking task: {url}") async def crawl_product(self, task: TrackingTask, session: aiohttp.ClientSession) -> Optional[Product]: """爬取单个商品信息""" async with self.semaphore: try: crawler = self.crawlers.get(task.platform) if not crawler: logger.error(f"No crawler for platform: {task.platform}") return None product = await crawler.get_product_info(task.url, session) if product: logger.info(f"Successfully crawled {product.title}") return product except Exception as e: logger.error(f"Error crawling {task.url}: {str(e)}") return None async def process_batch(self, tasks: List[TrackingTask]): """批量处理追踪任务""" async with aiohttp.ClientSession() as session: tasks_list = [self.crawl_product(task, session) for task in tasks] results = await asyncio.gather(*tasks_list, return_exceptions=True) successful_products = [] for result in results: if isinstance(result, Product): successful_products.append(result) elif isinstance(result, Exception): logger.error(f"Task failed with exception: {str(result)}") # 保存到数据库 await self.save_to_database(successful_products) return successful_products async def save_to_database(self, products: List[Product]): """保存商品信息到数据库""" db = SessionLocal() try: for product in products: # 检查商品是否已存在 db_product = db.query(ProductORM).filter( ProductORM.platform == product.platform, ProductORM.product_id == product.product_id ).first() if db_product: # 更新现有商品 price_changed = db_product.current_price != product.current_price db_product.current_price = product.current_price db_product.original_price = product.original_price or db_product.original_price db_product.availability = product.availability db_product.last_updated = datetime.now() # 如果价格变化,添加历史记录 if price_changed and product.current_price: price_history = PriceHistoryORM( product_id=db_product.id, price=product.current_price, currency=product.currency, availability=product.availability, timestamp=datetime.now() ) db.add(price_history) logger.info(f"Updated product: {product.title}") else: # 添加新商品 new_product = ProductORM( platform=product.platform.value, product_id=product.product_id, url=str(product.url), title=product.title, description=product.description, current_price=product.current_price, original_price=product.original_price, currency=product.currency, availability=product.availability, image_url=product.image_url, category=product.category ) db.add(new_product) db.flush() # 获取新商品的ID # 添加初始价格记录 if product.current_price: price_history = PriceHistoryORM( product_id=new_product.id, price=product.current_price, currency=product.currency, availability=product.availability, timestamp=datetime.now() ) db.add(price_history) logger.info(f"Added new product: {product.title}") db.commit() except Exception as e: db.rollback() logger.error(f"Error saving to database: {str(e)}") finally: db.close() async def run_continuous_monitoring(self, interval_hours: int = 6): """持续监控任务""" logger.info(f"Starting continuous monitoring with {interval_hours} hour interval") while True: try: # 筛选需要检查的任务 now = datetime.now() tasks_to_check = [ task for task in self.tracking_tasks if task.last_checked is None or (now - task.last_checked).total_seconds() >= task.check_interval * 3600 ] if tasks_to_check: logger.info(f"Checking {len(tasks_to_check)} products...") results = await self.process_batch(tasks_to_check) # 更新最后检查时间 for task in tasks_to_check: task.last_checked = now logger.info(f"Completed checking {len(results)} products") else: logger.info("No products need checking at this time") # 等待下一个检查周期 await asyncio.sleep(interval_hours * 3600) except KeyboardInterrupt: logger.info("Monitoring stopped by user") break except Exception as e: logger.error(f"Error in monitoring loop: {str(e)}") await asyncio.sleep(300) # 出错后等待5分钟再试

9. 价格分析与预警系统

python

# alerts/price_analyzer.py import numpy as np from typing import List, Dict, Any, Optional from datetime import datetime, timedelta import logging from dataclasses import dataclass from database.database import SessionLocal, PriceHistoryORM logger = logging.getLogger(__name__) @dataclass class PriceAlert: product_id: int alert_type: str # "price_drop", "price_increase", "availability_change" message: str threshold: float current_value: float previous_value: float timestamp: datetime class PriceAnalyzer: def __init__(self, price_drop_threshold: float = 0.15): self.price_drop_threshold = price_drop_threshold def analyze_price_history(self, product_id: int, days_back: int = 30) -> List[PriceAlert]: """分析商品价格历史""" alerts = [] db = SessionLocal() try: # 获取价格历史 cutoff_date = datetime.now() - timedelta(days=days_back) price_history = db.query(PriceHistoryORM).filter( PriceHistoryORM.product_id == product_id, PriceHistoryORM.timestamp >= cutoff_date ).order_by(PriceHistoryORM.timestamp.desc()).all() if len(price_history) < 2: return alerts # 获取当前价格和之前的价格 current_price = price_history[0].price previous_price = price_history[1].price if current_price and previous_price: # 计算价格变化百分比 price_change = (current_price - previous_price) / previous_price # 检查价格下降 if price_change <= -self.price_drop_threshold: alerts.append(PriceAlert( product_id=product_id, alert_type="price_drop", message=f"价格下降 {abs(price_change*100):.1f}%", threshold=self.price_drop_threshold, current_value=current_price, previous_value=previous_price, timestamp=datetime.now() )) # 检查价格上涨(可选) if price_change >= 0.10: # 上涨10% alerts.append(PriceAlert( product_id=product_id, alert_type="price_increase", message=f"价格上涨 {price_change*100:.1f}%", threshold=0.10, current_value=current_price, previous_value=previous_price, timestamp=datetime.now() )) # 检查可用性变化 current_availability = price_history[0].availability previous_availability = price_history[1].availability if current_availability != previous_availability: status = "有货" if current_availability else "缺货" alerts.append(PriceAlert( product_id=product_id, alert_type="availability_change", message=f"商品状态变化: 现在{status}", threshold=0, current_value=current_availability, previous_value=previous_availability, timestamp=datetime.now() )) except Exception as e: logger.error(f"Error analyzing price history: {str(e)}") finally: db.close() return alerts def detect_price_patterns(self, prices: List[float]) -> Dict[str, Any]: """检测价格模式""" if len(prices) < 5: return {} prices_array = np.array(prices) # 计算统计信息 stats = { "mean": float(np.mean(prices_array)), "std": float(np.std(prices_array)), "min": float(np.min(prices_array)), "max": float(np.max(prices_array)), "current": float(prices_array[-1]), "is_lowest": prices_array[-1] <= np.min(prices_array[:-1]), "trend": "unknown" } # 检测趋势(简单线性回归) if len(prices_array) >= 3: x = np.arange(len(prices_array)) slope, intercept = np.polyfit(x, prices_array, 1) if slope < -0.01: stats["trend"] = "downward" elif slope > 0.01: stats["trend"] = "upward" else: stats["trend"] = "stable" return stats

10. 主程序与Web界面

python

# main.py import asyncio import uvicorn from fastapi import FastAPI, Depends, HTTPException, Request from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from sqlalchemy.orm import Session from typing import List import logging from crawlers.scheduler import PriceTrackerScheduler, TrackingTask from models.product import Product, Platform from database.database import get_db, init_db, ProductORM, PriceHistoryORM from config.settings import settings # 初始化FastAPI应用 app = FastAPI(title="Price Tracker API", version="1.0.0") templates = Jinja2Templates(directory="templates") # 全局调度器实例 scheduler = PriceTrackerScheduler(max_concurrent=settings.MAX_CONCURRENT_REQUESTS) @app.on_event("startup") async def startup_event(): """应用启动时初始化数据库和调度器""" init_db() logger.info("Application started") @app.get("/", response_class=HTMLResponse) async def dashboard(request: Request, db: Session = Depends(get_db)): """价格追踪仪表板""" products = db.query(ProductORM).order_by(ProductORM.last_updated.desc()).limit(50).all() return templates.TemplateResponse("dashboard.html", {"request": request, "products": products}) @app.post("/api/track") async def track_product(url: str, platform: Platform): """开始追踪商品""" scheduler.add_tracking_task(url, platform) return {"message": f"Started tracking {url}", "platform": platform} @app.get("/api/products", response_model=List[Product]) async def get_products(db: Session = Depends(get_db)): """获取所有追踪的商品""" products = db.query(ProductORM).all() return products @app.get("/api/price-history/{product_id}") async def get_price_history(product_id: int, db: Session = Depends(get_db)): """获取商品价格历史""" history = db.query(PriceHistoryORM).filter( PriceHistoryORM.product_id == product_id ).order_by(PriceHistoryORM.timestamp.desc()).limit(100).all() return [{ "price": h.price, "timestamp": h.timestamp, "availability": h.availability } for h in history] @app.post("/api/check-now") async def check_now(): """立即执行一次检查""" tasks = scheduler.tracking_tasks.copy() if tasks: results = await scheduler.process_batch(tasks) return {"checked": len(results), "message": "Check completed"} return {"message": "No products to check"} async def main(): """主函数""" # 示例:添加一些初始追踪任务 scheduler.add_tracking_task( "https://www.amazon.com/dp/B08N5WRWNW", # 示例商品 Platform.AMAZON ) scheduler.add_tracking_task( "https://item.taobao.com/item.htm?id=123456789", # 示例商品 Platform.TAOBAO ) # 启动监控循环 monitoring_task = asyncio.create_task( scheduler.run_continuous_monitoring(interval_hours=settings.CHECK_INTERVAL_HOURS) ) # 启动Web服务器 config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info") server = uvicorn.Server(config) # 同时运行监控和Web服务器 await asyncio.gather( monitoring_task, server.serve() ) if __name__ == "__main__": logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) asyncio.run(main())

高级功能扩展

1. 机器学习价格预测

python

# alerts/predictor.py from sklearn.linear_model import LinearRegression from sklearn.preprocessing import PolynomialFeatures import numpy as np from typing import List, Optional from datetime import datetime, timedelta class PricePredictor: def __init__(self): self.model = LinearRegression() def predict_future_prices(self, historical_prices: List[float], days_ahead: int = 7) -> List[float]: """预测未来价格""" if len(historical_prices) < 10: return [] # 准备特征矩阵 X = np.arange(len(historical_prices)).reshape(-1, 1) y = np.array(historical_prices) # 添加多项式特征 poly = PolynomialFeatures(degree=2) X_poly = poly.fit_transform(X) # 训练模型 self.model.fit(X_poly, y) # 预测未来价格 future_X = np.arange(len(historical_prices), len(historical_prices) + days_ahead).reshape(-1, 1) future_X_poly = poly.transform(future_X) predictions = self.model.predict(future_X_poly) return predictions.tolist()

2. 代理池集成

python

# utils/proxy_manager.py import aiohttp import asyncio from typing import List, Optional import random class ProxyManager: def __init__(self, proxy_urls: List[str] = None): self.proxies = proxy_urls or [] self.current_index = 0 def get_random_proxy(self) -> Optional[str]: """获取随机代理""" if not self.proxies: return None return random.choice(self.proxies) def get_next_proxy(self) -> Optional[str]: """轮询获取代理""" if not self.proxies: return None proxy = self.proxies[self.current_index] self.current_index = (self.current_index + 1) % len(self.proxies) return proxy async def validate_proxy(self, proxy_url: str) -> bool: """验证代理可用性""" try: async with aiohttp.ClientSession() as session: async with session.get( "https://httpbin.org/ip", proxy=proxy_url, timeout=10 ) as response: return response.status == 200 except: return False async def refresh_proxies(self, api_url: str): """从代理API刷新代理列表""" try: async with aiohttp.ClientSession() as session: async with session.get(api_url, timeout=30) as response: if response.status == 200: data = await response.json() self.proxies = data.get("proxies", []) except Exception as e: print(f"Error refreshing proxies: {e}")

部署与优化建议

1. 部署方案

  • 本地运行:适合个人使用,直接运行main.py

  • Docker容器化:便于部署和扩展

  • 云服务器部署:使用AWS、Google Cloud或阿里云

  • Serverless架构:使用AWS Lambda或Google Cloud Functions

2. 性能优化

  • 使用连接池管理数据库连接

  • 实现缓存机制减少重复请求

  • 分布式爬虫架构处理大规模监控

  • 使用CDN存储静态资源

3. 反爬虫策略应对

  • 随机化请求间隔

  • 使用住宅代理IP

  • 模拟人类浏览行为

  • 定期更换User-Agent

  • 处理验证码(使用OCR或第三方服务)

4. 数据可视化

  • 使用Matplotlib或Plotly生成价格趋势图

  • 构建实时仪表板显示价格变化

  • 发送邮件或短信通知

法律与伦理考虑

  1. 遵守robots.txt:尊重网站的爬虫政策

  2. 限制请求频率:避免对目标网站造成负担

  3. 数据使用限制:仅用于个人或研究目的

  4. 用户隐私保护:不收集用户个人信息

  5. 遵守服务条款:遵守各电商平台的使用条款

结论

本文详细介绍了一个完整的跨平台商品价格追踪系统的设计与实现。通过结合现代Python技术栈,包括Playwright、异步编程、SQLAlchemy等工具,我们构建了一个强大、可扩展的价格监控解决方案。系统不仅能够实时追踪亚马逊、淘宝等主流电商平台的商品价格,还提供了数据分析、价格预测和智能预警功能。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/23 12:55:54

Kernel十年演进(2015–2025)

Kernel十年演进&#xff08;2015–2025&#xff09; 一句话总论&#xff1a; 2015年Kernel还是“传统Linux单核通用RTOS工业嵌入式”的分散时代&#xff0c;2025年已进化成“中国自研微内核硬实时<1μs大模型原生集成量子级容错自愈具身智能专用”的终极操作系统底层&#x…

作者头像 李华
网站建设 2026/4/23 15:25:42

FSDP(Fully Sharded Data Parallel)十年演进(2015–2025)

FSDP&#xff08;Fully Sharded Data Parallel&#xff09;十年演进&#xff08;2015–2025&#xff09; 一句话总论&#xff1a; FSDP从2020年PyTorch初步引入的“ZeRO-3分布式训练内存优化技术”&#xff0c;到2025年已进化成“万亿级多模态大模型训练标配量子混合精度自进化…

作者头像 李华
网站建设 2026/4/23 13:12:11

减速器十年演进(2015–2025)

减速器十年演进&#xff08;2015–2025&#xff09; 一句话总论&#xff1a; 2015年减速器还是“RV/谐波进口垄断刚性高背隙万元级成本”的工业时代&#xff0c;2025年已进化成“国产超薄谐波/行星滚柱零背隙纳米级精度一体化关节量子级自愈补偿”的具身智能时代&#xff0c;中…

作者头像 李华
网站建设 2026/4/23 9:56:22

AUTOSAR基础软件层实时操作系统集成架构图分析

AUTOSAR基础软件层实时操作系统集成架构解析从一个刹车控制说起&#xff1a;为什么汽车ECU离不开RTOS&#xff1f;设想这样一个场景&#xff1a;你驾驶的电动汽车正在高速公路上巡航&#xff0c;前方车辆突然急刹。你的车必须在20毫秒内完成雷达目标识别、决策判断&#xff0c;…

作者头像 李华
网站建设 2026/4/22 16:49:30

自定义输出目录output_dir:管理多个LoRA训练任务的最佳实践

自定义输出目录 output_dir&#xff1a;管理多个 LoRA 训练任务的最佳实践 在 AIGC&#xff08;生成式人工智能&#xff09;的实践中&#xff0c;一个看似微不足道的配置项——output_dir&#xff0c;往往决定了整个训练流程是井然有序&#xff0c;还是混乱不堪。 设想这样一个…

作者头像 李华
网站建设 2026/4/23 2:15:12

标注准确性影响评估:错误prompt导致生成偏差的案例分析

标注准确性影响评估&#xff1a;错误prompt导致生成偏差的案例分析 在如今AIGC工具链日趋成熟的背景下&#xff0c;许多开发者只需几行命令就能训练出一个风格化LoRA模型——上传图片、运行脚本、等待几小时后得到可直接在WebUI中调用的.safetensors文件。整个过程看起来高效且…

作者头像 李华