news 2026/4/23 15:20:43

基于异步技术与智能解析的跨平台房源数据聚合python爬虫实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
基于异步技术与智能解析的跨平台房源数据聚合python爬虫实战

一、引言:房源信息聚合的重要性与挑战

在当今数字化房地产市场中,房源信息分散在多个平台如链家、安居客、贝壳等,用户需要同时浏览多个网站才能获得全面信息。本文介绍如何使用Python最新技术构建一个高效、稳定的跨平台房源信息聚合系统,通过智能解析和异步处理技术,实现多平台房源数据的自动化采集与整合。

二、技术栈选择

  • 异步框架:aiohttp+asyncio- 实现高并发数据抓取

  • 解析引擎:parsel+BeautifulSoup4- 支持CSS和XPath混合选择器

  • 浏览器自动化:playwright- 处理动态渲染页面

  • 数据存储:SQLAlchemy+Alembic- ORM与数据库迁移

  • 反反爬策略: 代理池、请求指纹伪装、浏览器特征模拟

  • 部署监控:scrapy+scrapy-playwright- 生产级爬虫架构

三、系统架构设计

3.1 整体架构

text

数据源层(链家/安居客/贝壳) → 爬虫调度层 → 解析层 → 数据清洗层 → 存储层 → API服务层

3.2 模块划分

  • 代理管理模块: 维护动态代理池

  • 请求管理模块: 处理请求头、Cookie、会话管理

  • 解析器工厂: 多平台解析器动态调度

  • 数据标准化: 统一不同平台的数据格式

  • 异常处理: 智能重试与降级策略

四、核心代码实现

4.1 异步请求基础框架

python

import asyncio import aiohttp from aiohttp import TCPConnector from typing import Dict, Any, Optional import logging from dataclasses import dataclass from urllib.parse import urljoin import json logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class RequestConfig: """请求配置类""" timeout: int = 30 max_retries: int = 3 delay: float = 1.0 use_proxy: bool = True headers: Optional[Dict] = None class AsyncRequestClient: """异步HTTP客户端""" def __init__(self, config: RequestConfig = None): self.config = config or RequestConfig() self.session = None self.proxy_pool = [] # 代理池实例 async def __aenter__(self): connector = TCPConnector(limit=100, ssl=False) self.session = aiohttp.ClientSession( connector=connector, headers=self._get_default_headers() ) return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.session.close() def _get_default_headers(self): """生成动态请求头""" return { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Sec-Fetch-User': '?1', 'Cache-Control': 'max-age=0', } async def fetch(self, url: str, method: str = 'GET', **kwargs) -> Optional[str]: """执行异步请求""" for attempt in range(self.config.max_retries): try: proxy = self._get_proxy() if self.config.use_proxy else None async with self.session.request( method=method, url=url, proxy=proxy, timeout=aiohttp.ClientTimeout(total=self.config.timeout), **kwargs ) as response: if response.status == 200: content = await response.text() logger.info(f"Successfully fetched {url}") return content elif response.status in [403, 429]: logger.warning(f"Blocked by {url}, retrying...") await self._rotate_proxy() await asyncio.sleep(self.config.delay * 2) else: logger.error(f"HTTP {response.status} for {url}") except Exception as e: logger.error(f"Attempt {attempt + 1} failed for {url}: {str(e)}") await asyncio.sleep(self.config.delay * (attempt + 1)) return None def _get_proxy(self): """从代理池获取代理""" if self.proxy_pool: import random return random.choice(self.proxy_pool) return None async def _rotate_proxy(self): """切换代理""" # 实现代理切换逻辑 pass

4.2 智能解析器引擎

python

