背景痛点:JIMI 数据“看起来香,啃起来硬”
京东把 JIMI 智能客服的公开对话数据放出来后,很多团队第一时间下载,结果普遍卡在三个地方:
- 体量惊人:压缩包 8 GB,解压后 50 GB+ 的 JSON,单文件直接吃光 32 GB 内存。
- 结构松散:每行一条对话,字段层级深(
session.messages[].content嵌套数组),MySQL 需要拆成多表才能查。 - 分析无从下手:关键词搜“退货”能扫出 10 W 条,但找不到“情绪激烈且退货”的会话,传统 LIKE 性能瞬间爆炸。
一句话:数据金矿摆在面前,却没有趁手的铲子。
技术选型:为什么把 MySQL 换成 Elasticsearch
| 维度 | MySQL 8.0 | Elasticsearch 8.x |
|---|---|---|
| 全文检索 | 需外挂 Sphinx,延迟 200 ms+ | 内置倒排,10 ms 内 |
| 嵌套 JSON | 拆表或 JSON 字段,查询复杂 | 原生 nested,一条 DSL 搞定 |
| 水平扩展 | 主从只读,分库分表改业务 | 加节点自动重平衡 |
| 聚合 | group by 容易内存临时表 | 分布式 agg,毫秒级 |
结论:JIMI 数据是非结构化文本 + 深度嵌套,分析场景以“搜→聚→看”为主,Elasticsearch 在开发效率与查询性能上都更省心。
核心实现:从脏数据到语义洞察
1. 数据清洗流程(Python 3.10)
目标:把原始.jsonl转成“干净、扁平、带情绪标签”的文档。
# clean_jimi.py import json, re, emoji, tqdm from pathlib import Path PUNCT = re.compile(r"[~!@#$%^&*()_+`\-={}|\[\]:\";'<>?,./]") def clean_text(text: str) -> str: """去表情、去网址、去标点""" text = emoji.replace_emoji(text, replace='') text = re.sub(r'http\S+', '', text) text = PUNCT.sub(' ', text) return ' '.join(text.split()) # 合并多余空格 def iter_session(file_path): """原始文件一行一个 session""" with open(file_path, encoding='utf-8') as f: for line in f: yield json.loads(line) def flatten(session): """把嵌套对话拍平,保留上下文""" sid = session['session_id'] msgs = [] for turn in session['messages']: role = turn['from'] # user / bot msgs.append({ 'role': role, 'content': clean_text(turn['content']), 'ts': turn['timestamp'] }) return { 'session_id': sid, 'msgs': msgs, 'msg_cnt': len(msgs), 'has_order': bool(session.get('order_id')), 'create_time': session['create_time'][:10] # 只取日期 } if __name__ == '__main__': src = Path('jimi_raw.jsonl') dst = Path('jimi_clean.jsonl') with dst.open('w', encoding='utf-8') as f_out: for ses in tqdm.tqdm(iter_session(src), total=6_800_000): f_out.write(json.dumps(flatten(ses), ensure_ascii=False) + '\n')跑完得到 6 800 000 条扁平文档,体积从 50 GB 降到 21 GB,磁盘节省 58 %。
2. Elasticsearch 索引设计与优化
2.1 映射模板
PUT /jimi { "settings": { "number_of_shards": 6, "number_of_replicas": 1, "refresh_interval": "30s", "analysis": { "analyzer": { "cjk_smart": { "tokenizer": "jieba_index", "filter": ["lowercase", "stop"] } } } }, "mappings": { "properties": { "session_id": {"type": "keyword"}, "create_time": {"type": "date", "format": "yyyy-MM-dd"}, "has_order": {"type": "boolean"}, "msg_cnt": {"type": "short"}, "msgs": { "type": "nested", "properties": { "role": {"type": "keyword"}, "content": { "type": "text", "analyzer": "cjk_smart", "fields": {"raw": {"type": "keyword"}} }, "ts": {"type": "date", "format": "epoch_second"} } } } } }说明:
- 6 分片对应 3 节点集群,每节点 2 片,保证 CPU 用满。
- nested 结构让“用户→客服→用户”时序不丢失,也能独立查询任一角色。
- refresh 30 s 兼顾写入吞吐与实时可见性。
2.2 批量灌库(Python)
from elasticsearch import Elasticsearch, helpers es = Elasticsearch(['http://es1:9200'], request_timeout=120) def gendocs(): with open('jimi_clean.jsonl', encoding='utf-8') as f: for line in f: doc = json.loads(line) doc['_index'] = 'jimi' yield doc helpers.bulk(es, gendocs(), chunk_size=3000, max_retries=3)单节点 2 万 doc/s,三节点合计 6 万 doc/s,20 min 完成全量。
3. 基于 NLP 的语义分析实现
需求:找出“情绪激烈且要求退货”的会话,传统关键词召回率 62 %,需要语义升级。
步骤:
- 用开源 RoBERTa-wwm-ext 训练二分类模型(激动 / 平和),标注 1 万条,F1=0.89。
- 离线跑完 680 万条,把预测结果写回新字段
msgs.anger_score。 - 查询时加一条 nested filter,秒级返回。
DSL 示例:
GET /jimi/_search { "query": { "bool": { "must": [ {"nested": { "path": "msgs", "query": {"match": { "msgs.content": "退货" }} }}, {"nested": { "path": "msgs", "query": {"range": {"msgs.anger_score": {"gte": 0.8}}} }} ] } }, "aggs": { "daily": { "date_histogram": {"field": "create_time", "calendar_interval": "day"} } } }返回 1 200 条会话,比纯关键词少 85 % 噪声,运营同学直接可用。
性能考量:别让查询拖垮集群
测试环境:3 节点 16 核 64 GB SSD,JVM 31 GB。
| 场景 | 结果集 | 耗时 | 优化前 |
|---|---|---|---|
| 关键词“退货” | 10 W | 120 ms | 800 ms |
| 关键词+anger_score≥0.8 | 1.2 K | 45 ms | — |
| 聚合近 30 天情绪趋势 | 30 桶 | 350 ms | 2 100 ms |
优化手段:
- 把
anger_score从 float 降到half_float,节省 50 % 磁盘。 - 聚合查询加
"execution_hint": "map",对时间直方图效果显著。 - 热温分层:近 7 天热数据 SSD,>7 天自动迁到机械盘,查询慢 15 %,成本降 60 %。
避坑指南:生产踩过的 4 个坑
分片数随意定 → 后期扩容痛苦
建议:数据量 <100 GB 用 3~6 片,>500 GB 直接 12 片,避免 split 操作。nested 嵌套层数过深 → 内存爆炸
真实案例:把“词级别”再嵌一层,聚合直接 OOM。保持 1 层 nested 足够。用默认 dynamic mapping,字符串被猜成 text+keyword 双字段 → 索引膨胀 1.8 倍
提前手动建 mapping,关闭"dynamic": "strict",字段缺失就报错,早发现早修正。中文分词用 standard 分词 → 查“客服”把“客”和“服”拆开,召回离谱
一定装 jieba 或 ik,再自定义业务词库(“京东自营”“价保”等),准确率提升 30 %。
总结与展望:把对话数据玩出更多花样
整套流程跑下来,我们把 50 GB 原始日志变成可秒搜、可语义过滤、可聚合的业务资产,核心经验只有两句话:
- 存储选对,后面少踩 80 % 的坑;
- 清洗+语义一步到位,别让“垃圾进,垃圾出”。
下一步可继续扩展:
- 实时情绪预警:把 Flink 消费在线对话,调用同一模型写 Elasticsearch,实现 5 min 级情绪大盘。
- 多轮意图提取:用 seq2seq 标注每轮意图,再聚合看“咨询→投诉→退货”漏斗,反向优化客服剧本。
- 知识蒸馏:把高 anger_score 且最终解决的会话挑出来,自动生成 FAQ,反哺机器人训练。
如果你已经用类似思路落地,欢迎留言交流踩坑细节;如果正准备动手,希望这篇笔记能让你少走一点弯路。