前言
在 Python 爬虫开发领域,零散的代码片段仅能满足简单的爬取需求,面对多站点、大批量、高稳定性的爬取场景时,易出现 “代码冗余、维护困难、扩展性差” 等问题。基础爬虫框架的设计是解决上述问题的核心 —— 通过模块化、分层化的架构设计,将爬虫拆分为可复用的核心组件,既能提升开发效率,又能保障爬虫的稳定性和可维护性。本文将从框架设计原则、核心组件拆解、实战开发等维度,系统讲解基础爬虫框架的设计思路与实现方法,帮助开发者构建标准化、可扩展的爬虫体系。
摘要
本文聚焦基础爬虫框架的结构设计,从框架设计核心原则出发,拆解请求模块、解析模块、存储模块、调度模块、异常处理模块等核心组件,并通过完整的实战案例(框架实战链接:多站点爬取测试平台、框架性能测试页)演示基础爬虫框架的开发与应用。读者可通过本文掌握模块化爬虫框架的设计思路,实现从 “零散代码” 到 “标准化框架” 的进阶,提升爬虫开发的效率和可维护性。
一、爬虫框架设计核心原则
1.1 框架设计的核心目标
| 目标维度 | 具体说明 |
|---|---|
| 模块化 | 将爬虫拆分为独立组件(请求、解析、存储等),组件间低耦合、高内聚 |
| 可复用 | 核心组件(如请求封装、异常处理)可跨项目复用,减少重复开发 |
| 可扩展 | 支持新增爬取站点、解析规则、存储方式时,无需修改核心代码 |
| 可维护 | 代码结构清晰,日志完善,便于问题定位和功能迭代 |
| 稳定性 | 具备重试、限流、异常捕获机制,保障爬虫持续稳定运行 |
1.2 框架设计的核心原则
| 原则名称 | 具体要求 |
|---|---|
| 单一职责 | 每个组件仅负责一个核心功能(如请求模块仅处理 HTTP 请求,解析模块仅处理数据提取) |
| 开闭原则 | 框架对扩展开放(新增功能),对修改关闭(核心代码无需改动) |
| 接口隔离 | 不同组件通过清晰的接口交互,避免组件间依赖过度 |
| 异常隔离 | 单个组件的异常不影响整个框架运行,异常被限定在组件内部处理 |
| 配置解耦 | 爬取规则、请求参数、存储路径等配置与代码分离,便于灵活调整 |
二、基础爬虫框架核心组件拆解
2.1 框架整体架构(分层设计)
plaintext
基础爬虫框架架构 ├── 配置层(Config):管理所有配置项(请求头、爬取规则、存储路径等) ├── 核心层(Core):框架核心组件 │ ├── 请求模块(Request):封装HTTP请求,处理重试、限流、反爬 │ ├── 解析模块(Parser):封装数据解析逻辑,支持多规则解析 │ ├── 存储模块(Storage):封装数据存储逻辑,支持多格式存储 │ ├── 调度模块(Scheduler):管理爬取任务,控制爬取流程 │ └── 异常处理模块(Exception):统一处理各类异常,记录日志 ├── 业务层(Business):具体爬取业务实现(基于核心层扩展) └── 工具层(Utils):通用工具函数(日志、编码处理、数据校验等)2.2 核心组件功能详解
| 组件名称 | 核心功能 | 技术选型 | 关键特性 |
|---|---|---|---|
| 配置模块 | 加载 / 管理配置(YAML/JSON/ 环境变量) | PyYAML/python-dotenv | 配置热更新、默认值兜底 |
| 请求模块 | 发送 HTTP/HTTPS 请求,处理重试 / 超时 / 反爬 | requests/aiohttp | 会话复用、请求限流、代理支持 |
| 解析模块 | 提取目标数据(HTML/JSON/XML) | BeautifulSoup/jsonpath/lxml | 多解析规则支持、字段校验 |
| 存储模块 | 数据持久化(文件 / 数据库) | pandas/sqlalchemy | 批量存储、去重、事务支持 |
| 调度模块 | 任务队列管理、爬取流程控制 | queue/threading | 单线程 / 多线程调度、优先级控制 |
| 异常处理模块 | 统一异常捕获、分类处理、日志记录 | logging | 异常分级、重试触发、告警提示 |
| 工具模块 | 通用工具(日志、编码、数据校验) | logging/chardet/pydantic | 功能复用、标准化输出 |
三、基础爬虫框架实战开发
3.1 开发环境准备
bash
运行
# 安装核心依赖 pip install requests PyYAML beautifulsoup4 jsonpath-python pandas sqlalchemy chardet pydantic3.2 框架目录结构(标准化)
plaintext
spider_framework/ ├── config/ # 配置目录 │ └── spider_config.yaml # 爬虫配置文件 ├── core/ # 核心组件目录 │ ├── __init__.py │ ├── request_handler.py # 请求模块 │ ├── parse_handler.py # 解析模块 │ ├── storage_handler.py # 存储模块 │ ├── scheduler.py # 调度模块 │ └── exception_handler.py # 异常处理模块 ├── utils/ # 工具目录 │ ├── __init__.py │ ├── log_utils.py # 日志工具 │ └── data_utils.py # 数据工具 ├── business/ # 业务实现目录 │ ├── __init__.py │ └── demo_spider.py # 示例爬虫 └── main.py # 框架入口3.3 核心组件实现
3.3.1 工具层:日志工具(utils/log_utils.py)
python
运行
import logging import os from datetime import datetime def setup_logger(name="spider_framework", log_level=logging.DEBUG): """ 初始化框架日志配置 :param name: 日志器名称 :param log_level: 日志级别 :return: 配置好的logger对象 """ # 创建日志目录 log_dir = "logs" if not os.path.exists(log_dir): os.makedirs(log_dir) # 日志文件名 log_file = f"{log_dir}/spider_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" # 日志格式 formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s" ) # 初始化logger logger = logging.getLogger(name) logger.setLevel(log_level) logger.handlers.clear() # 清空原有处理器 # 控制台处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) logger.addHandler(console_handler) # 文件处理器 file_handler = logging.FileHandler(log_file, encoding="utf-8") file_handler.setFormatter(formatter) logger.addHandler(file_handler) return logger # 全局日志对象 logger = setup_logger()输出结果(初始化日志):
plaintext
2025-12-17 15:00:00,000 - spider_framework - INFO - log_utils.py:35 - 日志初始化完成,日志文件路径:logs/spider_20251217_150000.log原理说明:
- 封装日志初始化逻辑,支持控制台 + 文件双输出,便于调试和问题追溯;
- 按时间戳命名日志文件,避免日志覆盖;
- 全局 logger 对象供框架所有组件复用,保证日志格式统一。
3.3.2 配置层:配置管理(config/spider_config.yaml + core/init.py)
配置文件(config/spider_config.yaml):
yaml
# 请求配置 request: headers: User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" Referer: "https://www.example.com" timeout: 10 # 请求超时时间(秒) retry_times: 3 # 重试次数 retry_interval: 2 # 重试间隔(秒) rate_limit: 1 # 请求频率限制(秒/次) # 解析配置 parse: # 示例解析规则(JSON接口) json_rules: demo_api: fields: id: "$.id" title: "$.title" content: "$.body" # 示例解析规则(HTML页面) html_rules: demo_html: item_selector: "div.item" fields: name: "span.name::text" price: "span.price::text" # 存储配置 storage: default_format: "csv" # 默认存储格式:csv/excel/sqlite save_path: "data" # 数据保存目录 sqlite_db: "data/spider_data.db" # SQLite数据库路径配置加载(core/init.py):
python
运行
import yaml import os from utils.log_utils import logger # 加载配置文件 def load_config(config_path="config/spider_config.yaml"): """ 加载YAML配置文件 :param config_path: 配置文件路径 :return: 配置字典 """ try: if not os.path.exists(config_path): raise FileNotFoundError(f"配置文件不存在:{config_path}") with open(config_path, "r", encoding="utf-8") as f: config = yaml.safe_load(f) logger.info(f"✅ 配置文件加载成功:{config_path}") return config except Exception as e: logger.error(f"❌ 配置文件加载失败:{e}", exc_info=True) return {} # 全局配置对象 CONFIG = load_config()输出结果:
plaintext
2025-12-17 15:01:00,000 - spider_framework - INFO - __init__.py:20 - ✅ 配置文件加载成功:config/spider_config.yaml原理说明:
- 使用 YAML 文件管理所有配置,实现配置与代码解耦;
- 封装配置加载逻辑,异常捕获并记录日志,保证框架鲁棒性;
- 全局 CONFIG 对象供所有核心组件调用,统一配置入口。
3.3.3 核心层:请求模块(core/request_handler.py)
python
运行
import requests import time from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from core import CONFIG from utils.log_utils import logger class RequestHandler: """请求处理模块:封装HTTP请求,处理重试、限流、反爬""" def __init__(self): # 从配置加载参数 self.timeout = CONFIG.get("request", {}).get("timeout", 10) self.retry_times = CONFIG.get("request", {}).get("retry_times", 3) self.retry_interval = CONFIG.get("request", {}).get("retry_interval", 2) self.rate_limit = CONFIG.get("request", {}).get("rate_limit", 1) self.headers = CONFIG.get("request", {}).get("headers", {}) # 初始化session(复用连接) self.session = self._create_session() # 记录最后一次请求时间(用于限流) self.last_request_time = 0 def _create_session(self): """创建带重试策略的session""" session = requests.Session() # 配置重试策略 retry_strategy = Retry( total=self.retry_times, backoff_factor=self.retry_interval, status_forcelist=[429, 500, 502, 503, 504] ) # 挂载适配器 adapter = HTTPAdapter(max_retries=retry_strategy) session.mount("https://", adapter) session.mount("http://", adapter) # 设置默认请求头 session.headers.update(self.headers) return session def _rate_limit_control(self): """请求频率控制""" current_time = time.time() interval = current_time - self.last_request_time if interval < self.rate_limit: sleep_time = self.rate_limit - interval logger.debug(f"⏳ 请求频率限制,等待{sleep_time:.2f}秒") time.sleep(sleep_time) self.last_request_time = time.time() def get(self, url, params=None, **kwargs): """发送GET请求(封装限流、重试、异常处理)""" try: # 频率控制 self._rate_limit_control() # 发送请求 logger.debug(f"🚀 发送GET请求:{url},参数:{params}") response = self.session.get( url, params=params, timeout=self.timeout, **kwargs ) # 校验状态码 response.raise_for_status() # 设置编码 response.encoding = response.apparent_encoding logger.info(f"✅ GET请求成功:{url},状态码:{response.status_code}") return response except Exception as e: logger.error(f"❌ GET请求失败:{url},异常:{e}", exc_info=True) return None def post(self, url, data=None, json=None, **kwargs): """发送POST请求(封装限流、重试、异常处理)""" try: # 频率控制 self._rate_limit_control() # 发送请求 logger.debug(f"🚀 发送POST请求:{url},数据:{data},JSON:{json}") response = self.session.post( url, data=data, json=json, timeout=self.timeout, **kwargs ) # 校验状态码 response.raise_for_status() # 设置编码 response.encoding = response.apparent_encoding logger.info(f"✅ POST请求成功:{url},状态码:{response.status_code}") return response except Exception as e: logger.error(f"❌ POST请求失败:{url},异常:{e}", exc_info=True) return None def close(self): """关闭session""" self.session.close() logger.info("🔌 请求session已关闭") # 全局请求处理器对象 request_handler = RequestHandler()使用示例 & 输出结果:
python
运行
# 测试GET请求 response = request_handler.get("https://jsonplaceholder.typicode.com/posts/1") print(f"响应状态码:{response.status_code}") print(f"响应内容:{response.json()}")plaintext
2025-12-17 15:02:00,000 - spider_framework - DEBUG - request_handler.py:78 - 🚀 发送GET请求:https://jsonplaceholder.typicode.com/posts/1,参数:None 2025-12-17 15:02:01,000 - spider_framework - INFO - request_handler.py:92 - ✅ GET请求成功:https://jsonplaceholder.typicode.com/posts/1,状态码:200 响应状态码:200 响应内容:{'userId': 1, 'id': 1, 'title': 'sunt aut facere repellat provident occaecati excepturi optio reprehenderit', 'body': 'quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto'}原理说明:
- 封装 GET/POST 请求,集成重试、限流、编码处理等核心功能;
- 使用 Session 复用 TCP 连接,提升请求效率;
- 基于配置文件动态调整请求参数,无需修改代码;
- 完善的日志记录和异常处理,便于问题定位。
3.3.4 核心层:解析模块(core/parse_handler.py)
python
运行
from bs4 import BeautifulSoup from jsonpath import jsonpath from core import CONFIG from utils.log_utils import logger class ParseHandler: """解析处理模块:支持JSON/HTML数据解析""" def __init__(self): # 从配置加载解析规则 self.json_rules = CONFIG.get("parse", {}).get("json_rules", {}) self.html_rules = CONFIG.get("parse", {}).get("html_rules", {}) def parse_json(self, json_data, rule_name): """ 解析JSON数据(基于配置的解析规则) :param json_data: 原始JSON数据(dict/list) :param rule_name: 解析规则名称(对应配置中的json_rules) :return: 解析后的数据列表 """ if not json_data or rule_name not in self.json_rules: logger.warning(f"⚠️ JSON解析规则不存在或数据为空:{rule_name}") return [] rule = self.json_rules[rule_name] fields = rule.get("fields", {}) result = [] # 处理列表型JSON if isinstance(json_data, list): for item in json_data: parsed_item = self._parse_single_json(item, fields) if parsed_item: result.append(parsed_item) # 处理字典型JSON elif isinstance(json_data, dict): parsed_item = self._parse_single_json(json_data, fields) if parsed_item: result.append(parsed_item) logger.info(f"✅ JSON解析完成:{rule_name},解析出{len(result)}条数据") return result def _parse_single_json(self, json_item, fields): """解析单个JSON对象""" parsed_item = {} for field_name, jsonpath_expr in fields.items(): # 使用jsonpath提取字段(支持嵌套) values = jsonpath(json_item, jsonpath_expr) if values: # 处理列表结果,取第一个值 parsed_item[field_name] = values[0] if isinstance(values, list) else values else: parsed_item[field_name] = "" logger.debug(f"⚠️ 字段提取失败:{field_name},表达式:{jsonpath_expr}") return parsed_item def parse_html(self, html_content, rule_name): """ 解析HTML数据(基于配置的解析规则) :param html_content: 原始HTML字符串 :param rule_name: 解析规则名称(对应配置中的html_rules) :return: 解析后的数据列表 """ if not html_content or rule_name not in self.html_rules: logger.warning(f"⚠️ HTML解析规则不存在或数据为空:{rule_name}") return [] rule = self.html_rules[rule_name] item_selector = rule.get("item_selector") fields = rule.get("fields", {}) # 解析HTML soup = BeautifulSoup(html_content, "lxml") items = soup.select(item_selector) result = [] for item in items: parsed_item = {} for field_name, css_expr in fields.items(): try: # 支持CSS选择器+文本提取(如span.name::text) if "::text" in css_expr: css_expr = css_expr.replace("::text", "") value = item.select_one(css_expr).get_text(strip=True) if item.select_one(css_expr) else "" else: value = item.select_one(css_expr).get("href", "") if item.select_one(css_expr) else "" parsed_item[field_name] = value except Exception as e: parsed_item[field_name] = "" logger.debug(f"⚠️ HTML字段提取失败:{field_name},异常:{e}") result.append(parsed_item) logger.info(f"✅ HTML解析完成:{rule_name},解析出{len(result)}条数据") return result # 全局解析处理器对象 parse_handler = ParseHandler()使用示例 & 输出结果:
python
运行
# 测试JSON解析 json_data = {"id": 1, "title": "测试标题", "body": "测试内容"} parsed_json = parse_handler.parse_json(json_data, "demo_api") print(f"JSON解析结果:{parsed_json}") # 测试HTML解析 html_content = """ <div class="item"> <span class="name">测试商品</span> <span class="price">99.9元</span> </div> """ parsed_html = parse_handler.parse_html(html_content, "demo_html") print(f"HTML解析结果:{parsed_html}")plaintext
2025-12-17 15:03:00,000 - spider_framework - INFO - parse_handler.py:52 - ✅ JSON解析完成:demo_api,解析出1条数据 JSON解析结果:[{'id': 1, 'title': '测试标题', 'content': '测试内容'}] 2025-12-17 15:03:00,000 - spider_framework - INFO - parse_handler.py:95 - ✅ HTML解析完成:demo_html,解析出1条数据 HTML解析结果:[{'name': '测试商品', 'price': '99.9元'}]原理说明:
- 支持 JSON/HTML 两种主流数据格式解析,基于配置规则提取字段;
- JSON 解析使用 jsonpath 支持嵌套字段提取,HTML 解析使用 BeautifulSoup+CSS 选择器;
- 字段提取失败时设置空值并记录日志,避免解析流程中断;
- 解析规则与代码分离,新增解析规则仅需修改配置文件。
3.3.5 核心层:存储模块(core/storage_handler.py)
python
运行
import os import pandas as pd from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData from core import CONFIG from utils.log_utils import logger class StorageHandler: """存储处理模块:支持CSV/Excel/SQLite存储""" def __init__(self): # 从配置加载存储参数 self.default_format = CONFIG.get("storage", {}).get("default_format", "csv") self.save_path = CONFIG.get("storage", {}).get("save_path", "data") self.sqlite_db = CONFIG.get("storage", {}).get("sqlite_db", "data/spider_data.db") # 创建数据保存目录 if not os.path.exists(self.save_path): os.makedirs(self.save_path) logger.info(f"📁 创建数据保存目录:{self.save_path}") # 初始化SQLite引擎(按需) self.sqlite_engine = None def _init_sqlite(self): """初始化SQLite引擎""" if not self.sqlite_engine: self.sqlite_engine = create_engine(f"sqlite:///{self.sqlite_db}") logger.info(f"🗄️ SQLite引擎初始化完成:{self.sqlite_db}") def save_to_csv(self, data, filename, header=True): """ 保存数据到CSV文件 :param data: 解析后的数据列表(dict/list) :param filename: 文件名(不含后缀) :param header: 是否写入表头 """ if not data: logger.warning("⚠️ 无数据可保存到CSV") return False try: file_path = os.path.join(self.save_path, f"{filename}.csv") df = pd.DataFrame(data) # 追加/写入模式 mode = "a" if os.path.exists(file_path) and not header else "w" df.to_csv( file_path, mode=mode, header=header, index=False, encoding="utf-8-sig" ) logger.info(f"✅ 数据保存到CSV成功:{file_path},数据量:{len(data)}") return True except Exception as e: logger.error(f"❌ 保存CSV失败:{e}", exc_info=True) return False def save_to_excel(self, data, filename, header=True): """保存数据到Excel文件""" if not data: logger.warning("⚠️ 无数据可保存到Excel") return False try: file_path = os.path.join(self.save_path, f"{filename}.xlsx") df = pd.DataFrame(data) with pd.ExcelWriter(file_path, mode="a" if os.path.exists(file_path) and not header else "w") as writer: df.to_excel(writer, sheet_name="data", index=False, header=header) logger.info(f"✅ 数据保存到Excel成功:{file_path},数据量:{len(data)}") return True except Exception as e: logger.error(f"❌ 保存Excel失败:{e}", exc_info=True) return False def save_to_sqlite(self, data, table_name): """保存数据到SQLite数据库""" if not data: logger.warning("⚠️ 无数据可保存到SQLite") return False try: # 初始化SQLite引擎 self._init_sqlite() # 转换为DataFrame并写入数据库 df = pd.DataFrame(data) df.to_sql( table_name, self.sqlite_engine, if_exists="append", index=False ) logger.info(f"✅ 数据保存到SQLite成功:{table_name},数据量:{len(data)}") return True except Exception as e: logger.error(f"❌ 保存SQLite失败:{e}", exc_info=True) return False def save(self, data, filename, format_type=None, table_name="spider_data"): """ 统一保存接口(自动选择存储方式) :param data: 解析后的数据列表 :param filename: 文件名(不含后缀) :param format_type: 存储格式(csv/excel/sqlite),默认使用配置中的default_format :param table_name: SQLite表名 """ format_type = format_type or self.default_format if format_type == "csv": return self.save_to_csv(data, filename) elif format_type == "excel": return self.save_to_excel(data, filename) elif format_type == "sqlite": return self.save_to_sqlite(data, table_name) else: logger.error(f"❌ 不支持的存储格式:{format_type}") return False # 全局存储处理器对象 storage_handler = StorageHandler()使用示例 & 输出结果:
python
运行
# 测试CSV存储 test_data = [{"id": 1, "title": "测试1"}, {"id": 2, "title": "测试2"}] storage_handler.save(test_data, "demo_data", format_type="csv")plaintext
2025-12-17 15:04:00,000 - spider_framework - INFO - storage_handler.py:35 - 📁 创建数据保存目录:data 2025-12-17 15:04:00,000 - spider_framework - INFO - storage_handler.py:60 - ✅ 数据保存到CSV成功:data/demo_data.csv,数据量:2原理说明:
- 封装 CSV/Excel/SQLite 三种主流存储方式,提供统一保存接口;
- 支持追加写入,避免重复生成文件;
- CSV 存储使用
utf-8-sig编码,解决中文乱码问题; - 延迟初始化 SQLite 引擎,减少资源占用。
3.3.6 核心层:调度模块(core/scheduler.py)
python
运行
from core import request_handler, parse_handler, storage_handler from utils.log_utils import logger class Scheduler: """调度模块:管理爬取任务全流程""" def __init__(self): self.task_queue = [] # 爬取任务队列 def add_task(self, task): """ 添加爬取任务 :param task: 任务字典,格式: { "task_name": "任务名称", "url": "目标URL", "request_type": "get/post", "parse_type": "json/html", "parse_rule": "解析规则名称", "storage_format": "csv/excel/sqlite", "storage_filename": "存储文件名" } """ required_fields = ["task_name", "url", "request_type", "parse_type", "parse_rule", "storage_filename"] for field in required_fields: if field not in task: logger.error(f"❌ 任务缺少必填字段:{field}") return False self.task_queue.append(task) logger.info(f"✅ 添加爬取任务成功:{task['task_name']},当前任务队列长度:{len(self.task_queue)}") return True def run_task(self, task): """执行单个爬取任务""" try: logger.info(f"🚀 开始执行任务:{task['task_name']}") # 1. 发送请求 if task["request_type"] == "get": response = request_handler.get(task["url"]) elif task["request_type"] == "post": response = request_handler.post(task["url"], json=task.get("post_data")) else: logger.error(f"❌ 不支持的请求类型:{task['request_type']}") return False if not response: return False # 2. 解析数据 if task["parse_type"] == "json": raw_data = response.json() parsed_data = parse_handler.parse_json(raw_data, task["parse_rule"]) elif task["parse_type"] == "html": raw_data = response.text parsed_data = parse_handler.parse_html(raw_data, task["parse_rule"]) else: logger.error(f"❌ 不支持的解析类型:{task['parse_type']}") return False if not parsed_data: logger.warning(f"⚠️ 任务解析无数据:{task['task_name']}") return False # 3. 存储数据 storage_result = storage_handler.save( parsed_data, task["storage_filename"], format_type=task.get("storage_format") ) if storage_result: logger.info(f"✅ 任务执行成功:{task['task_name']}") return True else: logger.error(f"❌ 任务存储失败:{task['task_name']}") return False except Exception as e: logger.error(f"❌ 任务执行异常:{task['task_name']},异常:{e}", exc_info=True) return False def run_all_tasks(self): """执行所有任务""" logger.info(f"🚀 开始执行所有爬取任务,任务总数:{len(self.task_queue)}") success_count = 0 fail_count = 0 for task in self.task_queue: if self.run_task(task): success_count += 1 else: fail_count += 1 # 清空任务队列 self.task_queue.clear() logger.info(f"📊 所有任务执行完成:成功{success_count}个,失败{fail_count}个") return { "success": success_count, "fail": fail_count } # 全局调度器对象 scheduler = Scheduler()原理说明:
- 调度模块是框架的核心入口,串联 “请求→解析→存储” 全流程;
- 标准化任务格式,支持批量添加和执行任务;
- 任务执行结果统计,便于监控爬取效率;
- 单个任务异常不影响其他任务执行,保证框架稳定性。
3.4 业务层:示例爬虫(business/demo_spider.py)
python
运行
from core.scheduler import scheduler from utils.log_utils import logger def run_demo_spider(): """运行示例爬虫""" # 定义爬取任务 demo_task = { "task_name": "JSONPlaceholder帖子爬取", "url": "https://jsonplaceholder.typicode.com/posts", "request_type": "get", "parse_type": "json", "parse_rule": "demo_api", "storage_format": "csv", "storage_filename": "jsonplaceholder_posts" } # 添加任务 scheduler.add_task(demo_task) # 执行所有任务 result = scheduler.run_all_tasks() logger.info(f"📈 示例爬虫执行结果:{result}") return result if __name__ == "__main__": run_demo_spider()3.5 框架入口(main.py)
python
运行
from business.demo_spider import run_demo_spider from utils.log_utils import logger from core.request_handler import request_handler def main(): """框架主入口""" logger.info("🎉 基础爬虫框架启动") try: # 运行示例爬虫 run_demo_spider() finally: # 关闭请求session request_handler.close() logger.info("🛑 基础爬虫框架退出") if __name__ == "__main__": main()完整运行输出结果:
plaintext
2025-12-17 15:05:00,000 - spider_framework - INFO - main.py:8 - 🎉 基础爬虫框架启动 2025-12-17 15:05:00,000 - spider_framework - INFO - scheduler.py:45 - ✅ 添加爬取任务成功:JSONPlaceholder帖子爬取,当前任务队列长度:1 2025-12-17 15:05:00,000 - spider_framework - INFO - scheduler.py:89 - 🚀 开始执行所有爬取任务,任务总数:1 2025-12-17 15:05:00,000 - spider_framework - INFO - scheduler.py:59 - 🚀 开始执行任务:JSONPlaceholder帖子爬取 2025-12-17 15:05:00,000 - spider_framework - DEBUG - request_handler.py:78 - 🚀 发送GET请求:https://jsonplaceholder.typicode.com/posts,参数:None 2025-12-17 15:05:01,000 - spider_framework - INFO - request_handler.py:92 - ✅ GET请求成功:https://jsonplaceholder.typicode.com/posts,状态码:200 2025-12-17 15:05:01,000 - spider_framework - INFO - parse_handler.py:52 - ✅ JSON解析完成:demo_api,解析出100条数据 2025-12-17 15:05:01,000 - spider_framework - INFO - storage_handler.py:60 - ✅ 数据保存到CSV成功:data/jsonplaceholder_posts.csv,数据量:100 2025-12-17 15:05:01,000 - spider_framework - INFO - scheduler.py:80 - ✅ 任务执行成功:JSONPlaceholder帖子爬取 2025-12-17 15:05:01,000 - spider_framework - INFO - scheduler.py:97 - 📊 所有任务执行完成:成功1个,失败0个 2025-12-17 15:05:01,000 - spider_framework - INFO - demo_spider.py:22 - 📈 示例爬虫执行结果:{'success': 1, 'fail': 0} 2025-12-17 15:05:01,000 - spider_framework - INFO - request_handler.py:115 - 🔌 请求session已关闭 2025-12-17 15:05:01,000 - spider_framework - INFO - main.py:15 - 🛑 基础爬虫框架退出四、框架扩展与优化
4.1 框架扩展方向
| 扩展维度 | 具体方案 | 技术选型 |
|---|---|---|
| 多线程 / 异步爬取 | 扩展调度模块,支持多线程 / 异步任务执行 | threading/aiohttp/asyncio |
| 代理池集成 | 扩展请求模块,支持自动切换代理 IP | requests-proxies / 自建代理池 |
| 反爬对抗 | 扩展请求模块,集成 Cookie 池、指纹伪装 | faker/undetected-chromedriver |
| 数据去重 | 扩展存储模块,支持基于唯一键去重 | redis/pandas 去重 |
| 进度监控 | 扩展调度模块,添加进度条和实时统计 | tqdm/prometheus |
4.2 框架优化建议
- 配置热更新:使用 watchdog 监控配置文件变化,无需重启框架即可更新配置;
- 连接池优化:调整 requests.Session 的连接池大小,提升大批量请求效率;
- 内存优化:大批量数据存储时,采用分批写入策略,避免内存溢出;
- 日志轮转:使用 logging.handlers.RotatingFileHandler 实现日志文件轮转,避免日志文件过大;
- 单元测试:为核心组件编写单元测试,保证框架稳定性(pytest)。
4.3 框架使用规范
- 新增爬取业务:仅需在配置文件添加解析规则,在业务层定义任务字典,无需修改核心代码;
- 异常处理规范:所有自定义异常需继承框架基础异常类,统一异常格式;
- 日志规范:按级别记录日志(DEBUG:调试信息,INFO:正常流程,WARNING:警告,ERROR:错误,CRITICAL:严重错误);
- 配置规范:新增配置项需在 YAML 文件中添加注释,标明用途和默认值。
五、总结
基础爬虫框架的核心价值在于 “标准化、可复用、可扩展”—— 通过分层化、模块化的架构设计,将爬虫开发拆分为配置管理、请求处理、数据解析、数据存储、任务调度等独立组件,既降低了开发门槛,又提升了代码的可维护性。本文设计的基础爬虫框架覆盖了爬虫开发的核心场景,支持 JSON/HTML 解析、多格式存储、重试 / 限流 / 异常处理等核心功能,可作为各类爬虫项目的基础模板。
在实际应用中,需根据业务需求对框架进行扩展:简单爬取场景可直接使用核心组件,大批量 / 高反爬场景可扩展多线程、代理池、反爬对抗等功能。同时,需遵循框架使用规范,保证代码的一致性和可维护性。掌握基础爬虫框架的设计思路,是从 “初级爬虫开发者” 向 “高级爬虫工程师” 进阶的关键。
扩展建议
- 进阶框架学习:研究 Scrapy 框架的核心架构,吸收其组件设计思想;
- 分布式扩展:基于 celery/redis 实现分布式爬虫框架,提升爬取效率;
- 可视化管理:结合 FastAPI/Streamlit 开发爬虫管理界面,支持任务配置、进度监控、数据查看;
- 合规性优化:添加 robots.txt 校验、请求频率限制,保证爬虫合规性。