fromconfigimportload_or_create_configfrommodelsimportMessagefromtypingimportOptional,List,Dict,AnyfromopenaiimportOpenAI,AsyncOpenAIimportasynciofromasyncioimportSemaphoreimportaiohttpimporttime# 可选的配置管理类def_build_messages(text:str,system_prompt:Optional[str]=None,history:Optional[List[Message]]=None)->List[Dict[str,Any]]:"""构建消息列表"""messages=[]ifsystem_prompt:messages.append({"role":"system","content":system_prompt})ifhistory:formsginhistory:messages.append({"role":msg.role,"content":msg.content})messages.append({"role":"user","content":text})returnmessagesclassLLMClient:def__init__(self,config_path:str="config.yaml",client_name:str="openai",history_max:int=10,system_prompt:Optional[str]=None,max_concurrent_requests:int=5,# 新增:最大并发数timeout:int=30,# 新增:超时时间):self.llm_client_list={}self.async_llm_client_list={}# 新增:异步客户端列表self.config=load_or_create_config(config_path)self.history=[]self.history_max=history_max self.client_name=client_name self.max_concurrent_requests=max_concurrent_requests self.timeout=timeout self.semaphore=Semaphore(max_concurrent_requests)# 并发控制# 初始化同步客户端ifself.client_name=="openai":self.llm_client_list["openai"]=OpenAI(api_key=self.config.models.openai.api_key,base_url=self.config.models.openai.base_url,timeout=timeout)# 新增:初始化异步客户端self.async_llm_client_list["openai"]=AsyncOpenAI(api_key=self.config.models.openai.api_key,base_url=self.config.models.openai.base_url,timeout=timeout)self.system_prompt=system_promptdefllm_call(self,text:str,model_name:str=None)->Optional[Message]:"""同步调用(保持原样)"""history=Noneifmodel_nameisNone:model_name=self.config.models.openai.model_nameifself.client_name=="openai":ifself.history_max>0andself.history:history=self.history[-self.history_max:]res_text=self.llm_openai_call(text,self.system_prompt,history=history,model_name=model_name,max_per_tokens=self.config.models.openai.max_per_tokens)self._add_to_history("user",text)self._add_to_history("assistant",res_text)returnres_textreturnNone# 新增:异步调用方法asyncdefallm_call(self,text:str,model_name:str=None,stream:bool=False)->Optional[str]:""" 异步调用LLM Args: text: 用户输入文本 model_name: 模型名称 stream: 是否使用流式响应 Returns: 模型回复内容 """ifmodel_nameisNone:model_name=self.config.models.openai.model_nameifself.client_name=="openai":# 获取历史记录history=Noneifself.history_max>0andself.history:history=self.history[-self.history_max:]asyncwithself.semaphore:# 并发控制ifstream:# 流式响应returnawaitself.allm_openai_call_stream(text,self.system_prompt,history=history,model_name=model_name,max_per_tokens=self.config.models.openai.max_per_tokens)else:# 普通响应res_text=awaitself.allm_openai_call(text,self.system_prompt,history=history,model_name=model_name,max_per_tokens=self.config.models.openai.max_per_tokens)self._add_to_history("user",text)self._add_to_history("assistant",res_text)returnres_textreturnNone# 新增:批量异步调用asyncdefbatch_allm_call(self,texts:List[str],model_name:str=None,max_workers:int=None)->List[str]:""" 批量异步调用LLM Args: texts: 文本列表 model_name: 模型名称 max_workers: 最大并发数 Returns: 回复列表 """ifmax_workers:self.semaphore=Semaphore(max_workers)ifmodel_nameisNone:model_name=self.config.models.openai.model_name tasks=[]fortextintexts:task=self.allm_call(text,model_name,stream=False)tasks.append(task)# 使用asyncio.gather并发执行try:results=awaitasyncio.gather(*tasks,return_exceptions=True)# 处理结果final_results=[]fori,resultinenumerate(results):ifisinstance(result,Exception):print(f"第{i}个请求失败:{str(result)}")final_results.append(f"请求失败:{str(result)}")else:final_results.append(result)returnfinal_resultsexceptExceptionase:raiseException(f"批量调用失败:{str(e)}")# 新增:异步OpenAI调用asyncdefallm_openai_call(self,text:str,system_prompt:Optional[str]=None,history:Optional[List[Message]]=None,model_name:str=None,max_per_tokens:int=4096,temperature:float=0.2,)->Optional[str]:"""异步OpenAI调用"""messages=_build_messages(text,system_prompt,history)try:response=awaitself.async_llm_client_list["openai"].chat.completions.create(model=model_nameorself.config.models.openai.model_name,messages=messages,stream=False,timeout=self.timeout)ifresponse.choicesandlen(response.choices)>0:assistant_reply=response.choices[0].message.contentreturnassistant_replyexceptasyncio.TimeoutError:raiseException("请求超时")exceptaiohttp.ClientErrorase:raiseException(f"网络错误:{str(e)}")exceptExceptionase:raiseException(f"调用模型时出错:{str(e)}")# 新增:流式响应异步调用asyncdefallm_openai_call_stream(self,text:str,system_prompt:Optional[str]=None,history:Optional[List[Message]]=None,model_name:str=None,max_per_tokens:int=4096,temperature:float=0.2,):"""流式响应异步调用(返回生成器)"""messages=_build_messages(text,system_prompt,history)try:response=awaitself.async_llm_client_list["openai"].chat.completions.create(model=model_nameorself.config.models.openai.model_name,messages=messages,stream=True,timeout=self.timeout)asyncforchunkinresponse:ifchunk.choicesandchunk.choices[0].delta.content:yieldchunk.choices[0].delta.contentexceptasyncio.TimeoutError:yield"【请求超时】"exceptExceptionase:yieldf"【错误:{str(e)}】"defllm_openai_call(self,text:str,system_prompt:Optional[str]=None,history:Optional[List[Message]]=None,model_name:str=None,max_per_tokens:int=4096,temperature:float=0.2,)->Optional[str]:"""同步OpenAI调用(保持原样)"""messages=_build_messages(text,system_prompt,history)try:response=self.llm_client_list["openai"].chat.completions.create(model=model_nameorself.config.models.openai.model_name,messages=messages,max_tokens=max_per_tokens,temperature=temperature,stream=False,)ifresponse.choicesandlen(response.choices)>0:assistant_reply=response.choices[0].message.contentreturnassistant_replyexceptExceptionase:error_msg=f"调用模型时出错:{str(e)}"raiseException(error_msg)def_add_to_history(self,role:str,content:str):"""添加消息到历史,自动管理长度"""self.history.append(Message(role=role,content=content))iflen(self.history)>self.history_max:to_remove=len(self.history)-self.history_max to_remove=to_remove-(to_remove%2)ifto_remove>0:self.history=self.history[to_remove:]# 新增:上下文管理器支持asyncdef__aenter__(self):returnselfasyncdef__aexit__(self,exc_type,exc_val,exc_tb):awaitself.close()# 新增:关闭异步客户端asyncdefclose(self):"""关闭异步客户端连接"""forclientinself.async_llm_client_list.values():ifhasattr(client,'close'):awaitclient.close()# 使用示例asyncdefexample_usage():# 1. 创建客户端client=LLMClient(config_path="config.yaml",history_max=10,system_prompt="你是一个有帮助的助手",max_concurrent_requests=3)# 2. 异步单次调用response=awaitclient.allm_call("介绍一下Python")print(f"Response:{response}")# 3. 批量调用questions=["什么是人工智能?","机器学习有哪些类型?","深度学习是什么?"]responses=awaitclient.batch_allm_call(questions,max_workers=2)fori,respinenumerate(responses):print(f"Q{i+1}:{questions[i][:30]}...")print(f"A{i+1}:{resp[:50]}...")print()# 运行示例if__name__=="__main__":# 运行异步示例asyncio.run(example_usage())【OpenAI标准调用和异步调用Python代码例子】
张小明
前端开发工程师
冥想第一千七百五十一天(1751)
1.周四,2026第一天,陪着家人一起吃火锅,给妈妈和媳妇买了新鞋子。 2.晚上带着桐桐转了一圈,下午跑步40分钟,感觉是连着运动,有点微累。感谢父母,感谢朋友,感谢家人,感谢不…
存储设备容量检测全攻略:让假U盘无处遁形
在数字生活日益普及的今天,U盘、SD卡等存储设备已经成为我们工作学习的必备工具。然而市面上鱼龙混杂,不少无良商家通过技术手段虚标容量,让消费者花大价钱买到的却是"缩水"产品。今天,我们就来介绍一款专业的存储设备检…
基于74HC14的光控开关设计:实战案例详解
用74HC14做个“天黑自动开灯”的光控开关:从原理到实战,一次讲透你有没有遇到过这种尴尬?楼道里的声控灯总是误触发,风一吹、猫一叫就亮;或者黄昏时分,路灯像抽搐一样忽明忽暗——明明天还没黑透࿰…
患者教育材料配音:慢性病用药指导语音化
患者教育材料配音:慢性病用药指导语音化 在社区医院的慢病管理门诊里,一位72岁的高血压患者反复询问护士:“这个‘氯吡格雷’到底怎么念?是不是饭前吃?”类似场景每天都在上演。纸质说明书上的专业术语和密密麻麻的小字…
OHIF Viewer:零足迹DICOM影像平台如何重塑现代放疗计划工作流
在精准医疗时代,放疗计划的可视化与管理已成为肿瘤治疗的关键环节。OHIF Viewer作为一款开源的零足迹DICOM医学影像查看器,通过其强大的扩展架构为放疗计划提供了前所未有的技术支持。本文将深入剖析OHIF Viewer如何通过DICOM-RT扩展包彻底改变放疗医师的…
我的Rufus使用体验:让系统安装变得简单高效
我的Rufus使用体验:让系统安装变得简单高效 【免费下载链接】rufus The Reliable USB Formatting Utility 项目地址: https://gitcode.com/GitHub_Trending/ru/rufus 作为一名经常折腾电脑的爱好者,我想和大家分享一款让我爱不释手的工具——Rufu…