news 2026/4/23 12:53:56

LangChain 极速入门与技术实战V2

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
LangChain 极速入门与技术实战V2

LangChain 极速入门与技术实战:Python AI应用新范式

LangChain 技术架构深度解析

LangChain 的核心价值在于其模块化设计哲学清晰的架构分层。理解其技术架构是构建复杂AI应用的基础。

1. 技术架构概览

┌─────────────────────────────────────────────────────┐ │ 应用层 (Application) │ │ - 智能助手 │ 文档问答 │ 数据分析 │ 代码生成 │ ├─────────────────────────────────────────────────────┤ │ 链式层 (Chains) │ │ - LLMChain │ SequentialChain │ RouterChain │ ├─────────────────────────────────────────────────────┤ │ 组件抽象层 (Abstractions) │ │ - 提示模板 │ 记忆系统 │ 检索器 │ 输出解析器 │ ├─────────────────────────────────────────────────────┤ │ 模型层 (Models) │ │ - LLMs │ ChatModels │ Embeddings │ ├─────────────────────────────────────────────────────┤ │ 集成与工具层 (Integrations) │ │ - 向量数据库 │ API工具 │ 搜索引擎 │ 文件系统 │ └─────────────────────────────────────────────────────┘

2. 设计模式与模块化

LangChain 应用通常遵循以下设计模式:

# 完整应用架构示例:config.py""" 配置管理模块 - 集中管理所有配置项 """importosfromtypingimportDict,Anyfromdotenvimportload_dotenv load_dotenv()classConfig:"""配置管理器"""def__init__(self):self.openai_api_key=os.getenv("OPENAI_API_KEY")self.model_name=os.getenv("MODEL_NAME","gpt-4")self.temperature=float(os.getenv("TEMPERATURE","0.7"))self.vector_db_path=os.getenv("VECTOR_DB_PATH","./chroma_db")defget_llm_config(self)->Dict[str,Any]:"""获取LLM配置"""return{"model":self.model_name,"temperature":self.temperature,"api_key":self.openai_api_key}# 单例配置实例config=Config()

核心组件详解与实战代码

1. 模型抽象层:多模型统一接口

# models_manager.py""" 多模型管理器 - 支持动态切换和回退机制 """fromlangchain_openaiimportChatOpenAI,OpenAIEmbeddingsfromlangchain_anthropicimportChatAnthropicfromlangchain_google_genaiimportChatGoogleGenerativeAIfromtypingimportOptional,Union,Listimportlogging logger=logging.getLogger(__name__)classModelManager:"""统一模型管理器"""def__init__(self,config:Config):self.config=config self._models={}self._embeddings=Nonedefget_chat_model(self,provider:str="openai",**kwargs)->Union[ChatOpenAI,ChatAnthropic,ChatGoogleGenerativeAI]:"""获取聊天模型实例"""# 合并配置model_kwargs={"temperature":self.config.temperature,**kwargs}ifprovider=="openai":ifnotself.config.openai_api_key:raiseValueError("OpenAI API key is required")model=ChatOpenAI(api_key=self.config.openai_api_key,model_name=self.config.model_name,**model_kwargs)elifprovider=="anthropic":model=ChatAnthropic(**model_kwargs)elifprovider=="google":model=ChatGoogleGenerativeAI(**model_kwargs)else:raiseValueError(f"Unsupported provider:{provider}")self._models[provider]=modelreturnmodeldefget_embeddings(self)->OpenAIEmbeddings:"""获取嵌入模型"""ifself._embeddingsisNone:self._embeddings=OpenAIEmbeddings(api_key=self.config.openai_api_key)returnself._embeddingsdefget_available_models(self)->List[str]:"""获取可用模型列表"""returnlist(self._models.keys())# 使用示例if__name__=="__main__":fromconfigimportconfig manager=ModelManager(config)# 获取OpenAI模型openai_llm=manager.get_chat_model(provider="openai",temperature=0.5,max_tokens=1000)# 测试模型response=openai_llm.invoke("你好,请介绍一下自己")print(f"模型响应:{response.content}")

2. 高级提示工程与模板系统

