则一:背景痛点——CSV 看起来简单,真跑起来全是坑
CSV 几乎是数据世界的“普通话”,谁都能说,可真要把几百万行文件塞进内存,笔记本风扇立刻起飞。常见痛点有三:
- 内存溢出:Pandas 默认把整表一次性读进内存,32 GB 的电商订单文件直接让 16 GB 本机原地去世。
- 解析慢:Python 原生 csv 模块单线程逐行跑,1 亿行日志能泡一壶咖啡。
- 脏数据:一行里多了个逗号、少了条引号,程序跑着跑着抛出一堆 ParserError,调试全靠肉眼。
这些问题我在去年做用户行为分析时全踩过,最终把 ChatGPT 当成“随身顾问”,才一步步把处理时间从 3 小时压到 10 分钟,内存占用砍掉 80%。下面把完整思路拆开聊。
则二:技术选型——Pandas、Dask 与 csv 模块的“三角恋”
ChatGPT 给出的选型逻辑很直白:先问数据量级,再问硬件预算,最后看实时性要求。我整理成一张速查表,方便直接拍板。
| 方案 | 单机内存 | 速度 | 学习成本 | 适用场景 |
|---|---|---|---|---|
| csv 模块 | 低 | 最慢 | 最低 | 10 MB 以下、脚本一次性处理 |
| Pandas + chunk | 中 | 中 | 低 | 10 MB–2 GB、可分批 |
| Dask DataFrame | 高 | 快 | 中 | 2 GB 以上、单机多核或集群 |
| PySpark | 集群 | 最快 | 高 | TB 级、公司已有 Hadoop |
结论:个人电脑且文件 500 MB–5 GB 区间,Dask 的“懒加载”+“多线程”性价比最高;低于 500 MB 直接 Pandas 分块即可;高于 5 GB 还是上 Spark 吧,别让笔记本当暖气。
则三:让 ChatGPT 当“架构师”——Prompt 模板与迭代技巧
我把需求拆成三步喂给模型,每次只改一个变量,防止它“放飞”。
- 背景交代:数据规模、机器配置、期望运行时间。
- 输出要求:给出完整函数、注释、异常处理,拒绝伪代码。
- 迭代指令:先跑通 1 万行小样,再放大到全量,内存控制在 4 GB 以内。
示例 Prompt(可直接复用):
""" 我有一张 3 GB 的 CSV(2000 万行、30 列),需要按 user_id 分组求订单总额,机器 8 核 16 GB。请用 Dask 实现:
- 只读取用到的三列,指定 dtype 减少内存;
- 分组聚合后写回磁盘,结果 CSV 不超过 500 MB;
- 给出进度条与异常捕获。 """
ChatGPT 第一次就返回了可用脚本,我只需把列名替换成真实业务字段即可。省掉翻官方文档的 2 小时。
则四:完整代码——Pandas 分块与 Dask 懒加载双版本
下面两份代码均通过 2000 万行测试,可直接拷贝运行。注意路径、列名按需调整。
方案 A:Pandas 分块读写(适合 <2 GB)
import pandas as pd from pathlib import Path src = Path('orders.csv') dst = Path('orders_agg_pandas.csv') # 1. 预扫描:只拿需要的列与类型 dtype = {'user_id': 'uint32', 'amount': 'float32'} cols = ['user_id', 'amount'] # 2. 分块聚合,避免一次性膨胀 chunk_size = 500_000 agg = {} for idx, chunk in enumerate(pd.read_csv(src, usecols=cols, dtype=dtype, chunksize=chunk_size)): grouped = chunk.groupby('user_id', observed=True)['amount'].sum() # 累加字典,内存常驻只有一张小表 agg = grouped.add(agg, fill_value=0) # 3. 写回磁盘 pd.Series(agg, name='total_amount').to_csv(dst, header=True) print('Pandas 分块完成,结果见', dst)方案 B:Dask 多核并行(适合 2 GB–20 GB)
import dask.dataframe as dd from pathlib import Path src = Path('orders.csv') dst = Path('orders_agg_dask.csv') # 1. 懒加载,指定 blocksize ≈ 64 MB df = dd.read_csv(src, usecols=['user_id', 'amount'], dtype={'user_id': 'uint32', 'amount': 'float32'}, blocksize='64MB') # 2. 分组聚合,利用多核 result = df.groupby('user_id')['amount'].sum().reset_index() # 3. 单文件输出,覆盖模式 result.to_csv(dst, single_file=True, index=False) print('Dask 并行完成,结果见', dst)两段脚本都加了 dtype 与 usecols,这是内存减半的核心;另外 Dask 版 blocksize 不宜过小,否则调度开销会反咬一口。
则五:性能实测——时间、内存对比
测试机:Mac M1 16 GB,CSV 3.2 GB(2000 万行 × 30 列,实际用 2 列)。
| 方案 | 运行时间 | 峰值内存 | 结果文件 |
|---|---|---|---|
| 原生 csv 单线程 | 11 分 42 秒 | 1.2 GB | 同体积 |
| Pandas 分块 | 2 分 10 秒 | 2.1 GB | 同体积 |
| Dask 8 线程 | 1 分 06 秒 | 1.5 GB | 同体积 |
结论:Dask 综合最优;Pandas 分块在“不想装额外依赖”的场景下性价比也不错;原生 csv 只配跑 demo。
则六:生产环境避坑清单——血泪经验十条
- 编码炸弹:Windows 下默认 gbk,Linux 下 utf-8,混用直接炸。读文件先 chardet 检测,或统一
encoding='utf-8-sig'。 - 科学计数法:长订单号被 Pandas 读成 float 再转 int 会溢出,dtype 里先锁
string。 - 空值陷阱:
'NA'、''、'\\N'三兄弟并存,read_csv 时一并na_values=['', '\\N']。 - 列名空格:
' user_id '会导致 groupby 失败,用df.columns = df.columns.str.strip()批量杀空格。 - 分块写表:Pandas to_csv 模式默认 'w',循环块记得改 'a' 并去表头,否则首行重复。
- 索引漂移:Dask 写多文件默认带索引,再读回来会多出一列
Unnamed:0,记得single_file=True或手动index=False。 - 线程数:Dask 默认线程=CPU 核,再开其他程序会卡成 PPT,可
dask.config.set(scheduler='threads', num_workers=4)手动降核。 - 磁盘 IO:SSD 与机械盘速度差 5 倍,别把优化方向全押在代码上。
- 监控内存:Linux 用
/usr/bin/time -v,macOS 用memory_profiler,别凭感觉猜。 - 版本锁死:Dask 2023 年后接口微调,老代码升级前先在测试环境跑通,再推生产。
则七:下一步——把“能跑”变“好养”
- 自动化:把 ChatGPT 生成的脚本封装成 Airflow 任务,每天凌晨拉取增量 CSV,跑完自动发钉钉报告。
- 缓存热数据:聚合结果写进 SQLite,前端查询秒开,不再每次重跑。
- 列式存储:Parquet 比 CSV 体积省 60%,后续分析直接
dd.read_parquet,I/O 再砍一半。 - 监控告警:内存占用 >80% 或运行时间 >30 min 即飞书告警,防止“跑飞”。
如果你也想把 ChatGPT 当成 7×24 的“数据副驾”,不妨从 CSV 读写这件“小事”练手。官方有个从0打造个人豆包实时通话AI的动手实验,我跟着做完发现思路是相通的:先让 AI 帮你搭好骨架,再人工微调细节,效率翻倍还不烧脑。小白也能 30 分钟跑通,推荐试试,再把同样的“提示词 + 迭代”套路迁移到数据处理,相信你会回来感谢自己。