引言:数据驱动电影时代
在当今数字化的娱乐产业中,电影评分数据已成为制片方、发行商和观众决策的重要依据。无论是IMDb的专业评分、豆瓣的社区评价,还是烂番茄的新鲜度指标,这些数据都蕴含着巨大的商业价值和分析潜力。本文将详细介绍如何使用Python最新爬虫技术构建一个高效、稳定的电影评分数据收集系统。
技术栈概览
下面列出本项目采用的现代化技术,清单之后给出对应的依赖安装示例:
Python 3.10+:较新的Python版本,提供更好的性能和语言特性
Playwright:微软开发的现代化浏览器自动化工具
Asyncio:Python原生异步IO框架
BeautifulSoup4+lxml:高效HTML解析
Pydantic:数据验证与设置管理
ProxyChain:代理轮转与反反爬策略
Redis:分布式任务队列与缓存
PostgreSQL:结构化数据存储
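在动手之前,可以先为上述技术栈准备一份依赖清单。下面是一份示意性的 requirements.txt 草案,其中的版本号均为假设,请按实际环境自行锁定;安装完成后还需执行 `playwright install chromium` 下载浏览器内核:

```text
# requirements.txt(示意,版本号请按实际环境锁定)
playwright>=1.40
beautifulsoup4>=4.12
lxml>=4.9
pydantic<2           # 本文代码使用 validator/BaseSettings 等 v1 风格 API
python-dotenv>=1.0
redis>=5.0
asyncpg>=0.29
backoff>=2.2
tenacity>=8.2
structlog>=23.2
prometheus-client>=0.19
ray>=2.9             # 仅在需要分布式扩展时安装
```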
项目架构设计
1. 核心模块规划
```text
movie_scraper/
├── config/          # 配置文件
├── core/            # 核心功能模块
│   ├── browser/     # 浏览器控制
│   ├── parser/      # 数据解析
│   ├── storage/     # 数据存储
│   └── proxy/       # 代理管理
├── spiders/         # 爬虫实现
├── models/          # 数据模型
├── middleware/      # 中间件
├── utils/           # 工具函数
└── tasks/           # 异步任务
```
2. 数据模型设计
```python
from pydantic import BaseModel, Field, validator
from datetime import datetime
from typing import Optional, List, Dict, Any
from enum import Enum


class RatingSource(str, Enum):
    DOUBAN = "douban"
    IMDB = "imdb"
    ROTTEN_TOMATOES = "rotten_tomatoes"
    MAOYAN = "maoyan"


class MovieRating(BaseModel):
    """电影评分数据模型"""
    movie_id: str = Field(..., description="电影唯一标识")
    title: str = Field(..., description="电影标题")
    original_title: Optional[str] = Field(None, description="原始标题")
    year: Optional[int] = Field(None, description="上映年份")

    # 评分信息
    rating: Optional[float] = Field(None, ge=0, le=10, description="评分")
    rating_count: Optional[int] = Field(None, ge=0, description="评分人数")
    rating_distribution: Optional[Dict[str, float]] = Field(None, description="评分分布")

    # 元数据
    source: RatingSource = Field(..., description="数据来源")
    genres: List[str] = Field(default_factory=list, description="电影类型")
    directors: List[str] = Field(default_factory=list, description="导演")
    actors: List[str] = Field(default_factory=list, description="主演")

    # 爬取元信息
    crawled_at: datetime = Field(default_factory=datetime.now)
    url: str = Field(..., description="来源URL")

    @validator('rating')
    def validate_rating(cls, v):
        if v is not None and (v < 0 or v > 10):
            raise ValueError('评分必须在0-10之间')
        return v

    class Config:
        json_encoders = {
            datetime: lambda dt: dt.isoformat()
        }
```

核心爬虫实现
1. 基于Playwright的异步爬虫引擎
```python
import asyncio
import logging
import random
import time
from dataclasses import dataclass
from typing import Optional, Dict, Any

from playwright.async_api import async_playwright, Browser, Page


@dataclass
class BrowserConfig:
    headless: bool = True
    proxy: Optional[str] = None
    user_agent: Optional[str] = None
    viewport: Dict[str, int] = None
    timeout: int = 30000

    def __post_init__(self):
        if self.viewport is None:
            self.viewport = {"width": 1920, "height": 1080}


class PlaywrightScraper:
    """基于Playwright的高级爬虫类"""

    def __init__(self, config: BrowserConfig = None):
        self.config = config or BrowserConfig()
        self.logger = logging.getLogger(__name__)
        self.browser: Optional[Browser] = None
        self.context = None
        self.playwright = None

    async def __aenter__(self):
        await self.init_browser()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def init_browser(self):
        """初始化浏览器实例"""
        self.playwright = await async_playwright().start()

        launch_options = {
            "headless": self.config.headless,
            "timeout": self.config.timeout,
        }
        if self.config.proxy:
            launch_options["proxy"] = {"server": self.config.proxy}

        # 支持多种浏览器
        browser_type = self.playwright.chromium  # 可切换为 firefox 或 webkit
        self.browser = await browser_type.launch(**launch_options)

        # 创建浏览器上下文
        context_options = {
            "viewport": self.config.viewport,
            "user_agent": self.config.user_agent or self._get_random_ua(),
            "ignore_https_errors": True,
        }
        self.context = await self.browser.new_context(**context_options)

        # 添加反爬对抗脚本
        await self._add_stealth_script()

    async def _add_stealth_script(self):
        """添加反检测脚本"""
        stealth_script = """
        // 覆盖webdriver属性
        Object.defineProperty(navigator, 'webdriver', {
            get: () => undefined
        });

        // 覆盖plugins属性
        Object.defineProperty(navigator, 'plugins', {
            get: () => [1, 2, 3, 4, 5]
        });

        // 覆盖languages属性
        Object.defineProperty(navigator, 'languages', {
            get: () => ['zh-CN', 'zh', 'en']
        });

        // 隐藏自动化特征
        window.chrome = { runtime: {} };

        // 覆盖permissions API
        const originalQuery = window.navigator.permissions.query;
        window.navigator.permissions.query = (parameters) => (
            parameters.name === 'notifications'
                ? Promise.resolve({ state: Notification.permission })
                : originalQuery(parameters)
        );
        """
        await self.context.add_init_script(stealth_script)

    def _get_random_ua(self) -> str:
        """获取随机用户代理"""
        user_agents = [
            # Chrome Windows
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            # Chrome Mac
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            # Firefox
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/120.0",
        ]
        return random.choice(user_agents)

    async def fetch_page(self, url: str, **kwargs) -> Page:
        """获取页面"""
        page = await self.context.new_page()
        try:
            # 设置请求头
            headers = {
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Accept-Encoding": "gzip, deflate, br",
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
            }

            # 随机延迟,模拟人类行为
            await asyncio.sleep(random.uniform(1, 3))

            # 发起请求
            await page.set_extra_http_headers(headers)
            response = await page.goto(url, **{
                "wait_until": "networkidle",
                "timeout": self.config.timeout,
                **kwargs
            })

            if not response or not response.ok:
                self.logger.warning(
                    f"页面加载失败: {url}, 状态码: {response.status if response else '无响应'}"
                )

            # 随机滚动,模拟浏览行为
            await self._simulate_human_behavior(page)

            return page
        except Exception as e:
            self.logger.error(f"获取页面失败 {url}: {str(e)}")
            await page.close()
            raise

    async def _simulate_human_behavior(self, page: Page):
        """模拟人类浏览行为"""
        # 随机滚动
        scroll_steps = random.randint(3, 8)
        for _ in range(scroll_steps):
            scroll_distance = random.randint(200, 800)
            await page.evaluate(f"window.scrollBy(0, {scroll_distance})")
            await asyncio.sleep(random.uniform(0.5, 2))

        # 随机鼠标移动
        await page.mouse.move(
            random.randint(100, 500),
            random.randint(100, 500)
        )
        await asyncio.sleep(random.uniform(0.5, 1))

    async def close(self):
        """关闭资源"""
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()
```

2. 豆瓣电影评分爬虫实现
```python
import json
import logging
import re
from typing import Optional, List, Dict, Any

from bs4 import BeautifulSoup


class DoubanMovieSpider:
    """豆瓣电影评分爬虫"""

    BASE_URL = "https://movie.douban.com"

    def __init__(self, scraper: PlaywrightScraper):
        self.scraper = scraper
        self.logger = logging.getLogger(__name__)

    async def get_movie_rating(self, movie_id: str) -> Optional[MovieRating]:
        """获取单部电影评分数据"""
        url = f"{self.BASE_URL}/subject/{movie_id}/"
        try:
            page = await self.scraper.fetch_page(url)
            content = await page.content()

            # 使用BeautifulSoup解析
            soup = BeautifulSoup(content, 'lxml')

            # 提取电影信息
            movie_data = self._parse_movie_page(soup, url)
            if movie_data:
                # 转换为标准模型
                return MovieRating(
                    movie_id=movie_id,
                    title=movie_data.get('title', ''),
                    original_title=movie_data.get('original_title'),
                    year=movie_data.get('year'),
                    rating=movie_data.get('rating'),
                    rating_count=movie_data.get('rating_count'),
                    rating_distribution=movie_data.get('rating_distribution'),
                    source=RatingSource.DOUBAN,
                    genres=movie_data.get('genres', []),
                    directors=movie_data.get('directors', []),
                    actors=movie_data.get('actors', []),
                    url=url
                )
        except Exception as e:
            self.logger.error(f"爬取电影失败 {movie_id}: {str(e)}")
            return None

    def _parse_movie_page(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
        """解析电影页面"""
        try:
            # 电影标题
            title_element = soup.find("span", property="v:itemreviewed")
            title = title_element.text if title_element else ""

            # 原始标题
            original_title_element = soup.find("span", class_="pl", string=re.compile("原名"))
            original_title = None
            if original_title_element and original_title_element.next_sibling:
                original_title = original_title_element.next_sibling.strip()

            # 上映年份
            year_element = soup.find("span", class_="year")
            year = None
            if year_element:
                year_match = re.search(r'(\d{4})', year_element.text)
                year = int(year_match.group(1)) if year_match else None

            # 评分信息
            rating_element = soup.find("strong", class_="ll rating_num")
            rating = float(rating_element.text.strip()) if rating_element else None

            # 评分人数
            rating_count_element = soup.find("span", property="v:votes")
            rating_count = int(rating_count_element.text.strip()) if rating_count_element else None

            # 评分分布(页面中为"34.5%"等百分比字符串,转换为浮点数)
            rating_distribution = {}
            distribution_elements = soup.find_all("span", class_="rating_per")
            for i, elem in enumerate(distribution_elements[:5]):
                rating_distribution[f"{5 - i}星"] = float(elem.text.strip().rstrip('%'))

            # 电影类型
            genres = [elem.text.strip() for elem in soup.find_all("span", property="v:genre")]

            # 导演信息
            directors = [elem.text.strip() for elem in soup.find_all("a", rel="v:directedBy")]

            # 演员信息(前5位)
            actors = [elem.text.strip() for elem in soup.find_all("a", rel="v:starring")[:5]]

            return {
                "title": title,
                "original_title": original_title,
                "year": year,
                "rating": rating,
                "rating_count": rating_count,
                "rating_distribution": rating_distribution,
                "genres": genres,
                "directors": directors,
                "actors": actors,
            }
        except Exception as e:
            self.logger.error(f"解析页面失败: {str(e)}")
            return {}

    async def search_movies(self, keyword: str, limit: int = 20) -> List[Dict[str, Any]]:
        """搜索电影"""
        # 使用豆瓣的搜索建议接口
        api_url = f"{self.BASE_URL}/j/subject_suggest?q={keyword}"
        try:
            page = await self.scraper.fetch_page(api_url)
            # 接口返回JSON,浏览器会将其渲染在<body>中
            content = await page.inner_text("body")
            results = json.loads(content) if content else []
            return results[:limit]
        except Exception as e:
            self.logger.error(f"搜索电影失败 {keyword}: {str(e)}")
            return []
```

3. 异步任务调度系统
```python
import asyncio
import logging
import pickle
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Callable

import backoff
import redis


class AsyncTaskScheduler:
    """异步任务调度器"""

    def __init__(self, redis_url: str = "redis://localhost:6379/0", max_workers: int = 10):
        self.redis_client = redis.from_url(redis_url)
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.logger = logging.getLogger(__name__)
        self.running_tasks = {}

    async def schedule_task(self, func: Callable, *args, **kwargs) -> str:
        """调度异步任务"""
        task_id = str(uuid.uuid4())

        # 将任务信息存储到Redis
        task_info = {
            "task_id": task_id,
            "func_name": func.__name__,
            "args": pickle.dumps(args),
            "kwargs": pickle.dumps(kwargs),
            "status": "pending",
            "created_at": datetime.now().isoformat()
        }
        self.redis_client.hset(f"task:{task_id}", mapping=task_info)
        self.redis_client.lpush("task_queue", task_id)

        # 异步执行任务
        asyncio.create_task(self._execute_task(task_id, func, *args, **kwargs))
        return task_id

    @backoff.on_exception(backoff.expo, Exception, max_tries=3)
    async def _execute_task(self, task_id: str, func: Callable, *args, **kwargs):
        """执行任务(带重试机制)"""
        try:
            # 更新任务状态
            self.redis_client.hset(f"task:{task_id}", "status", "running")
            self.redis_client.hset(f"task:{task_id}", "started_at", datetime.now().isoformat())

            # 执行任务
            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                # 非异步函数在线程池中执行
                result = await asyncio.get_event_loop().run_in_executor(
                    self.executor, lambda: func(*args, **kwargs)
                )

            # 更新任务状态和结果
            task_info = {
                "status": "completed",
                "completed_at": datetime.now().isoformat(),
                "result": pickle.dumps(result),
                "success": "true"
            }
            self.redis_client.hset(f"task:{task_id}", mapping=task_info)
            self.logger.info(f"任务 {task_id} 执行成功")

        except Exception as e:
            self.logger.error(f"任务 {task_id} 执行失败: {str(e)}")

            # 更新失败状态
            task_info = {
                "status": "failed",
                "completed_at": datetime.now().isoformat(),
                "error": str(e),
                "success": "false"
            }
            self.redis_client.hset(f"task:{task_id}", mapping=task_info)
```

4. 配置文件管理
```python
from typing import List

from pydantic import BaseSettings, Field
from dotenv import load_dotenv

load_dotenv()


class ScraperConfig(BaseSettings):
    """爬虫配置"""

    # 浏览器配置
    headless: bool = Field(True, env="HEADLESS_MODE")
    browser_timeout: int = Field(30000, env="BROWSER_TIMEOUT")

    # 代理配置
    proxy_enabled: bool = Field(False, env="PROXY_ENABLED")
    proxy_list: List[str] = Field(default_factory=list, env="PROXY_LIST")

    # 请求配置
    request_delay: float = Field(1.0, env="REQUEST_DELAY")
    max_retries: int = Field(3, env="MAX_RETRIES")

    # 数据库配置
    db_url: str = Field("postgresql://user:password@localhost/movie_ratings", env="DATABASE_URL")
    redis_url: str = Field("redis://localhost:6379/0", env="REDIS_URL")

    # 并发配置
    max_concurrent_tasks: int = Field(5, env="MAX_CONCURRENT_TASKS")

    # 爬取限制
    rate_limit_per_minute: int = Field(30, env="RATE_LIMIT_PER_MINUTE")

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"
```

5. 主程序入口
```python
import asyncio
import json
import logging
import random
from contextlib import asynccontextmanager
from typing import List, Optional

import asyncpg
from tenacity import retry, stop_after_attempt, wait_exponential

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)


class MovieRatingScraper:
    """电影评分爬虫主程序"""

    def __init__(self, config: ScraperConfig):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.scraper: Optional[PlaywrightScraper] = None
        self.db_pool: Optional[asyncpg.Pool] = None
        self.scheduler: Optional[AsyncTaskScheduler] = None

    @asynccontextmanager
    async def context(self):
        """上下文管理器"""
        try:
            await self.setup()
            yield self
        finally:
            await self.cleanup()

    async def setup(self):
        """初始化设置"""
        # 初始化数据库连接池
        self.db_pool = await asyncpg.create_pool(self.config.db_url)

        # 初始化爬虫并启动浏览器
        browser_config = BrowserConfig(
            headless=self.config.headless,
            timeout=self.config.browser_timeout
        )
        self.scraper = PlaywrightScraper(browser_config)
        await self.scraper.init_browser()

        # 初始化任务调度器
        self.scheduler = AsyncTaskScheduler(
            redis_url=self.config.redis_url,
            max_workers=self.config.max_concurrent_tasks
        )

        # 创建数据库表
        await self._create_tables()
        self.logger.info("爬虫初始化完成")

    async def _create_tables(self):
        """创建数据库表"""
        async with self.db_pool.acquire() as conn:
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS movie_ratings (
                    id SERIAL PRIMARY KEY,
                    movie_id VARCHAR(100) NOT NULL,
                    title VARCHAR(500) NOT NULL,
                    original_title VARCHAR(500),
                    year INTEGER,
                    rating DECIMAL(3,1),
                    rating_count INTEGER,
                    rating_distribution JSONB,
                    source VARCHAR(50) NOT NULL,
                    genres TEXT[],
                    directors TEXT[],
                    actors TEXT[],
                    crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    url TEXT NOT NULL,
                    UNIQUE(movie_id, source)
                );

                CREATE INDEX IF NOT EXISTS idx_movie_rating_source
                    ON movie_ratings(source, crawled_at);
                CREATE INDEX IF NOT EXISTS idx_movie_rating_score
                    ON movie_ratings(rating DESC, rating_count DESC);
            ''')

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def save_rating(self, rating: MovieRating):
        """保存评分数据到数据库"""
        async with self.db_pool.acquire() as conn:
            await conn.execute('''
                INSERT INTO movie_ratings
                    (movie_id, title, original_title, year, rating, rating_count,
                     rating_distribution, source, genres, directors, actors, url)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
                ON CONFLICT (movie_id, source) DO UPDATE SET
                    rating = EXCLUDED.rating,
                    rating_count = EXCLUDED.rating_count,
                    rating_distribution = EXCLUDED.rating_distribution,
                    crawled_at = CURRENT_TIMESTAMP
            ''',
                rating.movie_id, rating.title, rating.original_title,
                rating.year, rating.rating, rating.rating_count,
                # asyncpg默认要求JSONB参数为JSON字符串
                json.dumps(rating.rating_distribution) if rating.rating_distribution else None,
                rating.source.value, rating.genres, rating.directors,
                rating.actors, rating.url
            )

    async def scrape_movie_batch(self, movie_ids: List[str], source: RatingSource):
        """批量爬取电影评分"""
        if source != RatingSource.DOUBAN:
            raise ValueError(f"暂不支持的数据源: {source}")

        spider = DoubanMovieSpider(self.scraper)

        # 使用信号量控制并发
        semaphore = asyncio.Semaphore(self.config.max_concurrent_tasks)

        async def limited_task(movie_id):
            async with semaphore:
                try:
                    # 随机延迟,避免请求过于频繁
                    await asyncio.sleep(self.config.request_delay * random.uniform(0.5, 1.5))
                    rating = await spider.get_movie_rating(movie_id)
                    if rating:
                        await self.save_rating(rating)
                        self.logger.info(f"成功爬取电影: {rating.title} ({rating.rating})")
                    return rating
                except Exception as e:
                    self.logger.error(f"爬取电影失败 {movie_id}: {str(e)}")
                    return None

        # 创建任务
        tasks = [asyncio.create_task(limited_task(movie_id)) for movie_id in movie_ids]

        # 等待所有任务完成
        results = await asyncio.gather(*tasks, return_exceptions=True)

        successful = sum(1 for r in results if r and not isinstance(r, Exception))
        self.logger.info(f"批量爬取完成: 成功 {successful}/{len(movie_ids)}")
        return results

    async def cleanup(self):
        """清理资源"""
        if self.scraper:
            await self.scraper.close()
        if self.db_pool:
            await self.db_pool.close()
        if self.scheduler:
            self.scheduler.executor.shutdown()
        self.logger.info("资源清理完成")


async def main():
    """主函数"""
    # 加载配置
    config = ScraperConfig()

    # 示例电影ID列表(豆瓣)
    douban_movie_ids = [
        "1292052",  # 肖申克的救赎
        "1291546",  # 霸王别姬
        "1292720",  # 阿甘正传
        "1295644",  # 这个杀手不太冷
        "1292722",  # 泰坦尼克号
    ]

    async with MovieRatingScraper(config).context() as scraper:
        # 批量爬取豆瓣电影评分
        await scraper.scrape_movie_batch(douban_movie_ids, RatingSource.DOUBAN)

        # 持续监控新电影
        # await scraper.start_monitoring()


if __name__ == "__main__":
    # 运行爬虫
    asyncio.run(main())
```

高级特性与优化
1. 反反爬策略增强
```python
class AntiAntiScraper(PlaywrightScraper):
    """增强反反爬能力的爬虫"""

    async def bypass_cloudflare(self, page):
        """绕过Cloudflare验证"""
        # 检测Cloudflare
        title = await page.title()
        if "cloudflare" in title.lower():
            self.logger.info("检测到Cloudflare,尝试绕过...")

            # 等待挑战页面
            await page.wait_for_selector("#challenge-form", timeout=10000)

            # 模拟人类解决挑战
            await asyncio.sleep(random.uniform(5, 10))

            # 尝试点击验证按钮
            try:
                await page.click("input[type='submit']", delay=random.randint(100, 500))
                await asyncio.sleep(random.uniform(3, 7))
            except Exception:
                pass

    async def rotate_fingerprint(self):
        """轮换浏览器指纹"""
        # 修改WebGL指纹
        await self.context.add_init_script("""
            const getParameter = WebGLRenderingContext.prototype.getParameter;
            WebGLRenderingContext.prototype.getParameter = function(parameter) {
                if (parameter === 37445) {
                    return 'Intel Inc.';
                }
                if (parameter === 37446) {
                    return 'Intel Iris OpenGL Engine';
                }
                return getParameter.apply(this, arguments);
            };
        """)

        # 修改时区
        await self.context.add_init_script("""
            const originalResolvedOptions = Intl.DateTimeFormat.prototype.resolvedOptions;
            Object.defineProperty(Intl.DateTimeFormat.prototype, 'resolvedOptions', {
                value: function() {
                    const result = originalResolvedOptions.apply(this, arguments);
                    result.timeZone = 'Asia/Shanghai';
                    return result;
                }
            });
        """)
```

2. 数据质量监控
```python
class DataQualityMonitor:
    """数据质量监控器"""

    @staticmethod
    def validate_rating_data(rating: MovieRating) -> bool:
        """验证评分数据质量"""
        checks = [
            (rating.title and len(rating.title) > 0, "标题为空"),
            (rating.rating is None or 0 <= rating.rating <= 10, "评分超出范围"),
            (rating.rating_count is None or rating.rating_count >= 0, "评分人数为负"),
            (len(rating.genres) <= 20, "类型过多"),
            (len(rating.directors) <= 10, "导演过多"),
            (len(rating.actors) <= 30, "演员过多"),
        ]

        failed_checks = [reason for (check, reason) in checks if not check]
        if failed_checks:
            logging.warning(f"数据验证失败: {failed_checks}")
            return False
        return True

    @staticmethod
    def detect_anomalies(ratings: List[MovieRating]) -> Dict[str, Any]:
        """检测数据异常"""
        if not ratings:
            return {}

        scores = [r.rating for r in ratings if r.rating is not None]
        if len(scores) < 2:
            return {}

        import statistics
        mean = statistics.mean(scores)
        stdev = statistics.stdev(scores) if len(scores) > 1 else 0

        anomalies = []
        for rating in ratings:
            if rating.rating and abs(rating.rating - mean) > 3 * stdev:
                anomalies.append({
                    "movie_id": rating.movie_id,
                    "title": rating.title,
                    "rating": rating.rating,
                    "deviation": rating.rating - mean
                })

        return {
            "mean_rating": mean,
            "std_deviation": stdev,
            "anomaly_count": len(anomalies),
            "anomalies": anomalies
        }
```

3. 分布式爬虫扩展
```python
import asyncio
import logging
from datetime import datetime
from typing import List, Dict, Any

import ray
from ray.util import ActorPool


@ray.remote
class DistributedScraper:
    """分布式爬虫节点"""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.scraper = None
        self.logger = logging.getLogger(f"DistributedScraper-{node_id}")

    async def initialize(self):
        """初始化节点"""
        from playwright.async_api import async_playwright

        playwright = await async_playwright().start()
        browser = await playwright.chromium.launch(headless=True)
        self.scraper = browser

    async def scrape_url(self, url: str) -> Dict[str, Any]:
        """爬取URL"""
        if not self.scraper:
            await self.initialize()

        page = await self.scraper.new_page()
        try:
            await page.goto(url, wait_until="networkidle")
            content = await page.content()
            return {
                "url": url,
                "content": content[:10000],  # 限制大小
                "node_id": self.node_id,
                "timestamp": datetime.now().isoformat()
            }
        finally:
            await page.close()

    async def shutdown(self):
        """关闭节点"""
        if self.scraper:
            await self.scraper.close()


class DistributedCrawlerSystem:
    """分布式爬虫系统"""

    def __init__(self, num_nodes: int = 4):
        self.num_nodes = num_nodes
        self.nodes = []

        # 初始化Ray
        ray.init(ignore_reinit_error=True)

    async def start(self):
        """启动分布式系统"""
        # 创建爬虫节点
        self.nodes = [DistributedScraper.remote(f"node-{i}") for i in range(self.num_nodes)]

        # 初始化所有节点(Ray的ObjectRef可以直接await)
        init_tasks = [node.initialize.remote() for node in self.nodes]
        await asyncio.gather(*init_tasks)

        print(f"分布式爬虫系统启动,共 {self.num_nodes} 个节点")

    async def distribute_tasks(self, urls: List[str]) -> List[Dict[str, Any]]:
        """分发任务"""
        # 创建Actor池
        pool = ActorPool(self.nodes)

        # 并行执行任务,ActorPool.map会阻塞并直接返回已解析的结果
        results = list(pool.map(lambda actor, url: actor.scrape_url.remote(url), urls))
        return results

    async def stop(self):
        """停止系统"""
        shutdown_tasks = [node.shutdown.remote() for node in self.nodes]
        await asyncio.gather(*shutdown_tasks)
        ray.shutdown()
```

部署与监控
Docker部署配置
```dockerfile
# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    wget \
    gnupg \
    libnss3 \
    libxss1 \
    libasound2 \
    libxtst6 \
    libgtk-3-0 \
    libgbm1 \
    libxshmfence1 \
    && rm -rf /var/lib/apt/lists/*

# 安装Chrome
RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \
    && echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list \
    && apt-get update \
    && apt-get install -y google-chrome-stable \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .

# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt

# 安装Playwright自带的Chromium(代码中通过playwright.chromium启动)
ENV PLAYWRIGHT_BROWSERS_PATH=/ms-playwright
RUN playwright install --with-deps chromium

# 复制应用代码
COPY . .

# 创建非root用户
RUN useradd -m -u 1000 scraper \
    && chown -R scraper:scraper /app

USER scraper

# 启动命令
CMD ["python", "main.py"]
```
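镜像之外,系统还依赖前文提到的 Redis 与 PostgreSQL。下面是一份假设的 docker-compose.yml 草案,服务名、账号密码均为示例,仅用于在本地把三个服务编排起来:

```yaml
# docker-compose.yml(示例配置,账号密码请自行替换)
version: "3.8"

services:
  scraper:
    build: .
    environment:
      DATABASE_URL: postgresql://scraper:scraper_pass@postgres/movie_ratings
      REDIS_URL: redis://redis:6379/0
      HEADLESS_MODE: "true"
    depends_on:
      - postgres
      - redis

  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: scraper
      POSTGRES_PASSWORD: scraper_pass
      POSTGRES_DB: movie_ratings
    volumes:
      - pg_data:/var/lib/postgresql/data

  redis:
    image: redis:7
    volumes:
      - redis_data:/data

volumes:
  pg_data:
  redis_data:
```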
监控与日志
```python
import time

import structlog
from prometheus_client import start_http_server, Counter, Gauge, Histogram

# 结构化日志
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

# Prometheus指标
REQUEST_COUNT = Counter('scraper_requests_total', 'Total requests', ['source', 'status'])
REQUEST_DURATION = Histogram('scraper_request_duration_seconds', 'Request duration', ['source'])
ACTIVE_TASKS = Gauge('scraper_active_tasks', 'Active scraping tasks')
SUCCESS_RATE = Gauge('scraper_success_rate', 'Success rate of scraping tasks')


class MonitoredScraper:
    """带监控的爬虫"""

    def __init__(self):
        self.logger = structlog.get_logger()

    async def monitored_scrape(self, url: str, source: str):
        """带监控的爬取方法"""
        start_time = time.time()
        ACTIVE_TASKS.inc()

        try:
            # 执行爬取(此处调用实际的抓取实现)
            result = await self._scrape(url)

            # 记录成功
            REQUEST_COUNT.labels(source=source, status='success').inc()
            REQUEST_DURATION.labels(source=source).observe(time.time() - start_time)
            self.logger.info("scrape_success", url=url, source=source)
            return result
        except Exception as e:
            # 记录失败
            REQUEST_COUNT.labels(source=source, status='error').inc()
            self.logger.error("scrape_error", url=url, source=source, error=str(e))
            raise
        finally:
            ACTIVE_TASKS.dec()
```

结语
本文详细介绍了如何使用Python最新技术构建一个高效、稳定的电影评分数据收集系统。我们从基础爬虫实现开始,逐步深入到反反爬策略、异步并发、分布式扩展等高级主题,最终构建了一个功能完备的爬虫系统。
关键要点总结:
现代化技术栈:使用Playwright、Asyncio等最新技术,提高了爬虫的效率和稳定性
反反爬策略:通过浏览器指纹修改、行为模拟等技术有效应对网站反爬机制
可扩展架构:采用模块化设计,便于添加新的数据源和功能
数据质量控制:实现了数据验证和质量监控,确保数据准确性
生产就绪:提供了完整的部署、监控和错误处理方案
未来发展建议:
机器学习集成:使用ML模型识别网站结构变化,自动调整解析策略
实时数据处理:集成流处理框架,实现评分数据的实时分析和可视化
多源数据融合:整合多个数据源的评分,构建更全面的电影评价体系
合规性增强:添加robots.txt解析、爬取速率控制等合规功能(见下方示例)
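以合规性为例,下面给出一个基于标准库 urllib.robotparser 的 robots.txt 检查与最小请求间隔控制的简单草案,类名和参数均为示意,可在调用 fetch_page 之前使用:

```python
import asyncio
import time
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser


class ComplianceGuard:
    """robots.txt检查 + 同域名最小请求间隔控制(示意实现)"""

    def __init__(self, user_agent: str = "MovieRatingBot", min_interval: float = 2.0):
        self.user_agent = user_agent
        self.min_interval = min_interval                      # 同一域名两次请求的最小间隔(秒)
        self._parsers: dict[str, RobotFileParser] = {}
        self._last_request: dict[str, float] = {}

    def _get_parser(self, url: str) -> RobotFileParser:
        domain = urlparse(url).netloc
        if domain not in self._parsers:
            parser = RobotFileParser(f"https://{domain}/robots.txt")
            parser.read()                                     # 同步读取,生产环境可改为异步并定期刷新
            self._parsers[domain] = parser
        return self._parsers[domain]

    def is_allowed(self, url: str) -> bool:
        """检查robots.txt是否允许抓取该URL"""
        return self._get_parser(url).can_fetch(self.user_agent, url)

    async def wait_for_slot(self, url: str):
        """保证对同一域名的请求间隔不小于min_interval"""
        domain = urlparse(url).netloc
        elapsed = time.monotonic() - self._last_request.get(domain, 0.0)
        if elapsed < self.min_interval:
            await asyncio.sleep(self.min_interval - elapsed)
        self._last_request[domain] = time.monotonic()


# 用法示意:在fetch_page之前调用
# guard = ComplianceGuard()
# if guard.is_allowed(url):
#     await guard.wait_for_slot(url)
#     page = await scraper.fetch_page(url)
```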