# prompt_system.py""" 智能提示系统 - 支持动态模板和上下文感知 """fromlangchain.promptsimport(ChatPromptTemplate,SystemMessagePromptTemplate,HumanMessagePromptTemplate,MessagesPlaceholder)fromlangchain.schemaimportBaseMessage,HumanMessage,AIMessagefromtypingimportDict,List,AnyimportjsonclassPromptSystem:"""高级提示管理系统"""def__init__(self):self.templates={}self._register_default_templates()def_register_default_templates(self):"""注册默认模板"""# 1. 数据分析专家模板data_analyst_template=ChatPromptTemplate.from_messages([SystemMessagePromptTemplate.from_template("""你是一位资深数据分析专家,拥有10年行业经验。 你的任务是根据用户提供的数据和分析需求,给出专业、准确的分析结果。 分析原则: 1. 数据驱动:所有结论必须有数据支持 2. 业务导向:分析要服务于业务目标 3. 可操作性:建议必须具体可执行 当前数据上下文: {data_context} 请开始你的分析。"""),MessagesPlaceholder(variable_name="chat_history"),HumanMessagePromptTemplate.from_template("{query}")])self.templates["data_analyst"]=data_analyst_template# 2. 代码生成模板code_gen_template=ChatPromptTemplate.from_messages([SystemMessagePromptTemplate.from_template("""你是一位经验丰富的软件工程师,擅长{language}编程。 代码生成规范: 1. 遵循PEP8/{language}最佳实践 2. 包含必要的注释和文档字符串 3. 考虑错误处理和边缘情况 4. 代码要模块化、可复用 需求:{requirements} 请生成高质量的代码。"""),HumanMessagePromptTemplate.from_template("{task}")])self.templates["code_generator"]=code_gen_template# 3. 文档总结模板summary_template=ChatPromptTemplate.from_messages([SystemMessagePromptTemplate.from_template("""你是专业的文档总结助手,能够从复杂文档中提取关键信息。 总结要求: 1. 提取核心观点(3-5个) 2. 保留关键数据和事实 3. 识别行动建议 4. 字数控制在{word_limit}字以内 文档类型:{doc_type} 目标读者:{audience} 开始总结。"""),HumanMessagePromptTemplate.from_template("{content}")])self.templates["summarizer"]=summary_templatedefcreate_dynamic_prompt(self,template_name:str,**kwargs)->ChatPromptTemplate:"""创建动态提示模板"""iftemplate_namenotinself.templates:raiseValueError(f"模板不存在:{template_name}")template=self.templates[template_name]returntemplate.partial(**kwargs)ifkwargselsetemplatedefformat_with_context(self,template_name:str,query:str,context:Dict[str,Any],chat_history:List[BaseMessage]=None)->List[BaseMessage]:"""带上下文的提示格式化"""prompt=self.create_dynamic_prompt(template_name)format_kwargs={"query":query,**context}ifchat_history:format_kwargs["chat_history"]=chat_historyreturnprompt.format_messages(**format_kwargs)# 使用示例if__name__=="__main__":prompt_system=PromptSystem()# 使用数据分析模板data_context={"data_source":"销售数据库","time_range":"2024年Q1","metrics":["销售额","利润率","客户增长率"]}messages=prompt_system.format_with_context(template_name="data_analyst",query="分析季度销售趋势并提出改进建议",context={"data_context":json.dumps(data_context,ensure_ascii=False)},chat_history=[HumanMessage(content="这是我们的销售数据"),AIMessage(content="我已了解数据概况")])print("生成的提示消息:")formsginmessages:print(f"{msg.type}:{msg.content[:100]}...")

3. 高级记忆系统实现

# memory_system.py""" 智能记忆系统 - 支持短期/长期记忆和记忆检索 """fromlangchain.memoryimport(ConversationBufferMemory,ConversationSummaryMemory,VectorStoreRetrieverMemory)fromlangchain.schemaimportBaseMemoryfromtypingimportDict,Any,List,OptionalfromdatetimeimportdatetimeimportpickleimporthashlibclassAdvancedMemorySystem:"""高级记忆管理系统"""def__init__(self,short_term_memory:BaseMemory=None,long_term_memory:BaseMemory=None,max_short_term_items:int=20):# 短期记忆(最近对话)self.short_term_memory=short_term_memoryorConversationBufferMemory(return_messages=True,memory_key="chat_history",output_key="output")# 长期记忆(总结性记忆)self.long_term_memory=long_term_memoryorConversationSummaryMemory(llm=ChatOpenAI(temperature=0),memory_key="long_term_history")self.max_short_term_items=max_short_term_items self.memory_file="memory_store.pkl"self._load_memory()def_load_memory(self):"""从文件加载记忆"""try:withopen(self.memory_file,'rb')asf:saved_memory=pickle.load(f)self.short_term_memory=saved_memory.get('short_term',self.short_term_memory)self.long_term_memory=saved_memory.get('long_term',self.long_term_memory)exceptFileNotFoundError:print("未找到记忆存储文件,创建新记忆系统")defsave_memory(self):"""保存记忆到文件"""memory_data={'short_term':self.short_term_memory,'long_term':self.long_term_memory,'saved_at':datetime.now().isoformat()}withopen(self.memory_file,'wb')asf:pickle.dump(memory_data,f)defadd_interaction(self,user_input:str,ai_response:str,metadata:Dict[str,Any]=None):"""添加交互到记忆系统"""# 添加到短期记忆self.short_term_memory.save_context({"input":user_input},{"output":ai_response})# 定期总结到长期记忆ifself._should_summarize():self._update_long_term_memory()# 添加元数据ifmetadata:self._add_metadata(metadata)def_should_summarize(self)->bool:"""判断是否需要总结"""chat_history=self.short_term_memory.load_memory_variables({})messages=chat_history.get("chat_history",[])returnlen(messages)>=self.max_short_term_itemsdef_update_long_term_memory(self):"""更新长期记忆"""chat_history=self.short_term_memory.load_memory_variables({})summary_text=self._create_summary(chat_history)self.long_term_memory.save_context({"input":"历史对话总结"},{"output":summary_text})# 清空短期记忆self.short_term_memory.clear()def_create_summary(self,chat_history:Dict)->str:"""创建对话总结"""messages=chat_history.get("chat_history",[])conversation_text="\n".join([f"{msg.type}:{msg.content}"formsginmessages[-10:]# 最近10条])# 使用LLM生成总结summary_prompt=f"""请总结以下对话的核心内容和关键信息:{conversation_text}总结要求: 1. 提取主要讨论主题 2. 记录重要决定和结论 3. 识别待办事项 4. 字数不超过200字"""returnsummary_prompt# 实际应用中应调用LLM生成总结def_add_metadata(self,metadata:Dict[str,Any]):"""添加元数据"""metadata_file="metadata.json"try:importjsonwithopen(metadata_file,'r')asf:existing_metadata=json.load(f)exceptFileNotFoundError:existing_metadata=[]existing_metadata.append({**metadata,"timestamp":datetime.now().isoformat()})withopen(metadata_file,'w')asf:json.dump(existing_metadata,f,indent=2)defget_memory_context(self)->Dict[str,Any]:"""获取记忆上下文"""short_term=self.short_term_memory.load_memory_variables({})long_term=self.long_term_memory.load_memory_variables({})return{"short_term_memory":short_term,"long_term_memory":long_term,"memory_summary":self.get_memory_summary()}defget_memory_summary(self)->str:"""获取记忆摘要"""short_term=self.short_term_memory.load_memory_variables({})messages=short_term.get("chat_history",[])returnf"对话记忆:{len(messages)}条消息"defsearch_memory(self,query:str)->List[Dict[str,Any]]:"""在记忆中搜索相关内容"""all_memory=self.get_memory_context()results=[]# 简单关键词搜索(实际应用可用向量搜索)forkey,valueinall_memory.items():ifisinstance(value,str)andquery.lower()invalue.lower():results.append({"source":key,"content":value[:200]+"..."iflen(value)>200elsevalue})returnresults# 使用示例if__name__=="__main__":memory_system=AdvancedMemorySystem()# 模拟对话conversations=[("你好,我叫张三","你好张三!很高兴认识你。"),("我来自北京","北京是个美丽的城市!"),("我喜欢编程","编程是个很有用的技能!"),("我想学习Python","Python是很好的入门语言。")]foruser_input,ai_responseinconversations:memory_system.add_interaction(user_input=user_input,ai_response=ai_response,metadata={"topic":"自我介绍"})# 获取记忆上下文context=memory_system.get_memory_context()print("记忆上下文:",context["memory_summary"])# 搜索记忆results=memory_system.search_memory("编程")print("搜索结果:",results)# 保存记忆memory_system.save_memory()