from parsel import Selector import re from enum import Enum import hashlib class Platform(Enum): LIANJIA = "lianjia" ANJUKE = "anjuke" BEIKE = "beike" class ParserFactory: """解析器工厂""" @staticmethod def get_parser(platform: Platform): parsers = { Platform.LIANJIA: LianJiaParser, Platform.ANJUKE: AnJuKeParser, Platform.BEIKE: BeiKeParser } return parsers.get(platform, BaseParser)() class BaseParser: """基础解析器""" def extract_price(self, text: str) -> float: """提取价格信息""" patterns = [ r'(\d+(?:\.\d+)?)\s*万', r'¥\s*(\d+(?:\.\d+)?)', r'(\d+(?:\.\d+)?)\s*元/月' ] for pattern in patterns: match = re.search(pattern, text) if match: return float(match.group(1)) return 0.0 def extract_area(self, text: str) -> float: """提取面积""" pattern = r'(\d+(?:\.\d+)?)\s*㎡' match = re.search(pattern, text) return float(match.group(1)) if match else 0.0 def clean_text(self, text: str) -> str: """清洗文本""" if not text: return "" return re.sub(r'\s+', ' ', text).strip() class LianJiaParser(BaseParser): """链家解析器""" def parse_listing(self, html: str) -> Dict[str, Any]: """解析房源列表页""" selector = Selector(html) items = [] for house in selector.css('.sellListContent li'): item = { 'platform': 'lianjia', 'house_id': house.css('::attr(data-lj_action_housedel_id)').get(), 'title': self.clean_text(house.css('.title a::text').get()), 'price': self.extract_price(house.css('.totalPrice span::text').get() or ''), 'unit_price': self.extract_price(house.css('.unitPrice span::text').get() or ''), 'area': self.extract_area(house.css('.houseInfo::text').get() or ''), 'district': self.clean_text(house.css('.positionInfo a::text').get() or ''), 'community': self.clean_text(house.css('.positionInfo a::text')[1].get() if len(house.css('.positionInfo a::text')) > 1 else ''), 'url': urljoin('https://sh.lianjia.com', house.css('.title a::attr(href)').get()), 'tags': [tag.get() for tag in house.css('.tag span::text')] } items.append(item) # 提取分页信息 pagination = { 'total_pages': self._extract_total_pages(selector), 'current_page': self._extract_current_page(selector) } return {'items': items, 'pagination': pagination} def _extract_total_pages(self, selector): """提取总页数""" page_data = selector.css('.page-box.house-lst-page-box::attr(page-data)').get() if page_data: try: return json.loads(page_data).get('totalPage', 1) except: pass return 1 def _extract_current_page(self, selector): """提取当前页数""" current = selector.css('.content .page-box a.on::text').get() return int(current) if current and current.isdigit() else 1 class AnJuKeParser(BaseParser): """安居客解析器""" def parse_listing(self, html: str) -> Dict[str, Any]: """解析安居客房源列表""" selector = Selector(html) items = [] for house in selector.css('.house-list .list-item'): item = { 'platform': 'anjuke', 'house_id': house.css('::attr(data-id)').get(), 'title': self.clean_text(house.css('.house-title a::text').get()), 'price': self.extract_price(house.css('.price .price-con::text').get() or ''), 'unit_price': self.extract_price(house.css('.unit-price::text').get() or ''), 'area': self.extract_area(house.css('.details-item .area span::text').get() or ''), 'district': self.clean_text(house.css('.address .area a::text').get() or ''), 'community': self.clean_text(house.css('.address .comm-address a::text').get() or ''), 'url': house.css('.house-title a::attr(href)').get(), 'layout': self.clean_text(house.css('.details-item .area::text').get() or ''), } items.append(item) return {'items': items, 'pagination': self._parse_pagination(selector)} def _parse_pagination(self, selector): """解析分页""" # 实现分页解析逻辑 pass class BeiKeParser(BaseParser): """贝壳解析器""" # 贝壳解析器实现类似

4.3 Playwright动态渲染处理

python

from playwright.async_api import async_playwright import asyncio class DynamicPageCrawler: """处理动态渲染页面""" def __init__(self, headless: bool = True): self.headless = headless self.browser = None self.context = None async def __aenter__(self): playwright = await async_playwright().start() self.browser = await playwright.chromium.launch( headless=self.headless, args=[ '--disable-blink-features=AutomationControlled', '--disable-dev-shm-usage', '--no-sandbox' ] ) # 创建上下文,模拟真实浏览器 self.context = await self.browser.new_context( viewport={'width': 1920, 'height': 1080}, user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', locale='zh-CN', timezone_id='Asia/Shanghai', permissions=['geolocation'], extra_http_headers={ 'Accept-Language': 'zh-CN,zh;q=0.9', } ) # 屏蔽WebDriver特性 await self.context.add_init_script(""" Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); window.chrome = { runtime: {} }; """) return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.browser.close() async def crawl_page(self, url: str, wait_for: str = None, timeout: int = 30000): """爬取动态页面""" page = await self.context.new_page() try: # 设置请求拦截,优化性能 await page.route("**/*.{png,jpg,jpeg,gif,svg,woff,woff2}", lambda route: route.abort()) # 监听请求 page.on('request', lambda request: logger.debug(f"> {request.method} {request.url}")) page.on('response', lambda response: logger.debug(f"< {response.status} {response.url}")) # 导航到页面 await page.goto(url, wait_until='networkidle') # 等待特定元素 if wait_for: await page.wait_for_selector(wait_for, timeout=timeout) # 随机滚动,模拟用户行为 await self._simulate_scroll(page) # 获取页面内容 content = await page.content() # 提取JSON-LD结构化数据 structured_data = await self._extract_structured_data(page) # 截屏(调试用) # await page.screenshot(path=f'screenshot_{hashlib.md5(url.encode()).hexdigest()[:8]}.png') return { 'html': content, 'structured_data': structured_data, 'url': url, 'title': await page.title() } except Exception as e: logger.error(f"Error crawling {url}: {str(e)}") return None finally: await page.close() async def _simulate_scroll(self, page, scroll_times: int = 3): """模拟滚动行为""" import random for i in range(scroll_times): scroll_height = random.randint(300, 800) await page.evaluate(f"window.scrollBy(0, {scroll_height})") await asyncio.sleep(random.uniform(0.5, 2.0)) async def _extract_structured_data(self, page): """提取结构化数据(JSON-LD)""" try: return await page.evaluate(""" () => { const scripts = document.querySelectorAll('script[type="application/ld+json"]'); return Array.from(scripts).map(script => { try { return JSON.parse(script.textContent); } catch (e) { return null; } }).filter(data => data !== null); } """) except: return []

