Building an Intelligent Job-Posting Aggregation Spider in Python with Async Coroutines and Machine-Learning Deduplication

Author: 张小明 (front-end engineer)

Introduction: Challenges and Opportunities in Job-Posting Aggregation

In today's digital recruitment landscape, job seekers often have to hop between several recruitment platforms, while the anti-scraping defenses of those sites keep getting more sophisticated and traditional crawling techniques struggle to keep up. This article walks through a modern job-posting aggregation spider built on Python async coroutines, a smart proxy pool, and machine-learning deduplication, aiming for efficient, stable, and intelligent data collection.

Technology Stack at a Glance

  • Async framework: asyncio + aiohttp + aiomysql

  • Anti-anti-scraping: dynamic proxy pool + request-fingerprint spoofing + browser-behavior simulation

  • Smart parsing: Playwright automation + XPath/CSS selectors + regular expressions

  • Data storage: MySQL 8.0 + Redis + asynchronous database access

  • Deduplication: SimHash + Bloom filter + text-similarity scoring

  • Monitoring & deployment: Prometheus + Grafana + Docker containerization

System Architecture

python

""" 智能招聘信息聚合爬虫系统架构 """ import asyncio import aiohttp import aiomysql from typing import Dict, List, Optional from dataclasses import dataclass from datetime import datetime import hashlib import json @dataclass class JobPosition: """职位数据模型""" id: str title: str company: str salary: str location: str experience: str education: str tags: List[str] description: str source: str url: str publish_time: datetime crawl_time: datetime hash_value: str = None def __post_init__(self): """生成内容哈希值用于去重""" content = f"{self.title}{self.company}{self.description}" self.hash_value = self.generate_simhash(content) @staticmethod def generate_simhash(content: str, bits: int = 64) -> str: """SimHash算法生成文档指纹""" import numpy as np # 分词和哈希(简化版) words = content.split() vector = np.zeros(bits) for word in words: # 生成每个词的哈希 word_hash = bin(int(hashlib.md5(word.encode()).hexdigest(), 16))[2:].zfill(bits) # 加权累加 for i, bit in enumerate(word_hash): vector[i] += 1 if bit == '1' else -1 # 生成SimHash simhash = ''.join(['1' if v > 0 else '0' for v in vector]) return simhash

Core Spider Implementation

1. Asynchronous Spider Engine

python

import asyncio
import aiohttp
from aiohttp import ClientTimeout, TCPConnector
from contextlib import asynccontextmanager
from typing import Dict, Optional
from urllib.parse import urlparse, urljoin
import hashlib
import random
import logging
import backoff

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AsyncSpiderEngine:
    """Asynchronous spider engine."""

    def __init__(self, max_concurrent: int = 10, request_timeout: int = 30):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = ClientTimeout(total=request_timeout)
        self.session = None
        self.proxy_pool = ProxyPool()  # defined in the proxy pool section below
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]

    @asynccontextmanager
    async def create_session(self):
        """Create an aiohttp session."""
        connector = TCPConnector(limit=self.max_concurrent, ssl=False)
        async with aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
            headers=self._get_headers()
        ) as session:
            self.session = session
            yield session

    def _get_headers(self) -> Dict:
        """Build randomized request headers."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0',
        }

    @backoff.on_exception(
        backoff.expo,
        (aiohttp.ClientError, asyncio.TimeoutError),
        max_tries=3,
        max_time=30
    )
    async def fetch(self, url: str, use_proxy: bool = True) -> Optional[str]:
        """Fetch a page asynchronously."""
        async with self.semaphore:
            try:
                proxy = await self.proxy_pool.get_proxy() if use_proxy else None
                async with self.session.get(
                    url,
                    proxy=proxy,
                    headers=self._get_headers(),
                    cookies=self._get_cookies()
                ) as response:
                    if response.status == 200:
                        content = await response.text()
                        # Reward the proxy on success
                        if proxy:
                            await self.proxy_pool.update_score(proxy, True)
                        return content
                    else:
                        logger.warning(f"Request failed: {url}, status: {response.status}")
                        if proxy:
                            await self.proxy_pool.update_score(proxy, False)
                        return None
            except (aiohttp.ClientError, asyncio.TimeoutError):
                # Re-raise network errors so the backoff decorator can retry them
                raise
            except Exception as e:
                logger.error(f"Request error {url}: {e}")
                return None

    def _get_cookies(self) -> Dict:
        """Generate mock cookies."""
        return {
            'session_id': hashlib.md5(str(random.random()).encode()).hexdigest(),
            'user_token': hashlib.md5(str(random.random()).encode()).hexdigest()[:16]
        }
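
To make the intended usage concrete, here is a minimal driver sketch: open a session with create_session(), then issue concurrent fetches. The URL is a placeholder and use_proxy=False bypasses the proxy pool, so this is illustrative only.

python

import asyncio

async def demo_engine():
    engine = AsyncSpiderEngine(max_concurrent=5)
    async with engine.create_session():
        pages = await asyncio.gather(
            engine.fetch("https://www.example.com/jobs?page=1", use_proxy=False),
            engine.fetch("https://www.example.com/jobs?page=2", use_proxy=False),
        )
    # Print the length of each page body (0 for failed requests)
    print([len(page) if page else 0 for page in pages])

asyncio.run(demo_engine())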

2. Smart Proxy Pool

python

import asyncio
import aiohttp
import random
from typing import List, Dict, Optional


class ProxyPool:
    """Smart proxy pool manager."""

    def __init__(self):
        self.proxies = []
        self.proxy_scores = {}
        self.lock = asyncio.Lock()
        self.proxy_sources = [
            'http://www.proxy-list.org/',
            'https://free-proxy-list.net/',
            'http://www.gatherproxy.com/'
        ]

    async def initialize(self):
        """Initialize the proxy pool."""
        await self.refresh_proxies()
        # Start the periodic refresh task
        asyncio.create_task(self._scheduled_refresh())

    async def refresh_proxies(self):
        """Refresh the proxy list."""
        async with self.lock:
            new_proxies = []
            # _fetch_proxies_from_source is site-specific; a generic
            # validation helper is sketched right after this block
            for source in self.proxy_sources:
                proxies = await self._fetch_proxies_from_source(source)
                new_proxies.extend(proxies)

            # Keep only the proxies that pass validation
            valid_proxies = await self._validate_proxies(new_proxies)
            self.proxies = valid_proxies

            # Start every proxy with a neutral score
            for proxy in valid_proxies:
                self.proxy_scores[proxy] = 100

    async def get_proxy(self) -> Optional[str]:
        """Pick a high-quality proxy."""
        if not self.proxies:
            await self.refresh_proxies()

        # Score-weighted random selection
        weighted_proxies = []
        for proxy in self.proxies:
            weight = self.proxy_scores.get(proxy, 50)
            weighted_proxies.extend([proxy] * weight)

        return random.choice(weighted_proxies) if weighted_proxies else None

    async def update_score(self, proxy: str, success: bool):
        """Update a proxy's score."""
        current_score = self.proxy_scores.get(proxy, 50)
        if success:
            new_score = min(current_score + 10, 200)
        else:
            new_score = max(current_score - 30, 0)

        if new_score <= 0:
            self.proxies.remove(proxy)
            self.proxy_scores.pop(proxy, None)
            return

        self.proxy_scores[proxy] = new_score

    async def _scheduled_refresh(self):
        """Refresh proxies on a schedule."""
        while True:
            await asyncio.sleep(3600)  # refresh hourly
            await self.refresh_proxies()
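
refresh_proxies relies on two helpers: _fetch_proxies_from_source, whose parsing is specific to each free-proxy site listed in proxy_sources, and _validate_proxies, which can be sketched generically. The standalone sketch below shows one possible validation step; the httpbin.org/ip test endpoint and the 5-second timeout are assumptions, not fixed choices.

python

import asyncio
import aiohttp
from typing import List

async def validate_proxies(proxies: List[str],
                           test_url: str = "https://httpbin.org/ip",
                           timeout: float = 5.0) -> List[str]:
    """Keep only the proxies that can fetch test_url within the timeout."""

    async def check(proxy: str) -> bool:
        try:
            async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=timeout)
            ) as session:
                async with session.get(test_url, proxy=f"http://{proxy}") as resp:
                    return resp.status == 200
        except Exception:
            return False

    results = await asyncio.gather(*(check(p) for p in proxies))
    return [p for p, ok in zip(proxies, results) if ok]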

3. Dynamic Rendering with Playwright

python

import asyncio
import random
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup


class DynamicPageRenderer:
    """Render pages that rely on JavaScript."""

    def __init__(self):
        self.browser = None
        self.context = None

    async def __aenter__(self):
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=True,
            args=['--disable-blink-features=AutomationControlled']
        )
        self.context = await self.browser.new_context(
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            viewport={'width': 1920, 'height': 1080}
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.context.close()
        await self.browser.close()
        await self.playwright.stop()

    async def render_page(self, url: str, wait_for_selector: str = None) -> str:
        """Render a dynamic page and return its HTML."""
        page = await self.context.new_page()

        # Mimic human behaviour before navigating
        await self._simulate_human_behavior(page)

        try:
            await page.goto(url, wait_until='networkidle')

            if wait_for_selector:
                await page.wait_for_selector(wait_for_selector, timeout=10000)

            # Scroll the page a few times at random
            await self._random_scroll(page)

            # Grab the rendered HTML
            content = await page.content()
            return content
        finally:
            await page.close()

    async def _simulate_human_behavior(self, page):
        """Simulate human browsing behaviour."""
        # Random pause
        await asyncio.sleep(random.uniform(1, 3))

        # Random mouse movement
        await page.mouse.move(
            random.randint(100, 500),
            random.randint(100, 500)
        )

    async def _random_scroll(self, page):
        """Scroll the page in random steps."""
        for _ in range(random.randint(2, 5)):
            scroll_amount = random.randint(300, 800)
            await page.evaluate(f"window.scrollBy(0, {scroll_amount})")
            await asyncio.sleep(random.uniform(0.5, 1.5))
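
A short usage sketch for the renderer: render a page and hand the HTML to BeautifulSoup. Both the URL and the selector below are placeholders for illustration.

python

import asyncio
from bs4 import BeautifulSoup

async def demo_renderer():
    async with DynamicPageRenderer() as renderer:
        html = await renderer.render_page(
            "https://www.example.com/jobs?query=python",  # placeholder URL
            wait_for_selector=".job-list"                 # placeholder selector
        )
    soup = BeautifulSoup(html, "html.parser")
    print(soup.title.string if soup.title else "no <title> found")

asyncio.run(demo_renderer())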

4. Job-Site Parsers

python

import re


class JobSiteParser:
    """Base class for job-site parsers."""

    def __init__(self):
        # Note: spider.fetch needs a session opened via create_session();
        # without one, parse_job_list falls back to Playwright rendering.
        self.spider = AsyncSpiderEngine()
        self.renderer = DynamicPageRenderer()

    async def parse_job_list(self, url: str) -> List[Dict]:
        """Parse a job listing page."""
        raise NotImplementedError

    async def parse_job_detail(self, url: str) -> Optional[JobPosition]:
        """Parse a job detail page."""
        raise NotImplementedError

    def extract_salary_range(self, salary_text: str) -> Dict:
        """Extract a salary range from text such as '15-25K'."""
        pattern = r'(\d+\.?\d*)[kK]?-?(\d+\.?\d*)?[kK]?'
        match = re.search(pattern, salary_text)
        if match:
            min_salary = float(match.group(1)) * 1000
            max_salary = float(match.group(2)) * 1000 if match.group(2) else min_salary
            return {
                'min': min_salary,
                'max': max_salary,
                'text': salary_text
            }
        return {'min': 0, 'max': 0, 'text': salary_text}


class BossZhiPinParser(JobSiteParser):
    """Parser for Boss直聘 (zhipin.com)."""

    async def parse_job_list(self, url: str) -> List[Dict]:
        """Parse a Boss直聘 listing page."""
        try:
            content = await self.spider.fetch(url)
        except (aiohttp.ClientError, asyncio.TimeoutError):
            content = None
        if not content:
            # Fall back to dynamic rendering
            async with self.renderer:
                content = await self.renderer.render_page(
                    url,
                    wait_for_selector='.job-list-box'
                )

        soup = BeautifulSoup(content, 'html.parser')
        jobs = []

        for item in soup.select('.job-card-wrapper'):
            try:
                job = {
                    'title': item.select_one('.job-title').text.strip(),
                    'company': item.select_one('.company-name').text.strip(),
                    'salary': item.select_one('.salary').text.strip(),
                    'location': item.select_one('.job-area').text.strip(),
                    'experience': item.select_one('.tag-list').text.strip(),
                    'link': item.select_one('a')['href'],
                    'source': 'boss_zhipin'
                }
                jobs.append(job)
            except Exception as e:
                logger.error(f"Failed to parse job card: {e}")

        return jobs

    async def parse_job_detail(self, url: str) -> Optional[JobPosition]:
        """Parse a Boss直聘 detail page."""
        full_url = f"https://www.zhipin.com{url}" if not url.startswith('http') else url

        async with self.renderer:
            content = await self.renderer.render_page(
                full_url,
                wait_for_selector='.job-detail'
            )

        soup = BeautifulSoup(content, 'html.parser')

        try:
            title = soup.select_one('.job-title').text.strip()
            company = soup.select_one('.company-name').text.strip()
            salary = soup.select_one('.salary').text.strip()

            # Secondary fields
            info_items = soup.select('.job-detail-section-item')
            location = info_items[0].text.strip() if len(info_items) > 0 else ''
            experience = info_items[1].text.strip() if len(info_items) > 1 else ''

            description = soup.select_one('.job-sec-text').text.strip()

            return JobPosition(
                id=hashlib.md5(full_url.encode()).hexdigest(),
                title=title,
                company=company,
                salary=salary,
                location=location,
                experience=experience,
                education='',   # extract if needed
                tags=[],        # extract if needed
                description=description,
                source='boss_zhipin',
                url=full_url,
                publish_time=datetime.now(),
                crawl_time=datetime.now()
            )
        except Exception as e:
            logger.error(f"Failed to parse detail page {full_url}: {e}")
            return None


class LagouParser(JobSiteParser):
    """Parser for 拉勾 (lagou.com)."""
    # Implement the same logic as BossZhiPinParser for Lagou's markup
    pass


class ZhilianParser(JobSiteParser):
    """Parser for 智联招聘 (zhaopin.com)."""
    # Implement the same logic as BossZhiPinParser for Zhilian's markup
    pass
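
The salary regex in extract_salary_range is easy to sanity-check in isolation; the sample strings below are made up but follow the "15-25K" style the parser expects.

python

parser = BossZhiPinParser()
print(parser.extract_salary_range("15-25K"))
# {'min': 15000.0, 'max': 25000.0, 'text': '15-25K'}
print(parser.extract_salary_range("面议"))
# no digits to match -> {'min': 0, 'max': 0, 'text': '面议'}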

5. Asynchronous Data Storage

python

import json
import aiomysql
from motor.motor_asyncio import AsyncIOMotorClient
from redis import asyncio as aioredis


class AsyncDataStorage:
    """Asynchronous storage manager."""

    def __init__(self, mysql_config: Dict, redis_config: Dict, mongo_config: Dict = None):
        self.mysql_config = mysql_config
        self.redis_config = redis_config
        self.mongo_config = mongo_config
        self.pool = None
        self.redis = None
        self.mongo = None

    async def initialize(self):
        """Open database connections."""
        # MySQL connection pool
        self.pool = await aiomysql.create_pool(**self.mysql_config)

        # Redis connection (from_url is synchronous, so no await here)
        self.redis = aioredis.from_url(
            f"redis://{self.redis_config['host']}:{self.redis_config['port']}",
            password=self.redis_config.get('password') or None,  # '' would trigger AUTH
            db=self.redis_config.get('db', 0)
        )

        # Optional MongoDB connection
        if self.mongo_config:
            self.mongo = AsyncIOMotorClient(self.mongo_config['uri'])

    async def save_job(self, job: JobPosition):
        """Persist a job posting."""
        # 1. Bloom-filter deduplication
        if await self.is_duplicate(job.hash_value):
            logger.info(f"Duplicate job detected: {job.title}")
            return False

        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                sql = """
                    INSERT INTO jobs (
                        id, title, company, salary, location, experience,
                        education, description, source, url, publish_time,
                        crawl_time, hash_value
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON DUPLICATE KEY UPDATE crawl_time = VALUES(crawl_time)
                """
                await cursor.execute(sql, (
                    job.id, job.title, job.company, job.salary,
                    job.location, job.experience, job.education,
                    job.description, job.source, job.url,
                    job.publish_time, job.crawl_time, job.hash_value
                ))
                await conn.commit()

        # 2. Mark the fingerprint in the Bloom filter
        await self.redis.setbit('job_bloom_filter', int(job.hash_value[:8], 16) % (2**20), 1)

        # 3. Cache the job in Redis
        cache_key = f"job:{job.id}"
        await self.redis.setex(
            cache_key,
            3600 * 24,  # cache for 24 hours
            json.dumps(job.__dict__, default=str)
        )

        return True

    async def is_duplicate(self, hash_value: str) -> bool:
        """Check the Bloom filter for duplicates."""
        # Simplified single-bit Bloom filter
        position = int(hash_value[:8], 16) % (2**20)
        result = await self.redis.getbit('job_bloom_filter', position)
        return bool(result)

    async def close(self):
        """Close database connections."""
        if self.pool:
            self.pool.close()
            await self.pool.wait_closed()
        if self.redis:
            await self.redis.close()
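
The single-bit check in is_duplicate starts reporting false positives quickly once the bitmap fills up. A sketch of a closer-to-standard Bloom filter over the same Redis bitmap, probing several salted positions per fingerprint, follows; the bitmap size and probe count are illustrative, not tuned values.

python

import hashlib

BLOOM_KEY = 'job_bloom_filter'
BLOOM_BITS = 2 ** 22   # bitmap size (assumed)
BLOOM_PROBES = 5       # positions checked per item (assumed)

def _bloom_positions(value: str):
    """Derive several bit positions from one value via salted MD5 digests."""
    for salt in range(BLOOM_PROBES):
        digest = hashlib.md5(f"{salt}:{value}".encode()).hexdigest()
        yield int(digest, 16) % BLOOM_BITS

async def bloom_add(redis, value: str):
    for pos in _bloom_positions(value):
        await redis.setbit(BLOOM_KEY, pos, 1)

async def bloom_contains(redis, value: str) -> bool:
    # "All bits set" is only a maybe: false positives remain possible
    for pos in _bloom_positions(value):
        if not await redis.getbit(BLOOM_KEY, pos):
            return False
    return True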

6. Main Scheduler

python

class JobSpiderScheduler:
    """Spider scheduler."""

    def __init__(self):
        self.parsers = {
            'boss_zhipin': BossZhiPinParser(),
            'lagou': LagouParser(),
            'zhilian': ZhilianParser()
        }
        self.storage = AsyncDataStorage(
            mysql_config={
                'host': 'localhost',
                'port': 3306,
                'user': 'root',
                'password': 'password',
                'db': 'job_spider',
                'minsize': 1,
                'maxsize': 10
            },
            redis_config={
                'host': 'localhost',
                'port': 6379,
                'password': '',
                'db': 0
            }
        )
        self.task_queue = asyncio.Queue()
        self.results = []

    async def run(self):
        """Run the spider."""
        logger.info("Starting the job aggregation spider...")

        # Initialize storage
        await self.storage.initialize()

        # Crawl tasks
        tasks = [
            ('boss_zhipin', 'https://www.zhipin.com/web/geek/job?query=python&city=101010100'),
            ('lagou', 'https://www.lagou.com/jobs/list_python?city=北京'),
            ('zhilian', 'https://sou.zhaopin.com/?jl=北京&kw=python')
        ]

        # Start consumer tasks
        consumer_tasks = [
            asyncio.create_task(self._consumer())
            for _ in range(5)  # 5 concurrent consumers
        ]

        # Enqueue the tasks
        for task in tasks:
            await self.task_queue.put(task)

        # Wait until the queue is drained
        await self.task_queue.join()

        # Cancel the consumers
        for consumer in consumer_tasks:
            consumer.cancel()

        # Close storage connections
        await self.storage.close()

        logger.info(f"Crawl finished, collected {len(self.results)} jobs")

    async def _consumer(self):
        """Consumer: process crawl tasks."""
        while True:
            try:
                source, url = await self.task_queue.get()

                parser = self.parsers.get(source)
                if parser:
                    # Parse the listing page
                    jobs = await parser.parse_job_list(url)

                    for job_info in jobs[:10]:  # cap the volume per site
                        # Parse the detail page
                        job = await parser.parse_job_detail(job_info['link'])
                        if job:
                            # Persist to the database
                            success = await self.storage.save_job(job)
                            if success:
                                self.results.append(job)
                                logger.info(f"Saved job: {job.title}")

                        # Polite delay
                        await asyncio.sleep(random.uniform(1, 3))

                self.task_queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Task failed: {e}")
                self.task_queue.task_done()


async def main():
    """Entry point."""
    scheduler = JobSpiderScheduler()
    try:
        await scheduler.run()
    except KeyboardInterrupt:
        logger.info("Interrupt received, shutting down gracefully...")
    except Exception as e:
        logger.error(f"Spider error: {e}")
    finally:
        logger.info("Spider finished")


if __name__ == "__main__":
    # Table definition (run once against MySQL before the first crawl)
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS jobs (
        id VARCHAR(64) PRIMARY KEY,
        title VARCHAR(255) NOT NULL,
        company VARCHAR(255) NOT NULL,
        salary VARCHAR(100),
        location VARCHAR(100),
        experience VARCHAR(100),
        education VARCHAR(100),
        description TEXT,
        source VARCHAR(50),
        url VARCHAR(500),
        publish_time DATETIME,
        crawl_time DATETIME,
        hash_value VARCHAR(128),
        INDEX idx_title (title(100)),
        INDEX idx_company (company(100)),
        INDEX idx_source (source),
        INDEX idx_crawl_time (crawl_time)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """

    # Run the spider
    asyncio.run(main())
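
The __main__ block defines create_table_sql but does not execute it anywhere. A small helper that runs it once through aiomysql is sketched below; the connection parameters simply mirror the scheduler's assumed localhost configuration.

python

import asyncio
import aiomysql

async def create_schema(create_table_sql: str):
    """Create the jobs table if it does not exist yet."""
    conn = await aiomysql.connect(
        host='localhost', port=3306,
        user='root', password='password', db='job_spider'
    )
    try:
        async with conn.cursor() as cursor:
            await cursor.execute(create_table_sql)
        await conn.commit()
    finally:
        conn.close()

# e.g. asyncio.run(create_schema(create_table_sql)) before the first crawl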

Advanced Extensions

1. Machine-Learning Deduplication

python

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


class SmartDeduplicator:
    """ML-based deduplicator."""

    def __init__(self):
        self.vectorizer = TfidfVectorizer(max_features=5000)
        self.job_vectors = []
        self.job_ids = []

    def fit(self, descriptions: List[str]):
        """Fit the TF-IDF vocabulary on an initial corpus of descriptions."""
        self.vectorizer.fit(descriptions)

    def add_job(self, job: JobPosition):
        """Register a job so later postings can be compared against it."""
        vector = self.vectorizer.transform([job.description])
        self.job_vectors.append(vector)
        self.job_ids.append(job.id)

    @staticmethod
    def calculate_similarity(text1: str, text2: str) -> float:
        """Pairwise text similarity using a throwaway vocabulary."""
        pairwise = TfidfVectorizer(max_features=5000)
        vectors = pairwise.fit_transform([text1, text2])
        return cosine_similarity(vectors[0:1], vectors[1:2])[0][0]

    def is_similar_job(self, new_job: JobPosition, threshold: float = 0.8) -> bool:
        """Return True if the new job is a near-duplicate of a known one."""
        new_vector = self.vectorizer.transform([new_job.description])
        for job_id, vector in zip(self.job_ids, self.job_vectors):
            similarity = cosine_similarity(new_vector, vector)[0][0]
            if similarity > threshold:
                return True
        return False
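
A short usage sketch for the deduplicator; the descriptions and the make_job helper are invented for the demo and are not part of the pipeline above.

python

dedup = SmartDeduplicator()
dedup.fit([
    "Python backend engineer, asyncio and MySQL experience",
    "Frontend developer, React and TypeScript",
])

def make_job(job_id: str, desc: str) -> JobPosition:
    """Build a throwaway JobPosition for the demo."""
    return JobPosition(
        id=job_id, title="Python工程师", company="ExampleCo", salary="20-30K",
        location="北京", experience="3-5年", education="本科", tags=[],
        description=desc, source="demo", url=f"https://example.com/{job_id}",
        publish_time=datetime.now(), crawl_time=datetime.now(),
    )

dedup.add_job(make_job("job-1", "Python backend engineer, asyncio and MySQL experience"))
candidate = make_job("job-2", "Python backend engineer with asyncio and MySQL experience")
print(dedup.is_similar_job(candidate, threshold=0.8))  # True: near-identical descriptions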

2. Anti-Scraping Monitoring

python

class AntiAntiSpiderMonitor:
    """Monitor for anti-scraping countermeasures."""

    def __init__(self):
        self.request_count = 0
        self.blocked_count = 0
        self.success_count = 0

    async def monitor_request(self, url: str, success: bool):
        """Track the outcome of each request."""
        self.request_count += 1
        if success:
            self.success_count += 1
        else:
            self.blocked_count += 1

        # Success rate so far
        success_rate = self.success_count / max(self.request_count, 1)

        # Alert when the success rate drops too low
        if success_rate < 0.3:
            logger.warning(f"Spider is being blocked frequently, success rate: {success_rate:.2%}")
            await self.adjust_strategy()

    async def adjust_strategy(self):
        """Adjust the crawling strategy."""
        logger.info("Adjusting strategy: longer delays, rotate proxies, rotate User-Agents")
        # Implement the actual adjustments here
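
One way to wire the monitor into the engine is to subclass AsyncSpiderEngine and record every fetch outcome; the wrapper class below is a sketch of that idea rather than part of the core design.

python

class MonitoredSpiderEngine(AsyncSpiderEngine):
    """AsyncSpiderEngine that reports every request to AntiAntiSpiderMonitor."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.monitor = AntiAntiSpiderMonitor()

    async def fetch(self, url: str, use_proxy: bool = True):
        content = await super().fetch(url, use_proxy=use_proxy)
        # Treat any non-empty response body as a success
        await self.monitor.monitor_request(url, success=content is not None)
        return content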

Deployment and Monitoring

Docker Deployment

dockerfile

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install the Playwright browser
# (on slim images, `playwright install --with-deps chromium` also pulls the system libraries)
RUN playwright install chromium

# Copy the project code
COPY . .

# Run the spider
CMD ["python", "main.py"]
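
The Dockerfile copies a requirements.txt whose contents are not listed here. Based on the libraries imported throughout the code, a plausible version might look like the following; versions are deliberately left unpinned, and prometheus-client is only needed for the metrics exporter sketched at the end of this post.

text

# requirements.txt (assumed contents)
aiohttp
aiomysql
backoff
beautifulsoup4
playwright
redis
motor
numpy
scikit-learn
prometheus-client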

Prometheus Monitoring Configuration

yaml

# prometheus.yml
scrape_configs:
  - job_name: 'job_spider'
    static_configs:
      - targets: ['localhost:9091']
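
This scrape config expects the spider to expose metrics on localhost:9091, which the code above does not do yet. A minimal exporter sketch with the prometheus_client package follows; the metric names are assumptions.

python

from prometheus_client import Counter, start_http_server

# Counters the scheduler can increment as it works
JOBS_SAVED = Counter('spider_jobs_saved_total', 'Job postings saved to MySQL')
REQUESTS_FAILED = Counter('spider_requests_failed_total', 'HTTP requests that failed or were blocked')

def start_metrics_server(port: int = 9091):
    """Expose /metrics for the Prometheus scrape config above."""
    start_http_server(port)

# e.g. call start_metrics_server() at the top of main(), then
# JOBS_SAVED.inc() after storage.save_job(...) succeeds.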