4. 智能代理系统架构

# agent_system.py""" 智能代理系统 - 支持工具调用和任务分解 """fromlangchain.agentsimport(AgentExecutor,create_openai_functions_agent,create_react_agent)fromlangchain.toolsimportBaseTool,toolfromlangchain.promptsimportChatPromptTemplate,MessagesPlaceholderfromtypingimportList,Dict,Any,Optionalimportasynciofromconcurrent.futuresimportThreadPoolExecutorimportjsonclassToolRegistry:"""工具注册器"""def__init__(self):self.tools={}self._register_builtin_tools()def_register_builtin_tools(self):"""注册内置工具"""@tooldefcalculator(expression:str)->str:"""计算数学表达式,支持加减乘除和常用函数"""try:# 安全地评估表达式allowed_names={'abs':abs,'round':round,'min':min,'max':max,'sum':sum,'len':len}# 使用ast安全解析importast node=ast.parse(expression,mode='eval')def_eval(node):ifisinstance(node,ast.Num):returnnode.nelifisinstance(node,ast.BinOp):left=_eval(node.left)right=_eval(node.right)ifisinstance(node.op,ast.Add):returnleft+rightelifisinstance(node.op,ast.Sub):returnleft-rightelifisinstance(node.op,ast.Mult):returnleft*rightelifisinstance(node.op,ast.Div):returnleft/rightelifisinstance(node,ast.Name):ifnode.idinallowed_names:returnallowed_names[node.id]elifisinstance(node,ast.Call):func=_eval(node.func)args=[_eval(arg)forarginnode.args]returnfunc(*args)raiseValueError(f"不支持的表达式:{node}")result=_eval(node.body)returnstr(result)exceptExceptionase:returnf"计算错误:{str(e)}"self.register_tool("calculator",calculator)@tooldefweb_search(query:str)->str:"""搜索网络信息"""# 实际应用中集成搜索引擎APIreturnf"网络搜索结果(模拟):{query}"self.register_tool("web_search",web_search)@tooldefdata_analyzer(data_json:str,analysis_type:str)->str:"""分析数据"""try:data=json.loads(data_json)ifanalysis_type=="statistics":stats={"count":len(data),"keys":list(data[0].keys())ifdataelse[]}returnjson.dumps(stats,ensure_ascii=False)else:return"分析类型暂不支持"exceptExceptionase:returnf"数据分析错误:{str(e)}"self.register_tool("data_analyzer",data_analyzer)defregister_tool(self,name:str,tool_func:callable):"""注册新工具"""self.tools[name]=tool_funcdefget_tool(self,name:str)->Optional[BaseTool]:"""获取工具"""returnself.tools.get(name)defget_all_tools(self)->List[BaseTool]:"""获取所有工具"""returnlist(self.tools.values())classIntelligentAgent:"""智能代理"""def__init__(self,llm,tool_registry:ToolRegistry,agent_type:str="react"):self.llm=llm self.tool_registry=tool_registry self.agent_type=agent_type# 创建代理ifagent_type=="react":self.agent=self._create_react_agent()elifagent_type=="openai_functions":self.agent=self._create_openai_functions_agent()else:raiseValueError(f"不支持的代理类型:{agent_type}")# 创建执行器self.executor=AgentExecutor(agent=self.agent,tools=tool_registry.get_all_tools(),verbose=True,handle_parsing_errors=True,max_iterations=5)def_create_react_agent(self):"""创建ReAct代理"""prompt=ChatPromptTemplate.from_messages([("system","""你是一个智能助手,可以调用工具解决问题。 请遵循以下步骤: 1. 理解问题 2. 思考需要哪些工具 3. 调用工具 4. 分析结果 5. 给出最终答案 可用的工具: {tools} 开始处理问题。"""),MessagesPlaceholder(variable_name="chat_history"),("user","{input}"),MessagesPlaceholder(variable_name="agent_scratchpad")])returncreate_react_agent(llm=self.llm,tools=self.tool_registry.get_all_tools(),prompt=prompt)def_create_openai_functions_agent(self):"""创建OpenAI函数代理"""prompt=ChatPromptTemplate.from_messages([("system","你是一个有帮助的助手"),MessagesPlaceholder(variable_name="chat_history"),("user","{input}"),MessagesPlaceholder(variable_name="agent_scratchpad")])returncreate_openai_functions_agent(llm=self.llm,prompt=prompt,tools=self.tool_registry.get_all_tools())asyncdefainvoke(self,input_text:str,**kwargs)->Dict[str,Any]:"""异步调用代理"""try:result=awaitself.executor.ainvoke({"input":input_text,"chat_history":kwargs.get("chat_history",[])})return{"success":True,"output":result.get("output",""),"intermediate_steps":result.get("intermediate_steps",[]),"execution_time":result.get("execution_time",0)}exceptExceptionase:return{"success":False,"error":str(e),"output":""}definvoke(self,input_text:str,**kwargs)->Dict[str,Any]:"""同步调用代理"""try:result=self.executor.invoke({"input":input_text,"chat_history":kwargs.get("chat_history",[])})return{"success":True,"output":result.get("output",""),"intermediate_steps":result.get("intermediate_steps",[])}exceptExceptionase:return{"success":False,"error":str(e),"output":""}defbatch_process(self,inputs:List[str])->List[Dict[str,Any]]:"""批量处理"""withThreadPoolExecutor(max_workers=5)asexecutor:futures=[executor.submit(self.invoke,input_text)forinput_textininputs]return[future.result()forfutureinfutures]# 使用示例if__name__=="__main__":fromconfigimportconfigfrommodels_managerimportModelManager# 初始化model_manager=ModelManager(config)llm=model_manager.get_chat_model("openai")# 创建工具注册器和代理tool_registry=ToolRegistry()# 注册自定义工具@tooldefweather_checker(city:str)->str:"""查询城市天气"""# 模拟天气数据weather_data={"北京":"晴,15-25°C","上海":"多云,18-28°C","广州":"雨,22-30°C"}returnweather_data.get(city,"城市天气信息暂不可用")tool_registry.register_tool("weather_checker",weather_checker)# 创建智能代理agent=IntelligentAgent(llm=llm,tool_registry=tool_registry,agent_type="react")# 测试代理test_queries=["计算 (15 + 23) * 2 的结果","查询北京的天气","分析这个数据 [{'name': 'Alice', 'age': 25}, {'name': 'Bob', 'age': 30}]"]forqueryintest_queries:print(f"\n查询:{query}")result=agent.invoke(query)ifresult["success"]:print(f"结果:{result['output']}")ifresult.get("intermediate_steps"):print(f"中间步骤:{len(result['intermediate_steps'])}步")else:print(f"错误:{result['error']}")

