背景:对话系统“长胖”后的烦恼
线上跑过 ChatGPT 类应用的团队都体会过同一件事:对话越久,系统越“胖”。
- 单轮对话平均 0.8 KB,日活 10 k 用户、每人 30 轮就能轻易把历史表撑到 8 GB;
- 为了做多轮上下文,LLM 调用前必须回拉最近 N 条消息,B+Tree 在 5000 万行时 RT 中位数从 20 ms 飙到 200 ms;
- 运营还想做“对话续写”“会话搜索”,模糊查询一跑就是全表扫描,CPU 立刻打满。
一句话:历史数据膨胀 → 检索效率雪崩 → 用户体验滑坡。Archive 方案的核心目标就是“让旧数据别碍事,让热数据飞起来”。
存储选型:三大数据库硬刚对话场景
先给出结论,再解释原因:
| 维度 | 关系型(Postgres) | 文档型(Mongo) | 时序型(Influx) |
|---|---|---|---|
| 事务一致性 | 强 | 弱 | 无 |
| 检索灵活度 | SQL 万能 | 嵌套查询够用 | 标签过滤为主 |
| 水平扩展 | 分片复杂 | 原生分片 | 时序分区 |
| 压缩率 | 低 | 中 | 高 |
| 延迟 P99 | 50 ms | 30 ms | 10 ms |
| 成本(GB/月) | 高 | 中 | 低 |
关系型:
- 优点:ACID、外键、UPDATE 友好。
- 痛点:对话是典型的追加写,B+Tree 频繁页分裂,磁盘随机写放大;长文本(system prompt + 回答)让行宽超 8 KB,TOAST 压缩反而拖慢索引。
文档型:
- 优点:一回合对话直接塞一个 doc,schema-less 方便加字段(如 token 用量、情绪标签)。
- 痛点:默认不压缩,WT 引擎 16 MB 页在随机读时放大了冷延迟;多租户场景下若按 user_id 散列,chunk 迁移会拖垮写入。
时序型:
- 优点:天生按时间线分片,TSM 压缩算法对“消息流”极致友好,磁盘占用可降 70%。
- 痛点:不支持单行更新,一旦消息需要“编辑”或“撤回”只能整点重写;二级索引弱,搜索关键词需外挂倒排。
最终我们采用“混合体”:
- 热数据 → Redis Stream(内存,按 session 做 key)
- 温数据 → Postgres 分区表(按天分区,30 天自动转冷)
- 冷数据 → Parquet 文件 + MinIO(对象存储,按 user_id/年 月 前缀)
分层存储架构:让数据“各回各家”
热层(0–6 h)
写操作双写:- 先写 Redis Stream,保证 LLM 实时上下文毫秒级返回;
- 异步刷盘到 Postgres 当日分区,失败重试 3 次后入死信队列。
温层(6 h–30 d)
Postgres 分区表 + BRIN 索引(block range index)对 (user_id, ts) 做排序,保证“续写”查询 30 天内在 50 ms 内返回。冷层(30 d+)
凌晨 02:00 调度器把 30 天前分区 COPY 到 Parquet,Snappy 压缩后推 MinIO;原表 DROP PARTITION,磁盘瞬间回收。
Parquet 文件内按“对话线程”聚合,每 1000 条消息打一个 Row Group,方便后续 Spark 批量算 token 成本。检索链路
用户请求 → 先查 Redis(热);miss 则查 Postgres(温);再 miss 走对象存储 + Athena 做离线搜索。
三层命中率 85%/14%/1%,整体 P99 延迟 120 ms。
增量式压缩算法:让历史“瘦成闪电”
对话天然有局部重复:system prompt 常常不变,用户会复制粘贴。我们用“增量链”思路:
- 每 100 条消息做一次全量快照(msg_id 递增);
- 中间消息只存 diff(最长公共子序列,LCS);
- 读时按快照 + diff 重放即可还原原文。
核心代码(Python 3.8+):
from typing import List, Tuple import difflib import json import zlib class ThreadCompressor: """增量压缩/解压对话线程""" def __init__(self, snapshot_interval: int = 100): self.snapshot_interval = snapshot_interval def compress(self, messages: List[str]) -> List[Tuple[str, bytes]]: """ 返回 [(type, payload)] 列表 type=‘S’ 为快照,‘D’ 为 diff """ out: List[Tuple[str, bytes]] = [] for i, msg in enumerate(messages): if i % self.snapshot_interval == 0: out.append(('S', zlib.compress(msg.encode(), level=6))) else: prev = messages[i-1] diff = '\n'.join(difflib.unified_diff( prev.splitlines(), msg.splitlines(), lineterm='')) out.append(('D', zlib.compress(diff.encode(), level=6))) return out def decompress(self, chain: List[Tuple[str, bytes]]) -> List[str]: """逆向还原""" msgs: List[str] = [] for typ, payload in chain: data = zlib.decompress(payload).decode() if typ == 'S': msgs.append(data) else: # 应用 diff last = msgs[-1].splitlines() patch = data.splitlines() recovered = list(difflib.restore( difflib.unified_diff(last, []), 2)) msgs.append('\n'.join(recovered)) return msgs实测 1000 条平均 750 KB 的对话,压缩后 120 KB,节省 84% 存储;解压 1000 次耗时 38 ms,单核 QPS ≈ 26 k,完全够线上并发。
性能基准:数字说话
测试机:c6g.xlarge(4 vCPU,8 GB),Redis 6、Postgres 14、MinIO 单盘。
数据集:1000 万条消息,500 万热、300 万温、200 万冷。
| 指标 | 纯 Postgres | 分层方案 |
|---|---|---|
| 写 QPS | 3.2 k | 9.8 k |
| 读 P50 | 18 ms | 5 ms |
| 读 P99 | 220 ms | 120 ms |
| 磁盘占用 | 48 GB | 14 GB |
| 月云盘费用(AWS gp3) | 96 USD | 28 USD |
读提升 45%,写提升 206%,成本降 70%,基本符合预期。
避坑指南:三个容易翻车的细节
消息乱序
客户端重试可能导致 19:00:05 的消息比 19:00:02 晚入库。
解决:- 每条消息带客户端生成的时间戳 + 自增 msg_id;
- 服务端用“向量时钟”合并,LLM 侧按 msg_id 排序,保证上下文一致。
冷启动缓存穿透
新用户首次对话,Redis 空记录,若瞬间高并发会直接把 Postgres 打穿。
解决:- 布隆过滤器预载全量 user_id;
- 空结果写 Redis 占位(TTL 60 s),防止重复回源。
分布式一致性
双写 Redis & Postgres,节点宕机可能丢一条。
解决:- 采用“写日志优先”策略:先写本地 WAL(Write-Ahead-Log),再用单线程 worker 顺序刷 Postgres;
- 对账时用 WAL 的 msg_id 做幂等,保证 at-least-once 最终一致。
代码片段:端到端读写示例
import asyncio import aioredis import asyncpg from datetime import datetime from typing import Optional class DialogueArchive: def __init__(self, redis_url: str, pg_pool: asyncpg.Pool): self.redis = aioredis.from_url(redis_url, decode_responses=True) self.pg = pg_pool async def append(self, user_id: str, role: str, content: str) -> None: ts = datetime.utcnow().isoformat() msg = {"role": role, "content": content, "ts": ts} # 1. 写热层 await self.redis.xadd(f"stream:{user_id}", msg, maxlen=500) # 2. 异步刷温层(可落消息队列) asyncio.create_task(self._pg_append(user_id, msg)) async def _pg_append(self, user_id: str, msg: dict) -> None: await self.pg.execute( "INSERT INTO dialogue(user_id, role, content, ts) VALUES ($1,$2,$3,$4)", user_id, msg["role"], msg["content"], msg["ts"]) async def latest(self, user_id: str, k: int = 20) -> list[dict]: # 只读热层 entries = await self.redis.xrevrange(f"stream:{user_id}", count=k) return [{"role": x[1]["role"], "content": x[1]["content"]} for x in reversed(entries)]异常处理:
- Redis 连接超时自动降级到 Postgres;
- Postgres 主键冲突捕获
asyncpg.UniqueViolationError做幂等返回。
思考与展望:成本 or 效率,必须二选一?
把冷数据直接上 Glacier 能再省 50% 费用,可检索要等 5 min 解冻;全放 SSD 能让 P99 降到 20 ms,但预算立刻翻三倍。
业务早期用户量小,建议“性能优先”,把热层窗口拉长到 24 h;
当日活破百万,再缩小窗口、加大压缩比,甚至用纠删码把 Parquet 切更碎。
存储与检索就像天平,没有银弹,只有不断把“业务可接受的慢”往左移,把“钱包可接受的价格”往右移,找到自家产品的甜蜜点。
如果你想亲手把上面这套链路跑通,推荐试试从0打造个人豆包实时通话AI动手实验。课程从火山引擎账号申请到 Web 端麦克风对接一步步带你做,我跟着敲了 2 小时就把热温冷三层跑通,小白也能顺利体验。把对话存档玩溜后,再回来读这篇文章,相信你会对“成本 vs 效率”有更深的体感。