前言
意图识别判断了"走哪条路",处理链决定了"怎么走"。本文介绍项目中两条 LangChain 处理链的设计——知识库检索链(RAG Chain)和通用对话链(General Chat Chain),以及如何用AsyncGenerator实现全链路流式输出。
为什么需要两条链?
企业知识库问答系统面临两类完全不同的问题:
| 类型 | 示例 | 处理方式 |
|---|---|---|
| 知识库查询 | "公司的请假流程是什么?" | 检索 + 生成 |
| 日常闲聊 | "你好"、"今天星期几" | 直接回答 |
如果不区分,闲聊也会走检索管线——浪费一次 RAGFlow API 调用,还可能检索出无关内容干扰回答。因此项目设计了双链架构,由意图识别决定走哪条。
架构总览
意图识别结果 / \ knowledge_base general_chat / \ 查询增强 + RAG检索 纯 LLM 对话 \ / 流式输出 (AsyncGenerator) | 最终回答
第一条链:知识库检索链
这是项目的核心链,实现了完整的 RAG 流程:
# Langchain_utils/chains.py def create_knowledge_chain(system_prompt: str = "你是一个有用的助手"): async def process_with_knowledge(question: str, history: str): # 步骤 1:查询增强 enhanced_question = enhance_query(question, history) # 步骤 2:RAGFlow 检索 rag_answer = RAGFlow_chat(enhanced_question) # 步骤 3:构建增强 Prompt prompt = system_prompt + "\n\n" if history: prompt += f"对话历史:\n{history}\n\n" prompt += f"用户问题: {question}\n\n" prompt += f"知识库检索结果:\n{rag_answer}\n\n" prompt += "请基于上述知识库信息回答用户问题,如果知识库中没有相关信息,请说明。\n\n" prompt += f"用户问题: {question}" # 步骤 4:LLM 流式生成 async for chunk in call_llm(question, prompt): yield chunk return process_with_knowledge点睛之笔:用户问题在 Prompt 中出现了两次——一次在开头提供上下文,一次在结尾驱动生成。这是经典的 Prompt 工程技巧。
第二条链:通用对话链
相比知识库链简洁得多,纯 LLM 对话:
def create_general_chain(system_prompt: str = "你是一个有用的助手"): async def process_general_chat(question: str, history: str): prompt = system_prompt + "\n\n" if history: prompt += f"对话历史:\n{history}\n\n" prompt += f"用户问题: {question}" async for chunk in call_llm(question, prompt): yield chunk return process_general_chat没有检索步骤,上下文只有对话历史。适用于日常问候、常识问答、数学计算等。
共性:call_llm 流式调用
两条链最终都调用同一个流式 LLM 函数:
async def call_llm(question: str, prompt: str) -> AsyncGenerator[str, None]: # 从环境变量加载配置 api_key = os.getenv("") model = os.getenv("") base_url = os.getenv("") # 创建流式 LLM llm = ChatOpenAI( model=model, base_url=base_url, api_key=api_key, streaming=True ) chat_prompt = ChatPromptTemplate.from_messages([ ('system', prompt), ('human', question) ]) chain = chat_prompt | llm # astream 返回异步生成器,逐 token yield async for chunk in chain.astream({}): if hasattr(chunk, 'content') and chunk.content: yield chunk.contentchain.astream({})是 LCEL 的异步流式调用,返回AsyncGenerator,每个 chunk 是一个 token。外层逐层透传,直到 API 层的 SSE。
统一处理器:串起一切
有了两条链后,需要一个"总控"来串联:记忆加载 → 意图识别 → 链选择 → 流式输出 → 记忆保存:
# Langchain_utils/processor.py def create_unified_processor(session_id: str): memory = create_memory(session_id=session_id) knowledge_chain = create_knowledge_chain(system_prompt) general_chain = create_general_chain(system_prompt) async def processor(question: str): # 1. 加载对话历史 memory_vars = memory.load_memory_variables({}) chat_history = memory_vars.get("chat_history", []) formatted_history = format_chat_history(chat_history) # 2. 意图识别 intent = detect_intent(question, formatted_history) # 3. 发送元数据 metadata = {"intent": intent, "original_query": question} yield {"type": "metadata", "data": metadata} # 4. 根据意图选择链 if intent == "knowledge_base": chain = knowledge_chain else: chain = general_chain # 5. 流式输出 full_answer = "" async for chunk in chain(question, formatted_history): full_answer += chunk yield {"type": "content", "data": chunk} # 6. 保存到记忆 memory.chat_memory.add_messages([ HumanMessage(content=question), AIMessage(content=full_answer) ]) # 7. 结束标记 yield {"type": "end", "data": {"full_answer": full_answer, "metadata": metadata}} # 附加实用方法 processor.get_stats = lambda: stats.copy() processor.clear_memory = lambda: memory.clear() return processor流式输出的统一格式是一个字典,包含三种类型:
| type | 含义 | data |
|---|---|---|
metadata | 流开始时的元信息 | 意图、原始问题 |
content | 逐 token 内容 | 文本片段 |
end | 流结束标记 | 完整回答 + 最终元数据 |
为什么要 clear_memory?
clear_memory被挂载到 processor 函数上,供 API 层调用:
# output/main_service.py def clear_session_memory(session_id: str): if session_id in _processors: processor = _processors[session_id] processor.clear_memory() # 调用附加函数
用户点击"新会话"时,前端会请求/api/memory/clear,清除 MongoDB 中的对话历史。新对话从干净的上下文开始。
小结
本文介绍了双链架构的设计思路:
知识库链:查询增强 → RAG 检索 → LLM 总结,完整的 RAG 管道
通用对话链:纯 LLM 对话,无检索开销
统一处理器:记忆 → 意图 → 链选择 → 流式输出 → 记忆保存
AsyncGenerator 全链路:从 LLM token 到 SSE 事件,零缓冲流式传输