5. 企业级RAG系统实现

# rag_system.py""" 企业级RAG系统 - 检索增强生成 """fromtypingimportList,Dict,Any,Optionalfromlangchain.document_loadersimport(TextLoader,PDFLoader,WebBaseLoader,DirectoryLoader)fromlangchain.text_splitterimport(RecursiveCharacterTextSplitter,CharacterTextSplitter)fromlangchain.embeddingsimportOpenAIEmbeddingsfromlangchain.vectorstoresimport(Chroma,FAISS,Pinecone)fromlangchain.retrieversimport(MultiVectorRetriever,ContextualCompressionRetriever,EnsembleRetriever)fromlangchain.retrievers.document_compressorsimport(LLMChainExtractor,EmbeddingsFilter)fromlangchain.chainsimportRetrievalQAfromlangchain.schemaimportDocumentimportosimportpickleclassEnterpriseRAGSystem:"""企业级RAG系统"""def__init__(self,embedding_model=None,llm=None,vector_store_type:str="chroma",persist_directory:str="./vector_store"):self.embedding_model=embedding_modelorOpenAIEmbeddings()self.llm=llm self.vector_store_type=vector_store_type self.persist_directory=persist_directory self.vector_store=Noneself.retriever=Noneself.qa_chain=Nonedefload_documents(self,source_path:str,source_type:str="directory",**kwargs)->List[Document]:"""加载文档"""loader_map={"directory":DirectoryLoader,"pdf":PDFLoader,"text":TextLoader,"web":WebBaseLoader}ifsource_typenotinloader_map:raiseValueError(f"不支持的文档类型:{source_type}")loader_class=loader_map[source_type]ifsource_type=="directory":loader=loader_class(source_path,glob="**/*.*",loader_kwargs=kwargs.get("loader_kwargs",{}))else:loader=loader_class(source_path,**kwargs)returnloader.load()defprocess_documents(self,documents:List[Document],chunk_size:int=1000,chunk_overlap:int=200)->List[Document]:"""处理文档"""# 文本分割text_splitter=RecursiveCharacterTextSplitter(chunk_size=chunk_size,chunk_overlap=chunk_overlap,length_function=len,separators=["\n\n","\n","。","!","?",";",","," "])chunks=text_splitter.split_documents(documents)# 添加元数据fori,chunkinenumerate(chunks):chunk.metadata.update({"chunk_id":i,"total_chunks":len(chunks),"source_hash":hash(chunk.page_content[:100])})returnchunksdefcreate_vector_store(self,documents:List[Document]):"""创建向量存储"""ifself.vector_store_type=="chroma":self.vector_store=Chroma.from_documents(documents=documents,embedding=self.embedding_model,persist_directory=self.persist_directory)self.vector_store.persist()elifself.vector_store_type=="faiss":self.vector_store=FAISS.from_documents(documents=documents,embedding=self.embedding_model)# 保存FAISS索引self.vector_store.save_local(self.persist_directory)else:raiseValueError(f"不支持的向量存储类型:{self.vector_store_type}")print(f"向量存储创建成功,包含{len(documents)}个文档块")defcreate_advanced_retriever(self,search_type:str="similarity",k:int=4,score_threshold:float=0.5):"""创建高级检索器"""ifnotself.vector_store:raiseValueError("请先创建向量存储")# 基础检索器base_retriever=self.vector_store.as_retriever(search_type=search_type,search_kwargs={"k":k*2,# 获取更多结果用于压缩"score_threshold":score_threshold})# 上下文压缩检索器compressor=LLMChainExtractor.from_llm(self.llm)compression_retriever=ContextualCompressionRetriever(base_compressor=compressor,base_retriever=base_retriever)# 嵌入过滤embeddings_filter=EmbeddingsFilter(embeddings=self.embedding_model,similarity_threshold=score_threshold)# 组合检索器self.retriever=compression_retrieverreturnself.retrieverdefcreate_qa_chain(self,chain_type:str="stuff"):"""创建问答链"""ifnotself.retriever:raiseValueError("请先创建检索器")self.qa_chain=RetrievalQA.from_chain_type(llm=self.llm,chain_type=chain_type,retriever=self.retriever,return_source_documents=True,verbose=True)returnself.qa_chaindefquery(self,question:str,**kwargs)->Dict[str,Any]:"""查询"""ifnotself.qa_chain:raiseValueError("请先创建问答链")try:result=self.qa_chain({"query":question})return{"success":True,"answer":result["result"],"source_documents":[{"content":doc.page_content[:200]+"...","metadata":doc.metadata}fordocinresult.get("source_documents",[])],"source_count":len(result.get("source_documents",[]))}exceptExceptionase:return{"success":False,"error":str(e),"answer":""}defsave_system(self,filepath:str="rag_system.pkl"):"""保存系统状态"""state={"vector_store_type":self.vector_store_type,"persist_directory":self.persist_directory,"config":{"embedding_model":type(self.embedding_model).__name__,"llm":type(self.llm).__name__ifself.llmelseNone}}withopen(filepath,'wb')asf:pickle.dump(state,f)print(f"系统状态已保存到{filepath}")@classmethoddefload_system(cls,filepath:str="rag_system.pkl",embedding_model=None,llm=None):"""加载系统"""withopen(filepath,'rb')asf:state=pickle.load(f)system=cls(embedding_model=embedding_model,llm=llm,vector_store_type=state["vector_store_type"],persist_directory=state["persist_directory"])# 加载向量存储ifstate["vector_store_type"]=="chroma":system.vector_store=Chroma(persist_directory=state["persist_directory"],embedding_function=embedding_model)system.retriever=system.vector_store.as_retriever()elifstate["vector_store_type"]=="faiss":system.vector_store=FAISS.load_local(state["persist_directory"],embedding_model,allow_dangerous_deserialization=True)system.retriever=system.vector_store.as_retriever()# 重新创建QA链ifllm:system.create_qa_chain()returnsystem# 使用示例if__name__=="__main__":fromconfigimportconfigfrommodels_managerimportModelManager# 初始化模型model_manager=ModelManager(config)llm=model_manager.get_chat_model("openai",temperature=0.1)embeddings=model_manager.get_embeddings()# 创建RAG系统rag_system=EnterpriseRAGSystem(embedding_model=embeddings,llm=llm,vector_store_type="chroma",persist_directory="./data/vector_store")# 1. 加载文档print("加载文档...")documents=rag_system.load_documents(source_path="./data/documents",source_type="directory")print(f"加载了{len(documents)}个文档")# 2. 处理文档print("处理文档...")processed_docs=rag_system.process_documents(documents,chunk_size=800,chunk_overlap=100)print(f"分割为{len(processed_docs)}个文档块")# 3. 创建向量存储print("创建向量存储...")rag_system.create_vector_store(processed_docs)# 4. 创建检索器print("创建检索器...")retriever=rag_system.create_advanced_retriever(search_type="similarity_score_threshold",k=3,score_threshold=0.7)# 5. 创建QA链print("创建QA链...")qa_chain=rag_system.create_qa_chain(chain_type="map_reduce")# 6. 测试查询test_questions=["文档的主要内容是什么?","有哪些关键的技术点?","作者提出了什么建议?"]forquestionintest_questions:print(f"\n问题:{question}")result=rag_system.query(question)ifresult["success"]:print(f"回答:{result['answer'][:200]}...")print(f"参考文档:{result['source_count']}个")else:print(f"错误:{result['error']}")# 7. 保存系统rag_system.save_system("./data/rag_system_state.pkl")

