
Python Web Scraping in Practice: Efficiently Crawling Sina News with Async Techniques and Anti-Bot Countermeasures


张小明

Front-end Development Engineer


Introduction

In today's era of information overload, news data is highly valuable for market analysis, public-opinion monitoring, natural language processing, and related fields. Sina News, one of China's leading news portals, publishes a massive volume of articles every day. This article walks through how to use a modern Python stack to build an efficient, stable, and extensible crawler for Sina News.

Technology Choices and Advantages

Core Frameworks

  • aiohttp/asyncio: asynchronous HTTP client; for concurrent, I/O-bound crawling it is commonly 5-10x faster than the synchronous requests library (a minimal concurrency sketch follows this list)

  • BeautifulSoup4: HTML parsing library that supports multiple parsers

  • Playwright: modern browser automation tool for handling dynamically loaded content

  • Redis: distributed cache for crawler state and deduplication data

  • MongoDB: document database for unstructured news data
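
The speed advantage of aiohttp/asyncio comes from overlapping network waits rather than faster parsing. Below is a minimal sketch (not part of the full system later in the article) showing how several pages can be downloaded concurrently; the URLs are placeholders for illustration only.

python

import asyncio
import aiohttp


async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    # Each request yields control while waiting on the network,
    # so many downloads can be in flight at the same time.
    async with session.get(url) as resp:
        return await resp.text()


async def fetch_all(urls: list[str]) -> list[str]:
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch(session, u) for u in urls))


if __name__ == "__main__":
    # Placeholder URLs for illustration only.
    pages = asyncio.run(fetch_all(["https://news.sina.com.cn/"] * 5))
    print(len(pages), "pages fetched")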

Handling Anti-Scraping Measures

  • User-Agent rotation

  • IP proxy pool integration

  • Request rate control (see the throttling sketch after this list)

  • JavaScript rendering

  • CAPTCHA recognition as a fallback
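
Rate control is the strategy most directly under our control. The sketch below illustrates the approach used later in the full system: a semaphore caps concurrency and a randomized sleep spaces requests out. The concurrency limit and delay range are illustrative assumptions, not tuned values.

python

import asyncio
import random

# Cap the number of requests in flight at any moment.
semaphore = asyncio.Semaphore(10)


async def polite_fetch(session, url: str) -> str:
    async with semaphore:
        async with session.get(url) as resp:
            html = await resp.text()
        # Politeness delay between requests (1-3 seconds, chosen arbitrarily).
        await asyncio.sleep(random.uniform(1, 3))
        return html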

Complete Crawler System Architecture

python

""" 新浪新闻高效爬虫系统 版本:2.0 特性:异步处理、智能反爬、数据持久化、实时监控 """ import asyncio import aiohttp import async_timeout from bs4 import BeautifulSoup from urllib.parse import urljoin, urlparse import logging from datetime import datetime import json import hashlib from typing import List, Dict, Optional, Set import redis from pymongo import MongoClient from pymongo.errors import DuplicateKeyError import random import time from playwright.async_api import async_playwright import pandas as pd from dataclasses import dataclass, asdict from enum import Enum import zlib import pickle # 配置日志系统 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('sina_news_crawler.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # 数据模型定义 @dataclass class NewsArticle: """新闻文章数据模型""" url: str title: str content: str publish_time: str source: str category: str keywords: List[str] authors: List[str] summary: str images: List[str] videos: List[str] comments_count: int read_count: int crawl_time: datetime html_hash: str def to_dict(self): return {k: str(v) for k, v in asdict(self).items()} class NewsCategory(Enum): """新闻分类枚举""" DOMESTIC = "国内" INTERNATIONAL = "国际" FINANCE = "财经" TECH = "科技" SPORTS = "体育" ENTERTAINMENT = "娱乐" MILITARY = "军事" HEALTH = "健康" class ProxyPool: """IP代理池管理器""" def __init__(self, redis_client): self.redis = redis_client self.proxy_key = "sina:proxy_pool" async def get_random_proxy(self): """获取随机代理""" proxies = self.redis.smembers(self.proxy_key) return random.choice(list(proxies)) if proxies else None async def add_proxy(self, proxy): """添加代理到池中""" self.redis.sadd(self.proxy_key, proxy) async def remove_proxy(self, proxy): """移除失效代理""" self.redis.srem(self.proxy_key, proxy) class UserAgentManager: """用户代理管理器""" def __init__(self): self.user_agents = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/121.0' ] def get_random_ua(self): return random.choice(self.user_agents) class SinaNewsCrawler: """新浪新闻主爬虫类""" def __init__(self, config: Dict): # 基础配置 self.base_url = "https://news.sina.com.cn/" self.max_concurrent = config.get('max_concurrent', 10) self.request_timeout = config.get('request_timeout', 30) self.max_retries = config.get('max_retries', 3) # 初始化组件 self.ua_manager = UserAgentManager() self.redis_client = redis.Redis( host=config.get('redis_host', 'localhost'), port=config.get('redis_port', 6379), db=config.get('redis_db', 0), decode_responses=True ) self.proxy_pool = ProxyPool(self.redis_client) # MongoDB连接 mongo_client = MongoClient( config.get('mongo_uri', 'mongodb://localhost:27017/'), maxPoolSize=50 ) self.db = mongo_client[config.get('mongo_db', 'sina_news')] self.articles_collection = self.db.articles self.create_indexes() # 状态跟踪 self.visited_urls = set() self.crawl_stats = { 'total_crawled': 0, 'success': 0, 'failed': 0, 'start_time': datetime.now() } # 异步信号量控制并发 self.semaphore = asyncio.Semaphore(self.max_concurrent) # 分类URL映射 self.category_urls = { NewsCategory.DOMESTIC: "https://news.sina.com.cn/china/", 
NewsCategory.INTERNATIONAL: "https://news.sina.com.cn/world/", NewsCategory.FINANCE: "https://finance.sina.com.cn/", NewsCategory.TECH: "https://tech.sina.com.cn/", NewsCategory.SPORTS: "https://sports.sina.com.cn/", NewsCategory.ENTERTAINMENT: "https://ent.sina.com.cn/", NewsCategory.MILITARY: "https://mil.news.sina.com.cn/", NewsCategory.HEALTH: "https://news.sina.com.cn/health/" } def create_indexes(self): """创建数据库索引""" self.articles_collection.create_index("url", unique=True) self.articles_collection.create_index("publish_time") self.articles_collection.create_index("category") self.articles_collection.create_index([("title", "text"), ("content", "text")]) async def fetch_with_retry(self, session, url: str, use_proxy: bool = True) -> Optional[str]: """带重试机制的异步请求""" headers = { 'User-Agent': self.ua_manager.get_random_ua(), 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'DNT': '1', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Sec-Fetch-User': '?1', } proxy = None if use_proxy: proxy = await self.proxy_pool.get_random_proxy() for attempt in range(self.max_retries): try: async with self.semaphore: async with async_timeout.timeout(self.request_timeout): async with session.get( url, headers=headers, proxy=f"http://{proxy}" if proxy else None, ssl=False ) as response: if response.status == 200: html = await response.text() await asyncio.sleep(random.uniform(1, 3)) # 礼貌性延迟 return html elif response.status == 403: logger.warning(f"访问被拒绝: {url}") if proxy: await self.proxy_pool.remove_proxy(proxy) await asyncio.sleep(5) # 被拒绝后等待更长时间 else: logger.error(f"HTTP错误 {response.status}: {url}") except Exception as e: logger.error(f"请求失败 (尝试 {attempt + 1}/{self.max_retries}): {e}") if attempt < self.max_retries - 1: await asyncio.sleep(2 ** attempt) # 指数退避 return None async def fetch_with_playwright(self, url: str) -> Optional[str]: """使用Playwright处理动态加载内容""" try: async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context( user_agent=self.ua_manager.get_random_ua(), viewport={'width': 1920, 'height': 1080} ) page = await context.new_page() await page.goto(url, wait_until='networkidle') # 滚动页面以加载动态内容 await page.evaluate(""" async () => { await new Promise((resolve) => { let totalHeight = 0; const distance = 100; const timer = setInterval(() => { const scrollHeight = document.body.scrollHeight; window.scrollBy(0, distance); totalHeight += distance; if (totalHeight >= scrollHeight) { clearInterval(timer); resolve(); } }, 100); }); } """) # 等待内容加载 await asyncio.sleep(2) html = await page.content() await browser.close() return html except Exception as e: logger.error(f"Playwright请求失败: {e}") return None def parse_news_list(self, html: str, category: NewsCategory) -> List[str]: """解析新闻列表页,提取文章URL""" soup = BeautifulSoup(html, 'lxml') urls = [] # 多种选择器覆盖不同页面结构 selectors = [ 'div.news-item h2 a', 'div.blk122 a', 'ul.list_009 li a', 'div.news-content a', 'a[href*="/doc-"]' ] for selector in selectors: links = soup.select(selector) for link in links: href = link.get('href') if href and 'sina.com.cn' in href and 'video' not in href: full_url = urljoin(self.base_url, href) if self.is_valid_news_url(full_url): urls.append(full_url) # 去重 return list(set(urls)) def is_valid_news_url(self, url: str) -> bool: """验证是否为有效的新闻URL""" 
patterns = [ r'/doc-.*?\.shtml$', r'news\.sina\.com\.cn/\w+/\d{4}-\d{2}-\d{2}/.*\.shtml$', r'finance\.sina\.com\.cn/.*/\d{4}-\d{2}-\d{2}/.*\.shtml$' ] import re return any(re.search(pattern, url) for pattern in patterns) def parse_news_article(self, html: str, url: str) -> Optional[NewsArticle]: """解析新闻文章详情页""" try: soup = BeautifulSoup(html, 'lxml') # 提取标题 title_elem = soup.select_one('h1.main-title') or soup.select_one('div.page-header h1') title = title_elem.text.strip() if title_elem else "无标题" # 提取发布时间 time_elem = soup.select_one('span.date') or soup.select_one('div.article-info span.time-source') publish_time = time_elem.text.strip() if time_elem else datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 提取来源 source_elem = soup.select_one('span.source') or soup.select_one('div.article-info span.source') source = source_elem.text.strip() if source_elem else "未知来源" # 提取正文内容 content_elem = soup.select_one('div.article') or soup.select_one('div#article_content') if content_elem: # 移除无关元素 for tag in content_elem.select('script, style, div.ad, div.comment'): tag.decompose() content = '\n'.join([p.text for p in content_elem.select('p')]) else: content = "" # 提取关键词 keywords_meta = soup.select_one('meta[name="keywords"]') keywords = keywords_meta['content'].split(',') if keywords_meta else [] # 提取摘要 description_meta = soup.select_one('meta[name="description"]') summary = description_meta['content'] if description_meta else "" # 提取图片 images = [img['src'] for img in soup.select('div.article img') if img.get('src')] # 生成HTML哈希用于去重 html_hash = hashlib.md5(html.encode()).hexdigest() # 判断分类 category = self.detect_category(url, soup) return NewsArticle( url=url, title=title, content=content, publish_time=publish_time, source=source, category=category, keywords=keywords, authors=[], summary=summary, images=images, videos=[], comments_count=0, read_count=0, crawl_time=datetime.now(), html_hash=html_hash ) except Exception as e: logger.error(f"解析文章失败 {url}: {e}") return None def detect_category(self, url: str, soup: BeautifulSoup) -> str: """自动检测新闻分类""" # 从URL判断 url_lower = url.lower() if 'finance' in url_lower: return NewsCategory.FINANCE.value elif 'tech' in url_lower: return NewsCategory.TECH.value elif 'sports' in url_lower: return NewsCategory.SPORTS.value elif 'ent' in url_lower: return NewsCategory.ENTERTAINMENT.value elif 'mil' in url_lower: return NewsCategory.MILITARY.value # 从页面元素判断 category_elem = soup.select_one('div.channel-path a') if category_elem: return category_elem.text.strip() return "未知" async def save_article(self, article: NewsArticle) -> bool: """保存文章到MongoDB""" try: article_dict = article.to_dict() article_dict['_id'] = article.html_hash # 使用HTML哈希作为ID去重 self.articles_collection.insert_one(article_dict) self.crawl_stats['success'] += 1 logger.info(f"保存成功: {article.title}") return True except DuplicateKeyError: logger.debug(f"重复文章已跳过: {article.title}") return False except Exception as e: logger.error(f"保存失败 {article.url}: {e}") self.crawl_stats['failed'] += 1 return False async def crawl_category(self, session, category: NewsCategory, max_pages: int = 10): """爬取特定分类的新闻""" base_url = self.category_urls[category] logger.info(f"开始爬取分类: {category.value}") for page in range(1, max_pages + 1): page_url = f"{base_url}?page={page}" if page > 1 else base_url html = await self.fetch_with_retry(session, page_url) if not html: logger.warning(f"无法获取列表页: {page_url}") continue article_urls = self.parse_news_list(html, category) logger.info(f"第{page}页找到{len(article_urls)}篇文章") # 创建文章爬取任务 tasks = [] 
for url in article_urls: if url not in self.visited_urls: self.visited_urls.add(url) tasks.append(self.crawl_single_article(session, url)) # 并发执行 if tasks: await asyncio.gather(*tasks, return_exceptions=True) # 避免请求过快 await asyncio.sleep(random.uniform(2, 4)) async def crawl_single_article(self, session, url: str): """爬取单篇文章""" # 检查是否已爬取 if self.redis_client.sismember("sina:crawled_urls", url): return html = await self.fetch_with_retry(session, url) if not html: # 尝试使用Playwright html = await self.fetch_with_playwright(url) if html: article = self.parse_news_article(html, url) if article: await self.save_article(article) # 标记为已爬取 self.redis_client.sadd("sina:crawled_urls", url) self.crawl_stats['total_crawled'] += 1 else: logger.warning(f"无法解析文章: {url}") else: logger.error(f"无法获取文章内容: {url}") async def crawl_hot_news(self, session, limit: int = 50): """爬取热点新闻""" hot_url = "https://news.sina.com.cn/hotnews/" html = await self.fetch_with_retry(session, hot_url) if html: soup = BeautifulSoup(html, 'lxml') # 解析热点新闻排行榜 hot_items = soup.select('div.hotnews-item a') urls = [urljoin(self.base_url, item['href']) for item in hot_items[:limit]] tasks = [self.crawl_single_article(session, url) for url in urls] await asyncio.gather(*tasks, return_exceptions=True) def print_stats(self): """打印爬虫统计信息""" duration = datetime.now() - self.crawl_stats['start_time'] stats = f""" =========== 爬虫统计 =========== 开始时间: {self.crawl_stats['start_time']} 运行时长: {duration} 总共爬取: {self.crawl_stats['total_crawled']} 成功保存: {self.crawl_stats['success']} 失败数量: {self.crawl_stats['failed']} 已访问URL: {len(self.visited_urls)} ================================ """ logger.info(stats) async def export_to_csv(self, filename: str = "sina_news.csv"): """导出数据到CSV文件""" cursor = self.articles_collection.find({}) articles = [] async for doc in cursor: articles.append(doc) if articles: df = pd.DataFrame(articles) df.to_csv(filename, index=False, encoding='utf-8-sig') logger.info(f"数据已导出到 {filename}") async def run(self): """主运行方法""" logger.info("开始新浪新闻爬虫...") # 创建aiohttp会话 connector = aiohttp.TCPConnector(limit=100, ssl=False) async with aiohttp.ClientSession(connector=connector) as session: # 创建爬取任务 tasks = [] # 爬取各分类新闻 for category in NewsCategory: tasks.append(self.crawl_category(session, category, max_pages=5)) # 爬取热点新闻 tasks.append(self.crawl_hot_news(session, limit=30)) # 执行所有任务 await asyncio.gather(*tasks, return_exceptions=True) # 打印统计信息 self.print_stats() # 导出数据 await self.export_to_csv() class DistributedCrawler: """分布式爬虫协调器(扩展功能)""" def __init__(self, config): self.redis = redis.Redis(**config['redis']) self.crawlers = [] async def add_crawler(self, crawler_config): """添加爬虫节点""" crawler = SinaNewsCrawler(crawler_config) self.crawlers.append(crawler) async def distribute_tasks(self): """分布式任务分配""" # 实现任务队列和负载均衡 pass async def main(): """主函数""" config = { 'max_concurrent': 20, 'request_timeout': 45, 'max_retries': 5, 'redis_host': 'localhost', 'redis_port': 6379, 'redis_db': 0, 'mongo_uri': 'mongodb://localhost:27017/', 'mongo_db': 'sina_news' } # 创建爬虫实例 crawler = SinaNewsCrawler(config) try: await crawler.run() except KeyboardInterrupt: logger.info("爬虫被用户中断") except Exception as e: logger.error(f"爬虫运行错误: {e}") finally: crawler.print_stats() if __name__ == "__main__": # 设置事件循环策略(Windows系统需要) if sys.platform == 'win32': asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) # 运行爬虫 asyncio.run(main())
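
The script above pulls in several third-party packages. A requirements.txt covering its imports might look like the sketch below (unpinned, for illustration); after installing, Playwright also needs its browser binaries, e.g. `python -m playwright install chromium`.

text

aiohttp
async-timeout
beautifulsoup4
lxml
redis
pymongo
playwright
pandas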

Advanced Feature Extensions

1. Anti-Anti-Crawler Enhancement Module

python

class AntiAntiCrawler:
    """Anti-anti-crawler enhancement module"""

    @staticmethod
    def generate_fingerprint():
        """Generate a browser fingerprint"""
        pass

    @staticmethod
    def solve_captcha(image_data):
        """Recognize a CAPTCHA"""
        pass

    @staticmethod
    def simulate_human_behavior(page):
        """Simulate human behavior"""
        pass
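
The methods above are left as stubs. As one hedged example, simulate_human_behavior could be filled in with Playwright mouse movement and scrolling; the movement ranges and delays below are arbitrary assumptions, not tuned values.

python

import random
from playwright.async_api import Page


async def simulate_human_behavior(page: Page) -> None:
    """A hypothetical implementation of the stub above: random mouse
    movement and scrolling to look less like a headless script."""
    for _ in range(random.randint(3, 6)):
        await page.mouse.move(
            random.randint(0, 1200),
            random.randint(0, 800),
            steps=random.randint(5, 20),
        )
        await page.mouse.wheel(0, random.randint(200, 600))
        await page.wait_for_timeout(random.randint(300, 1200))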

2. Data Quality Monitoring

python

class DataQualityMonitor:
    """Data quality monitor"""

    def check_completeness(self, article):
        """Check that required fields are present"""
        required_fields = ['title', 'content', 'publish_time']
        return all(getattr(article, field) for field in required_fields)

    def check_duplication(self, content_hash):
        """Check for duplicate content"""
        pass
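
check_duplication is also a stub. One way to implement content-level deduplication, sketched below under the assumption that a Redis connection is available (the key name is illustrative), is to remember a hash of each article body in a Redis set.

python

import hashlib
import redis


class RedisContentDedup:
    """A sketch of content-level deduplication via Redis."""

    def __init__(self, client: redis.Redis, key: str = "sina:content_hashes"):
        self.client = client
        self.key = key

    def is_duplicate(self, content: str) -> bool:
        digest = hashlib.md5(content.encode("utf-8")).hexdigest()
        # SADD returns 0 when the member already exists in the set.
        return self.client.sadd(self.key, digest) == 0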

3. Real-Time Monitoring Dashboard

python

class CrawlerDashboard:
    """Crawler monitoring dashboard"""

    def realtime_stats(self):
        """Display real-time statistics"""
        pass

    def alert_system(self):
        """Raise alerts on anomalies"""
        pass
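
A full dashboard is beyond this article, but as a minimal sketch, realtime_stats could be approximated by a background task that periodically logs the counters the main crawler already keeps in crawl_stats; the 30-second interval is an arbitrary choice.

python

import asyncio
import logging

logger = logging.getLogger(__name__)


async def report_stats(crawler, interval: int = 30) -> None:
    """Log the crawler's counters every `interval` seconds while it runs."""
    while True:
        stats = crawler.crawl_stats
        logger.info(
            "crawled=%s success=%s failed=%s",
            stats["total_crawled"], stats["success"], stats["failed"],
        )
        await asyncio.sleep(interval)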

Deployment and Optimization Recommendations

1. Containerized Deployment with Docker

dockerfile

FROM python:3.11-slim
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . /app
WORKDIR /app
CMD ["python", "main.py"]
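
Building and running the image follows the usual Docker workflow; the image tag below matches the Kubernetes manifest in the next section, and Redis/MongoDB must be reachable from inside the container (the defaults point at localhost, so in practice you would wire them up via docker-compose or connection settings).

bash

docker build -t sina-crawler:latest .
docker run --rm sina-crawler:latest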

2. Kubernetes Orchestration Configuration

yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: sina-crawler
spec:
  replicas: 3
  selector:
    matchLabels:
      app: sina-crawler
  template:
    metadata:
      labels:
        app: sina-crawler
    spec:
      containers:
        - name: sina-crawler
          image: sina-crawler:latest
          env:
            - name: REDIS_HOST
              value: "redis-service"
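
The manifest injects REDIS_HOST as an environment variable, but the main() shown earlier builds its config from a hard-coded dict. A small, hypothetical adjustment makes the script pick up those values at startup:

python

import os

# Fall back to local defaults when the environment variables are not set.
config = {
    'redis_host': os.environ.get('REDIS_HOST', 'localhost'),
    'redis_port': int(os.environ.get('REDIS_PORT', '6379')),
    'mongo_uri': os.environ.get('MONGO_URI', 'mongodb://localhost:27017/'),
}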

3. Performance Optimization Strategies

  • Reuse HTTP connections via connection pooling

  • Implement incremental crawling

  • Use a Bloom filter for deduplication (see the sketch after this list)

  • Support resumable crawling from checkpoints
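
For large URL sets, a Bloom filter trades a small false-positive rate for far lower memory use than the in-memory set and Redis set used above. Below is a minimal, self-contained sketch; for production use, a maintained library or a Redis-backed filter would be preferable, and the size and hash-count parameters are arbitrary.

python

import hashlib


class BloomFilter:
    """A minimal Bloom filter sketch for URL deduplication."""

    def __init__(self, size_bits: int = 1 << 24, num_hashes: int = 5):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits // 8)

    def _positions(self, item: str):
        # Derive several bit positions from salted hashes of the item.
        for i in range(self.num_hashes):
            digest = hashlib.md5(f"{i}:{item}".encode("utf-8")).hexdigest()
            yield int(digest, 16) % self.size

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))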

Legal and Ethical Considerations

  1. Comply with robots.txt: always check and honor the site's crawling policy (a checking sketch follows this list)

  2. Control request rates: avoid putting excessive load on the target servers

  3. Use data responsibly: only use the data for lawful purposes

  4. Respect copyright: be mindful of copyright restrictions on news content

  5. Protect privacy: do not collect personal information
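
Checking robots.txt can be automated with the standard library. The snippet below is a small helper sketch; the user-agent string passed to can_fetch is illustrative, and the result should be checked before each URL is queued.

python

from urllib.robotparser import RobotFileParser

parser = RobotFileParser()
parser.set_url("https://news.sina.com.cn/robots.txt")
parser.read()


def allowed(url: str, user_agent: str = "*") -> bool:
    # Returns True only if the site's robots.txt permits fetching this URL.
    return parser.can_fetch(user_agent, url)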

Conclusion

This article has walked through the design and implementation of a professional-grade Sina News crawler. The system combines asynchronous programming, layered anti-bot countermeasures, and a distribution-friendly architecture to achieve good availability, scalability, and robustness. Thanks to its modular design and clean code structure, it can be adapted to crawl other news sites with little effort.
