news 2026/6/10 14:22:40

基于torch_npu的IPC特性介绍

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
基于torch_npu的IPC特性介绍

背景介绍

IPC(Inter-Process Communication,进程间通信)允许不同进程之间直接访问共享的设备内存,而无需进行显式的内存拷贝操作,从而显著提升通信效率。昇腾当前已基于Ascend Extension for PyTorch(昇腾NPU适配PyTorch框架的插件,也称为torch_npu)提供了IPC特性的原子能力,使开发者在分布式训练、强化学习等需要多进程大规模数据通信场景可以自主开发优化,提升数据传输性能并节省设备内存消耗。目前已经在强化学习实践中验证,通过使能IPC特性,可以省去训练进程向推理进程拷贝权重数据的过程,推理进程通过共享内存直接获取权重数据,大幅降低了推理进程的内存消耗。本文将详细介绍IPC的原理以及使用方法。

torch_npu的IPC特性解析

torch_npu的IPC设计原理

torch_npu的IPC设计整体逻辑主要包括以下两个步骤:

  1. 发送方操作
    将张量(Tensor)和存储(Storage)的内存信息封装为一个句柄(Handle),并结合存储大小(Storage_Size)、存储偏移量(Storage_Offset)等关键信息,以及用于重构的函数一并打包返回,供接收方使用;

  2. 接收方操作
    接收到这些信息后,利用重构函数和Handle等参数,恢复到原来的Tensor和Storage对象,实现跨进程的数据共享。

图1. torch_npu IPC通信流程图

以reduce_tensor为例,其具体流程如下:

图2. reduce_tensor流程示例

发送方流程:

  1. 统一的reduce化入口:无论采用multiprocessing传参、还是直接调用reduce_tensor进行reduce化操作,最终都会通过storage._share_npu_接口对内存进行reduce化处理;

  2. 内存Handle生成:storage._share_npu_的C++实现会判断虚拟内存是否启用,并调用相应的acl接口。这一过程生成一个字符串形式的内存句柄(Handle),并与storage_offset等辅助参数打包后返回;

  3. 参数传递到用户层:上层Python获取到打包后的参数后,结合rebuild函数返回完整的数据,供用户进一步使用并发送给接收方。

接收方流程:

  1. 参数获取与rebuild启动:接收方接收发送方传来的全部参数,并调用rebuild函数将参数传入,从而触发NPU内存的重建,通过storage._new_shared_npu接口实现。

  2. 建内存解析:storage._new_shared_npu的C++实现会根据虚拟内存配置的不同调用相应acl接口。此步骤会解析出内存Handle并获取到实际的内存指针。在启用虚拟内存的场景中,还需额外进行从物理内存到虚拟内存的映射操作。

  3. 返回完整storage对象:接口内部将内存指针与storage_offset等参数封装为完整的storage对象,最终交由上层Python返回,完成共享内存的传递过程。

torch_npu IPC版本约束​

1. HDK多进程支持限制

由于HCCP的多进程功能在HDK 25.0.RC1版本中才首次实现,因此若要使用该功能,必须升级至HDK 25.0.RC1或更高版本。

2. HDK多次Import支持限制

HDK 25.3.RC1版本修复了多次调用aclrtMemImportFromShareableHandle接口时无法成功导入的问题。为适配这一改进,torch_npu 7.2.0版本移除了与多次导入不兼容的内存映射机制。因此,若需使用多次导入功能,需使用HDK 25.3.RC1或更高版本。

3. CANN 白名单与 P2P 支持限制

CANN 8.3.0版本开始支持去除白名单限制和EnableP2P功能。torch_npu 7.2.0对此进行了相应适配,不再需要手动控制白名单。因此,若想使用这些功能,需使用CANN 8.3.RC1或更高版本。

4. torch_npu功能适配限制

torch_npu对HDK与CANN的相关改进进行了适配。为确保功能正常运行,建议使用torch_npu 7.2.0或更高版本。

5. 节点内共享限制

当前仅支持在同一节点内部进行内存共享,跨节点共享暂不支持。

6. 内存格式限制

torch_npu目前仅支持ND格式的内存共享。若通过IPC(进程间通信)共享Storage,必须使用ND格式,不支持私有内存格式的共享。

IPC接口列表

跨卡内存映射

在处理reduce_tensor操作时,接收方可以通过将接收到的Tensor直接映射到其他NPU设备上,实现跨设备共享。例如:

# 接收方获取参数func,list_args=list(args)# 修改目标设备号(示例映射到卡1)list_args[6]=1# 重建tensor到目标设备tensor2=func(*list_args)

rebuild_npu_tensor参数详解

rebuild_npu_tensor的作用是重新构建一个指向共享内存存储设备的Tensor。该函数支持跨设备Tensor映射和恢复,其详细定义如下:

defrebuild_npu_tensor(tensor_cls,# tensor类型 (通过type(tensor)获取)tensor_size,# tensor尺寸tensor_stride,# tensor步长tensor_offset,# tensor偏移量storage_cls,# storage类型 (通过type(storage)获取)dtype,# 数据类型storage_device,# storage设备 (修改此参数实现跨卡映射)storage_handle,# 内存handle (接收方解析获取)storage_size_bytes,# storage字节大小storage_offset_bytes,# storage字节偏移requires_grad,# 是否需要梯度ref_counter_handle,# 引用计数管理handle (禁止修改)ref_counter_offset,# 引用计数偏移量 (禁止修改)event_handle,# 事件handle (当前版本不支持)event_sync_required# 事件同步标记 (当前版本恒为False))

【注意】事件同步机制说明:(1)PTA当前不支持Event序列化传输,创建Event并序列化将触发错误。(2)发送方在创建IPC共享内存之前,会执行一次流同步。

底层共享接口说明

核心接口详解
# 发送方:创建共享内存handlestorage_handle=tensor.storage()._share_npu_()# 接收方:从handle重建storagestorage=torch.UntypedStorage._new_shared_npu(*handle_args)
_new_shared_npu参数详解
def_new_shared_npu(storage_device,# storage设备 (修改此参数实现跨卡映射)storage_handle,# 内存handle (接收方解析获取)storage_size_bytes,# storage字节大小storage_offset_bytes,# storage字节偏移ref_counter_handle,# 引用计数管理handle (禁止修改)ref_counter_offset,# 引用计数偏移量 (禁止修改)event_handle,# 事件handle (当前版本不支持)event_sync_required# 事件同步标记 (当前版本恒为False))->torch.UntypedStorage

完整示例代码

多进程之间直接传递Tensor

以下示例展示了如何在多进程中直接传递Tensor:

importtorchimporttorch_npuimporttorch.multiprocessingasmpfrommultiprocessingimportQueuedefworker(tensor):print(f"接收方收到 tensor ={tensor}")if__name__=='__main__':# 必须使用spawn方法mp.set_start_method('spawn')tensor=torch.full((5,),float(1.0),device='npu:0')print(f"发送方发送 tensor ={tensor}")# torch.npu.synchronize()p=mp.Process(target=worker,args=(tensor,))p.start()p.join()

执行结果:

通过multiprocessing.Queue传递Tensor

以下示例展示了如何利用multiprocessing.Queue进行Tensor的传输:

importtorchimporttorch_npuimporttorch.multiprocessingasmpfrommultiprocessingimportQueuedefworker(tensor_queue):tensor=tensor_queue.get()print(f"接收方收到 tensor ={tensor}")if__name__=='__main__':# 必须使用spawn方法mp.set_start_method('spawn')tensor_queue=Queue()p=mp.Process(target=worker,args=(tensor_queue,))p.start()tensor=torch.full((5,),float(1.0),device='npu:0')tensor_queue.put(tensor)print(f"发送方发送 tensor ={tensor}")p.join()

执行结果:

通过reduce_tensor传递Tensor

接收方可以通过修改Tensor的Device信息实现跨设备映射:

importtorchimporttorch_npuimporttorch.multiprocessingasmpfromtorch.multiprocessing.reductionsimportreduce_tensorfrommultiprocessingimportQueuedefworker(tensor_queue):shared_handles=tensor_queue.get()func,args=shared_handlesprint(func)list_args=list(args)# 修改目标设备为卡1list_args[6]=1tensor=func(*list_args)print(f"接收方收到 shared_handles ={shared_handles}, tensor ={tensor}")if__name__=='__main__':mp.set_start_method('spawn')tensor_queue=Queue()p=mp.Process(target=worker,args=(tensor_queue,))p.start()tensor=torch.full((5,),float(3.14),device=f"npu:0")shared_handle=reduce_tensor(tensor)tensor_queue.put(shared_handle)print(f"发送方发送 shared_handle ={shared_handle}, tensor ={tensor}")p.join()

执行结果:

通过_share_npu_传递Tensor (集合通信方式)

该方法适合在分布式框架中处理跨进程内存共享,示例如下:

importimporttorchimporttorch_npuimporttorch.distributedasdistimporttorch.multiprocessingasmpdefworker(rank,world_size):"""分布式工作进程"""# 初始化分布式环境os.environ['MASTER_ADDR']='localhost'os.environ['MASTER_PORT']='12355'torch.npu.set_device(rank)dist.init_process_group(backend="hccl",rank=rank,world_size=world_size)pid=os.getpid()# 进程0:创建共享内存ifrank==0:tensor=torch.full((5,),float(1.0),device='npu:0')# 创建共享内存handlelocal_storage={"rank":rank,"storage":tensor.storage()._share_npu_()}print(f"发送方 pid ={pid}发送 tensor ={tensor}, handle ={local_storage['storage'][1]}")else:local_storage={"rank":rank,"storage":None}# 广播共享内存handlegather_storage_list=[None]*world_size dist.all_gather_object(gather_storage_list,local_storage)# 进程1:接收并重建tensorifrank==1:tensor=torch.zeros((5,),device='npu:0')# 获取进程0发送的handlesend_storage=gather_storage_list[0]['storage']# 重建storage对象storage=torch.UntypedStorage._new_shared_npu(*send_storage)# 设置tensor存储tensor.set_(storage)print(f"接收方 pid ={pid}收到 tensor ={tensor}, handle ={send_storage[1]}")if__name__=="__main__":mp.set_start_method('spawn')world_size=2processes=[]# 启动工作进程forrankinrange(world_size):p=mp.Process(target=worker,args=(rank,world_size))p.start()processes.append(p)# 等待所有进程结束forpinprocesses:p.join()

执行结果:

相关链接

[1] Ascend Extension for PyTorch/PyTorch框架特性指南/内存资源优化/内存共享(IPC):https://www.hiascend.com/document/detail/zh/Pytorch/720/ptmoddevg/Frameworkfeatures/featuresguide_00031.html

[2] Ascend/pytorch代码仓:https://gitcode.com/ascend/pytorch/tree/master/torch_npu/csrc/ipc

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/7 12:31:33

Kotaemon能否生成API文档?Swagger自动化尝试

Kotaemon能否生成API文档?Swagger自动化尝试 在企业级AI系统日益复杂的今天,一个核心挑战浮出水面:如何让智能对话能力不仅“能说”,还能“可集成”?换句话说,当用户通过自然语言与系统交互时——比如问“…

作者头像 李华
网站建设 2026/6/10 3:10:44

图像放大就模糊?这款开源神器让你的图片无限放大不失真

图像放大就模糊?这款开源神器让你的图片无限放大不失真 【免费下载链接】vtracer Raster to Vector Graphics Converter 项目地址: https://gitcode.com/gh_mirrors/vt/vtracer 你是否曾经遇到过这样的困扰:精心设计的LOGO放大后边缘变得模糊&…

作者头像 李华
网站建设 2026/6/6 5:20:44

基于Kotaemon的RAG智能体实践:提升答案准确性的秘诀

基于Kotaemon的RAG智能体实践:提升答案准确性的秘诀 在企业级AI应用日益普及的今天,一个普遍而棘手的问题浮现出来:用户问“我今年能休几天年假?”,系统却回答“根据公司政策,您有10天假期”——可实际上HR…

作者头像 李华
网站建设 2026/6/8 15:55:09

Kotaemon能否识别建筑图纸?CAD信息提取设想

Kotaemon能否识别建筑图纸?CAD信息提取设想 在智能建造与数字孪生快速演进的今天,一个现实问题正困扰着无数工程师:如何从成百上千张CAD图纸中快速找到“三楼东侧走廊的配电箱型号”?传统方式依赖经验丰富的技术人员逐图翻阅、交叉…

作者头像 李华
网站建设 2026/6/8 9:48:12

Kotaemon是否适合非技术用户?我们测试了它的易用性

Kotaemon是否适合非技术用户?我们测试了它的易用性 在智能助手逐渐渗透企业日常运营的今天,越来越多团队希望快速搭建一个能回答内部问题、处理常见任务的AI系统。但现实是,大多数开源框架仍然停留在“开发者专属”的阶段——你需要懂Python、…

作者头像 李华