高级应用:完整AI应用架构

# ai_application.py""" 完整的AI应用架构示例 """importasynciofromtypingimportDict,Any,List,Optionalfromdatetimeimportdatetimeimportloggingfromdataclassesimportdataclass,asdictimportjson# 配置日志logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')logger=logging.getLogger(__name__)@dataclassclassAIConfig:"""AI应用配置"""model_provider:str="openai"model_name:str="gpt-4"temperature:float=0.7max_tokens:int=2000enable_memory:bool=Trueenable_tools:bool=Truecache_responses:bool=Truedefto_dict(self)->Dict[str,Any]:returnasdict(self)classAIApplication:"""完整的AI应用"""def__init__(self,config:AIConfig):self.config=config self.components={}self._initialize_components()def_initialize_components(self):"""初始化所有组件"""logger.info("初始化AI应用组件...")# 1. 初始化模型管理器frommodels_managerimportModelManagerfromconfigimportConfigasAppConfig app_config=AppConfig()self.components['model_manager']=ModelManager(app_config)# 2. 初始化LLMself.components['llm']=self.components['model_manager'].get_chat_model(provider=self.config.model_provider,temperature=self.config.temperature,max_tokens=self.config.max_tokens)# 3. 初始化记忆系统ifself.config.enable_memory:frommemory_systemimportAdvancedMemorySystem self.components['memory']=AdvancedMemorySystem()# 4. 初始化代理系统ifself.config.enable_tools:fromagent_systemimportToolRegistry,IntelligentAgent tool_registry=ToolRegistry()self.components['agent']=IntelligentAgent(llm=self.components['llm'],tool_registry=tool_registry,agent_type="react")# 5. 初始化缓存ifself.config.cache_responses:self.components['cache']=ResponseCache()logger.info("AI应用初始化完成")asyncdefprocess_request(self,user_input:str,user_id:Optional[str]=None,context:Optional[Dict[str,Any]]=None)->Dict[str,Any]:"""处理用户请求"""start_time=datetime.now()request_id=f"req_{datetime.now().strftime('%Y%m%d_%H%M%S')}"logger.info(f"处理请求{request_id}:{user_input[:50]}...")try:# 检查缓存ifself.config.cache_responses:cached_response=self.components['cache'].get(user_input)ifcached_response:logger.info(f"请求{request_id}命中缓存")return{"request_id":request_id,"response":cached_response,"from_cache":True,"processing_time":0}# 获取记忆上下文memory_context={}ifself.config.enable_memoryanduser_id:memory_context=self.components['memory'].get_memory_context()# 使用代理处理ifself.config.enable_tools:result=awaitself.components['agent'].ainvoke(user_input,chat_history=memory_context.get("chat_history",[]))response=result["output"]tool_used=len(result.get("intermediate_steps",[]))>0else:# 直接使用LLMresponse_obj=awaitself.components['llm'].ainvoke(user_input)response=response_obj.content tool_used=False# 更新记忆ifself.config.enable_memoryanduser_id:self.components['memory'].add_interaction(user_input=user_input,ai_response=response,metadata={"user_id":user_id,"request_id":request_id,"used_tools":tool_used})# 保存到缓存ifself.config.cache_responses:self.components['cache'].set(user_input,response)processing_time=(datetime.now()-start_time).total_seconds()return{"request_id":request_id,"response":response,"from_cache":False,"tool_used":tool_used,"processing_time":processing_time,"timestamp":datetime.now().isoformat()}exceptExceptionase:logger.error(f"处理请求{request_id}时出错:{str(e)}")return{"request_id":request_id,"error":str(e),"response":"抱歉,处理您的请求时出现了错误。","success":False,"timestamp":datetime.now().isoformat()}defbatch_process(self,requests:List[Dict[str,Any]])->List[Dict[str,Any]]:"""批量处理请求"""asyncdefprocess_all():tasks=[]forreqinrequests:task=self.process_request(user_input=req.get("input",""),user_id=req.get("user_id"),context=req.get("context",{}))tasks.append(task)returnawaitasyncio.gather(*tasks)returnasyncio.run(process_all())defget_system_status(self)->Dict[str,Any]:"""获取系统状态"""status={"config":self.config.to_dict(),"components":{name:type(comp).__name__forname,compinself.components.items()},"memory_stats":{}}ifself.config.enable_memory:memory=self.components.get('memory')ifmemory:status["memory_stats"]=memory.get_memory_summary()returnstatusclassResponseCache:"""响应缓存"""def__init__(self,max_size:int=1000,ttl:int=3600):self.cache={}self.max_size=max_size self.ttl=ttl# 生存时间(秒)self.access_times={}defget(self,key:str)->Optional[str]:"""获取缓存"""ifkeyinself.cache:# 检查是否过期last_access=self.access_times.get(key)iflast_accessand(datetime.now()-last_access).seconds>self.ttl:self.delete(key)returnNoneself.access_times[key]=datetime.now()returnself.cache[key]returnNonedefset(self,key:str,value:str):"""设置缓存"""iflen(self.cache)>=self.max_size:# 删除最久未使用的oldest_key=min(self.access_times,key=self.access_times.get)self.delete(oldest_key)self.cache[key]=value self.access_times[key]=datetime.now()defdelete(self,key:str):"""删除缓存"""ifkeyinself.cache:delself.cache[key]ifkeyinself.access_times:delself.access_times[key]defclear(self):"""清空缓存"""self.cache.clear()self.access_times.clear()defstats(self)->Dict[str,Any]:"""缓存统计"""return{"size":len(self.cache),"max_size":self.max_size,"ttl":self.ttl}# 使用示例asyncdefmain():"""主函数示例"""# 创建配置config=AIConfig(model_provider="openai",model_name="gpt-4",temperature=0.7,enable_memory=True,enable_tools=True,cache_responses=True)# 创建AI应用app=AIApplication(config)# 获取系统状态status=app.get_system_status()print("系统状态:")print(json.dumps(status,indent=2,ensure_ascii=False))# 处理单个请求print("\n处理单个请求...")result=awaitapp.process_request(user_input="计算一下 (45 + 78) * 2 除以 3 的结果",user_id="user_123")print(f"响应:{result.get('response','')}")print(f"是否使用工具:{result.get('tool_used',False)}")print(f"处理时间:{result.get('processing_time',0):.2f}秒")# 批量处理print("\n批量处理请求...")batch_requests=[{"input":"你好,介绍一下Python语言","user_id":"user_123"},{"input":"今天北京天气怎么样?","user_id":"user_456"},{"input":"解释一下机器学习","user_id":"user_789"}]batch_results=app.batch_process(batch_requests)fori,resultinenumerate(batch_results):print(f"\n请求{i+1}:")print(f"输入:{batch_requests[i]['input']}")print(f"响应:{result.get('response','')[:100]}...")print(f"是否缓存:{result.get('from_cache',False)}")if__name__=="__main__":asyncio.run(main())