4.4 数据存储与管理

python

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Text, JSON from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from datetime import datetime import pandas as pd Base = declarative_base() class HouseListing(Base): """房源信息数据模型""" __tablename__ = 'house_listings' id = Column(Integer, primary_key=True) house_id = Column(String(100), unique=True, index=True) platform = Column(String(50), index=True) title = Column(String(500)) price = Column(Float) unit_price = Column(Float) area = Column(Float) layout = Column(String(50)) floor = Column(String(100)) orientation = Column(String(50)) district = Column(String(100)) community = Column(String(200)) address = Column(String(500)) longitude = Column(Float) latitude = Column(Float) tags = Column(JSON) url = Column(String(1000)) raw_data = Column(JSON) created_at = Column(DateTime, default=datetime.now) updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) def to_dict(self): return { field.name: getattr(self, field.name) for field in self.__table__.c if field.name not in ['raw_data'] } class DataManager: """数据管理器""" def __init__(self, db_url: str = 'sqlite:///houses.db'): self.engine = create_engine(db_url) self.Session = sessionmaker(bind=self.engine) Base.metadata.create_all(self.engine) def save_listings(self, listings: list): """保存房源信息""" session = self.Session() try: for listing_data in listings: # 检查是否已存在 existing = session.query(HouseListing).filter_by( house_id=listing_data['house_id'], platform=listing_data['platform'] ).first() if existing: # 更新现有记录 for key, value in listing_data.items(): if hasattr(existing, key): setattr(existing, key, value) existing.updated_at = datetime.now() else: # 创建新记录 house = HouseListing(**listing_data) session.add(house) session.commit() logger.info(f"Saved/Updated {len(listings)} listings") except Exception as e: session.rollback() logger.error(f"Error saving listings: {str(e)}") raise finally: session.close() def export_to_csv(self, filepath: str): """导出到CSV""" session = self.Session() try: query = session.query(HouseListing) df = pd.read_sql(query.statement, session.bind) df.to_csv(filepath, index=False, encoding='utf-8-sig') logger.info(f"Exported data to {filepath}") finally: session.close()

4.5 主爬虫调度器

python