部署与监控

# deployment.py""" 生产环境部署与监控 """fromfastapiimportFastAPI,HTTPException,DependsfrompydanticimportBaseModel,FieldfromtypingimportList,Optionalimportuvicornfromdatetimeimportdatetimeimportloggingimportasynciofromcontextlibimportasynccontextmanagerimportprometheus_clientfromprometheus_fastapi_instrumentatorimportInstrumentator# 请求/响应模型classChatRequest(BaseModel):message:str=Field(...,min_length=1,max_length=1000)user_id:Optional[str]=Nonesession_id:Optional[str]=Nonecontext:Optional[dict]={}classChatResponse(BaseModel):message_id:strresponse:strprocessing_time:floatmodel_used:strtimestamp:str# 应用生命周期管理@asynccontextmanagerasyncdeflifespan(app:FastAPI):"""应用生命周期管理"""# 启动时初始化print("启动AI应用...")# 初始化AI应用fromai_applicationimportAIApplication,AIConfig config=AIConfig(model_provider="openai",model_name="gpt-4",temperature=0.7)app.state.ai_app=AIApplication(config)app.state.request_counter=prometheus_client.Counter('chat_requests_total','Total chat requests',['status'])yield# 关闭时清理print("关闭AI应用...")# 清理资源# 创建FastAPI应用app=FastAPI(title="AI Chat API",description="基于LangChain的智能对话API",version="1.0.0",lifespan=lifespan)# 添加监控Instrumentator().instrument(app).expose(app)# 中间件@app.middleware("http")asyncdefadd_process_time_header(request,call_next):start_time=datetime.now()response=awaitcall_next(request)processing_time=(datetime.now()-start_time).total_seconds()response.headers["X-Processing-Time"]=str(processing_time)returnresponse# 路由@app.post("/chat",response_model=ChatResponse)asyncdefchat(request:ChatRequest):"""聊天接口"""try:# 记录请求app.state.request_counter.labels(status="received").inc()# 处理请求result=awaitapp.state.ai_app.process_request(user_input=request.message,user_id=request.user_id,context=request.contextor{})ifresult.get("success",True):app.state.request_counter.labels(status="success").inc()returnChatResponse(message_id=result["request_id"],response=result["response"],processing_time=result.get("processing_time",0),model_used=app.state.ai_app.config.model_name,timestamp=result.get("timestamp",datetime.now().isoformat()))else:app.state.request_counter.labels(status="error").inc()raiseHTTPException(status_code=500,detail=result.get("error","处理失败"))exceptExceptionase:app.state.request_counter.labels(status="error").inc()raiseHTTPException(status_code=500,detail=str(e))@app.get("/health")asyncdefhealth_check():"""健康检查"""return{"status":"healthy","timestamp":datetime.now().isoformat(),"service":"AI Chat API"}@app.get("/metrics")asyncdefmetrics():"""Prometheus指标"""returnprometheus_client.generate_latest()@app.get("/system/status")asyncdefsystem_status():"""系统状态"""ifhasattr(app.state,'ai_app'):returnapp.state.ai_app.get_system_status()return{"status":"initializing"}# 批量处理@app.post("/batch_chat")asyncdefbatch_chat(requests:List[ChatRequest]):"""批量聊天接口"""try:request_data=[{"input":req.message,"user_id":req.user_id,"context":req.contextor{}}forreqinrequests]results=app.state.ai_app.batch_process(request_data)return[{"message_id":result.get("request_id",f"batch_{i}"),"response":result.get("response",""),"success":result.get("success",True)}fori,resultinenumerate(results)]exceptExceptionase:raiseHTTPException(status_code=500,detail=str(e))if__name__=="__main__":uvicorn.run("deployment:app",host="0.0.0.0",port=8000,reload=True,log_level="info")

项目结构与最佳实践

ai-application/ ├── README.md ├── requirements.txt ├── .env.example ├── config/ │ ├── __init__.py │ └── settings.py ├── src/ │ ├── core/ │ │ ├── __init__.py │ │ ├── models.py │ │ ├── prompts.py │ │ ├── memory.py │ │ └── agents.py │ ├── services/ │ │ ├── __init__.py │ │ ├── chat_service.py │ │ └── rag_service.py │ └── api/ │ ├── __init__.py │ ├── routes.py │ └── middleware.py ├── tests/ │ ├── __init__.py │ ├── test_models.py │ └── test_services.py ├── scripts/ │ ├── setup.py │ └── deploy.sh └── docker/ ├── Dockerfile └── docker-compose.yml

最佳实践总结

  1. 模块化设计:每个组件职责单一,便于测试和维护
  2. 配置管理:使用环境变量和配置文件,避免硬编码
  3. 错误处理:完善的异常捕获和错误恢复机制
  4. 性能监控:集成监控和日志系统,便于问题排查
  5. 缓存策略:合理使用缓存提高响应速度
  6. 安全考虑:输入验证、API密钥管理、沙箱执行
  7. 版本控制:API版本管理,向后兼容

总结

LangChain 提供了一个强大而灵活的框架,使得构建复杂的AI应用变得简单高效。通过本文提供的完整示例和最佳实践,你可以快速搭建起符合企业级标准的AI应用系统。关键点包括:

  1. 架构清晰:分层设计,组件解耦
  2. 代码完整:提供可直接运行的完整示例
  3. 生产就绪:包含部署、监控、缓存等生产环境特性
  4. 扩展性强:易于添加新功能和集成新工具