import asyncio from typing import List import signal import sys class HouseSpiderScheduler: """爬虫调度器""" def __init__(self, platforms: List[str] = None): self.platforms = platforms or ['lianjia', 'anjuke', 'beike'] self.request_client = None self.dynamic_crawler = None self.data_manager = DataManager() self.tasks = [] async def initialize(self): """初始化组件""" self.request_client = AsyncRequestClient() self.dynamic_crawler = DynamicPageCrawler(headless=True) async def crawl_platform(self, platform: str, city: str = 'sh', max_pages: int = 10): """爬取特定平台""" urls = self._generate_urls(platform, city, max_pages) async with self.request_client as client: tasks = [self._crawl_page(client, platform, url) for url in urls] results = await asyncio.gather(*tasks, return_exceptions=True) # 处理结果 all_listings = [] for result in results: if isinstance(result, Exception): logger.error(f"Crawl error: {str(result)}") elif result: all_listings.extend(result) # 保存数据 if all_listings: self.data_manager.save_listings(all_listings) return len(all_listings) def _generate_urls(self, platform: str, city: str, max_pages: int): """生成爬取URL""" templates = { 'lianjia': f'https://{city}.lianjia.com/ershoufang/pg{{}}/', 'anjuke': f'https://{city}.anjuke.com/sale/p{{}}/', 'beike': f'https://{city}.ke.com/ershoufang/pg{{}}/' } if platform not in templates: return [] return [templates[platform].format(i) for i in range(1, max_pages + 1)] async def _crawl_page(self, client, platform: str, url: str): """爬取单个页面""" try: # 首先尝试静态爬取 html = await client.fetch(url) if not html: return [] # 使用相应解析器 parser = ParserFactory.get_parser(Platform(platform)) result = parser.parse_listing(html) # 如果需要,使用动态爬取获取详情页 if result.get('items'): detail_tasks = [ self._crawl_detail(client, parser, item['url']) for item in result['items'][:3] # 限制并发,防止被封 ] detail_results = await asyncio.gather(*detail_tasks) # 合并详情信息 for item, detail in zip(result['items'], detail_results): if detail: item.update(detail) return result.get('items', []) except Exception as e: logger.error(f"Error crawling {url}: {str(e)}") return [] async def _crawl_detail(self, client, parser, url: str): """爬取详情页""" try: html = await client.fetch(url) if not html: return {} # 这里可以添加详情页解析逻辑 # 例如:解析房源描述、图片、配套设施等 return { 'detail_fetched': True, 'fetched_at': datetime.now().isoformat() } except Exception as e: logger.error(f"Error crawling detail {url}: {str(e)}") return {} async def run(self, cities: List[str] = None): """运行爬虫""" cities = cities or ['sh', 'bj', 'gz', 'sz'] logger.info("Starting house spider scheduler...") await self.initialize() # 为每个城市和平台创建任务 tasks = [] for city in cities: for platform in self.platforms: task = asyncio.create_task( self.crawl_platform(platform, city, max_pages=3) ) tasks.append(task) # 等待所有任务完成 results = await asyncio.gather(*tasks, return_exceptions=True) total_listings = 0 for result in results: if isinstance(result, int): total_listings += result logger.info(f"Crawl completed. Total listings: {total_listings}") # 导出数据 self.data_manager.export_to_csv(f'house_listings_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv') return total_listings def signal_handler(signum, frame): """信号处理""" logger.info("Received shutdown signal, cleaning up...") sys.exit(0) async def main(): """主函数""" # 注册信号处理 signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # 创建调度器并运行 scheduler = HouseSpiderScheduler() try: total = await scheduler.run(['sh']) # 先爬取上海 print(f"\n🎉 爬取完成!共获取 {total} 条房源信息") # 显示统计信息 session = scheduler.data_manager.Session() try: from sqlalchemy import func stats = session.query( HouseListing.platform, func.count(HouseListing.id).label('count'), func.avg(HouseListing.price).label('avg_price') ).group_by(HouseListing.platform).all() print("\n📊 平台统计:") for platform, count, avg_price in stats: print(f" {platform}: {count} 条记录, 平均价格: {avg_price:.2f} 万") finally: session.close() except Exception as e: logger.error(f"Main execution error: {str(e)}") if __name__ == '__main__': # 设置事件循环策略(Windows兼容) if sys.platform == 'win32': asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) # 运行异步主函数 asyncio.run(main())

五、高级特性实现

5.1 反反爬策略增强

python

class AntiAntiCrawler: """反反爬策略管理器""" def __init__(self): self.fingerprints = [] def generate_fingerprint(self): """生成浏览器指纹""" import random return { 'user_agent': self._random_user_agent(), 'screen_resolution': self._random_screen_resolution(), 'timezone_offset': self._random_timezone(), 'webgl_vendor': self._random_webgl_info(), 'plugins': self._random_plugins(), 'fonts': self._random_fonts(), 'language': 'zh-CN,zh;q=0.9', 'hardware_concurrency': random.choice([4, 8, 16]), 'device_memory': random.choice([4, 8, 16]) } def _random_user_agent(self): """随机用户代理""" agents = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36', 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15' ] import random return random.choice(agents)

5.2 数据去重与质量控制

python