随着LangChain生态的不断发展,开发者可以更专注于业务逻辑的实现,而无需担心底层复杂性,真正实现AI应用的快速开发和部署。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/22 18:43:55

4-bit量化革命:Nunchaku FLUX.1让消费级GPU实现专业级AI绘图

4-bit量化革命:Nunchaku FLUX.1让消费级GPU实现专业级AI绘图 【免费下载链接】nunchaku-flux.1-krea-dev 项目地址: https://ai.gitcode.com/hf_mirrors/nunchaku-tech/nunchaku-flux.1-krea-dev 导语 Nunchaku团队推出基于SVDQuant技术的4-bit量化版FLUX.…

作者头像 李华
网站建设 2026/4/23 9:44:28

Wan2.2-T2V-A14B能否生成带有实时数据驱动的动态信息视频?

Wan2.2-T2V-A14B能否生成带有实时数据驱动的动态信息视频? 在新闻直播间里,一条突发财经消息刚从交易所系统弹出——某科技股瞬间拉升7%。不到两分钟,一段由AI生成的播报视频已自动推送到各大平台:女主播神情专注地讲解走势&#…

作者头像 李华
网站建设 2026/4/23 10:45:05

Wan2.2-T2V-A14B在零售门店陈列变化演示视频中的空间感知能力

Wan2.2-T2V-A14B在零售门店陈列变化演示视频中的空间感知能力 在一家连锁便利店总部的会议室里,区域经理刚提交了一份关于中秋促销陈列调整的方案——主通道增设月饼堆头、背景板更换为金色主题、灯光调暖、安排试吃活动。不到五分钟,一段逼真的动态视频…

作者头像 李华
网站建设 2026/4/23 10:44:59

Wan2.2-T2V-A14B模型镜像一键部署教程(Docker版)

Wan2.2-T2V-A14B模型镜像一键部署教程(Docker版) 在AI内容生成的浪潮中,文本到视频(Text-to-Video, T2V)技术正从实验室走向真实业务场景。过去制作一段几秒钟的动画可能需要专业团队数小时甚至数天的工作量&#xff0…

作者头像 李华
网站建设 2026/4/23 12:49:14

当符号学会说话:得意黑字体设计的诗意革命

在数字时代的视觉洪流中,字体早已超越了单纯的文字载体,成为设计师手中的魔法棒。而得意黑(Smiley Sans)作为一款在人文温度与几何理性间游走的中文黑体,其最大的魅力并非来自那些方正端庄的汉字,而是隐藏在…

作者头像 李华
网站建设 2026/4/22 18:59:53

《深入 Celery:用 Python 构建高可用任务队列的实战指南》

《深入 Celery:用 Python 构建高可用任务队列的实战指南》 一、引言:为什么我们需要任务队列? 在现代 Web 应用、数据处理、自动化系统中,我们经常会遇到这样的场景: 用户上传图片后需要异步压缩和存储;…

作者头像 李华