class DataQualityController: """数据质量控制""" @staticmethod def deduplicate(listings: list, key_fields: list = None): """去重""" if not key_fields: key_fields = ['house_id', 'platform', 'title'] seen = set() unique_listings = [] for listing in listings: # 生成唯一标识 identifier = tuple(str(listing.get(field, '')) for field in key_fields) identifier_hash = hashlib.md5(''.join(identifier).encode()).hexdigest() if identifier_hash not in seen: seen.add(identifier_hash) unique_listings.append(listing) return unique_listings @staticmethod def validate_listing(listing: dict) -> bool: """验证房源数据有效性""" required_fields = ['title', 'price', 'area', 'district'] # 检查必需字段 for field in required_fields: if not listing.get(field): return False # 价格合理性检查 price = listing.get('price', 0) area = listing.get('area', 0) if price <= 0 or area <= 0: return False # 单价合理性检查 unit_price = price / area if area > 0 else 0 if unit_price < 1000 or unit_price > 200000: # 假设合理范围 return False return True

六、部署与监控

6.1 使用Scrapy框架重构

python

# scrapy_spider.py import scrapy from scrapy_playwright.page import PageMethod from itemadapter import ItemAdapter class HouseSpider(scrapy.Spider): name = 'house_spider' def start_requests(self): urls = [ 'https://sh.lianjia.com/ershoufang/', 'https://sh.anjuke.com/sale/' ] for url in urls: yield scrapy.Request( url=url, callback=self.parse, meta=dict( playwright=True, playwright_page_methods=[ PageMethod('wait_for_selector', '.house-list'), PageMethod('evaluate', 'window.scrollBy(0, document.body.scrollHeight)'), ] ) ) async def parse(self, response): # Scrapy解析逻辑 pass

6.2 分布式爬虫架构

python

# 使用Redis实现分布式任务队列 import redis from rq import Queue class DistributedScheduler: """分布式调度器""" def __init__(self, redis_url='redis://localhost:6379'): self.redis_conn = redis.from_url(redis_url) self.queue = Queue('house_crawler', connection=self.redis_conn) def enqueue_crawl_task(self, platform, city, pages): """入队爬取任务""" from tasks import crawl_platform_task job = self.queue.enqueue( crawl_platform_task, platform, city, pages, job_timeout='30m' ) return job.id

七、性能优化建议

  1. 连接池优化: 使用aiohttp.TCPConnector管理连接池

  2. 缓存策略: 对静态资源实现本地缓存

  3. 请求限流: 使用asyncio.Semaphore控制并发数

  4. 增量爬取: 基于时间戳只爬取更新内容

  5. CDN优化: 识别并直接请求CDN资源

八、法律与伦理考虑

  1. 遵守robots.txt: 尊重网站的爬虫协议

  2. 请求频率控制: 避免对目标网站造成压力

  3. 数据使用限制: 仅用于个人学习研究

  4. 隐私保护: 不爬取个人隐私信息

  5. 版权尊重: 不侵犯数据知识产权

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/23 10:13:47

Python爬虫自动生成网站地图:基于最新技术的Sitemap生成器全攻略

一、引言&#xff1a;为什么需要自动化Sitemap生成&#xff1f;在现代SEO优化中&#xff0c;网站地图&#xff08;Sitemap&#xff09;扮演着至关重要的角色。它不仅帮助搜索引擎爬虫更高效地索引网站内容&#xff0c;还能提升网站的收录率和搜索排名。然而&#xff0c;对于大型…

作者头像 李华
网站建设 2026/4/23 10:13:38

当历史智慧遇见测试前沿

2025年12月26日&#xff0c;我们迎来毛泽东主席诞辰132周年纪念日。这一天不仅是历史的回响&#xff0c;更是现代职场的一面镜子。对软件测试从业者而言&#xff0c;测试工作如同“革命征程”——需要精准的战略、坚韧的团队协作和不懈的质量追求。毛泽东的“群众路线”和“持久…

作者头像 李华
网站建设 2026/4/23 10:14:09

华为服务器中Mindie镜像的部署及启动方法

一、部署方法 首先要安装好Docker,然后点开网址https://www.hiascend.com/developer/ascendhub/detail/af85b724a7e5469ebd7ea13c3439d48f 拉取镜像需要申请权限: 注册登录后,即可提交申请,一般需要一个工作日,等审核通过后,点击下载即可弹出如下提示框: 按照上述方法…

作者头像 李华
网站建设 2026/4/15 15:06:18

揭秘PHP图像识别结果解析:5个关键步骤让你快速掌握核心技术

第一章&#xff1a;PHP图像识别结果解析概述在现代Web应用开发中&#xff0c;图像识别技术正逐步成为提升用户体验与系统智能化水平的重要手段。PHP作为广泛使用的服务器端脚本语言&#xff0c;虽然并非专为人工智能计算设计&#xff0c;但通过集成外部识别服务或调用本地模型A…

作者头像 李华