news 2026/4/23 13:58:22

can(6) canopen python库使用

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
can(6) canopen python库使用

1.发送消息

# canopen_sender.py - CANopen 发送端(主节点) import canopen import can import time def start_canopen_sender(): """启动 CANopen 发送端,连接虚拟总线并发送消息""" # 统一的虚拟总线配置(必须与 bus_creator.py 一致) VIRTUAL_CHANNEL = "virtual_canopen_bus" BITRATE = 500000 MASTER_NODE_ID = 1 # 主节点 ID SLAVE_NODE_ID = 2 # 接收端(从节点)ID # 1. 初始化 CANopen 网络,连接虚拟总线 network = canopen.Network() network.connect( interface="virtual", channel=VIRTUAL_CHANNEL, bitrate=BITRATE ) print(f"✅ CANopen 发送端已连接虚拟总线:{VIRTUAL_CHANNEL}") master_node = network.add_node(MASTER_NODE_ID, object_dictionary=None) # 2. 发送 CANopen 心跳报文(主节点自身心跳) print("\n===== 发送主节点心跳报文 =====") heartbeat_id = 0x700 + MASTER_NODE_ID # CANopen 心跳 ID 规则:0x700 + 节点ID heartbeat_msg = can.Message( arbitration_id=heartbeat_id, data=[0x05], # 0x05=运行状态 is_extended_id=False, channel=VIRTUAL_CHANNEL ) network.bus.send(heartbeat_msg) print(f"📤 发送心跳报文:ID=0x{heartbeat_id:X},数据=0x05(运行状态)") # 3. 发送 SDO 请求(向从节点读取设备名称 0x1008:0x00) print("\n===== 发送 SDO 读取请求 =====") sdo_request_id = 0x600 + SLAVE_NODE_ID # SDO 请求 ID 规则:0x600 + 从节点ID # SDO 读取请求数据(格式:0x40 + 索引高字节 + 索引低字节 + 子索引 + 保留) sdo_request_data = b'\x40\x08\x10\x00\x00\x00\x00\x00' # 读取 0x1008:0x00 sdo_msg = can.Message( arbitration_id=sdo_request_id, data=sdo_request_data, is_extended_id=False, channel=VIRTUAL_CHANNEL ) network.bus.send(sdo_msg) print(f"📤 发送 SDO 请求:ID=0x{sdo_request_id:X},数据={sdo_request_data.hex()}") # 4. 监听是否有从节点响应(可选) print("\n===== 监听从节点响应(5秒)=====") start_time = time.time() while time.time() - start_time < 5: resp_msg = network.bus.recv(timeout=0.5) if resp_msg: print(f"📥 收到响应:ID=0x{resp_msg.arbitration_id:X},数据={resp_msg.data.hex()}") network.disconnect() if __name__ == "__main__": start_canopen_sender()

2.接收

# canopen_receiver.py - CANopen 接收端(从节点) import canopen import can import time import threading class CANopenReceiver: """CANopen 接收端,监听并响应虚拟总线消息""" def __init__(self): # 统一的虚拟总线配置(与 bus_creator.py 一致) self.VIRTUAL_CHANNEL = "virtual_canopen_bus" self.BITRATE = 500000 self.SLAVE_NODE_ID = 2 # 从节点 ID self.network: canopen.Network = None self.running = False def start(self): """启动接收端,连接虚拟总线并监听消息""" try: # 1. 连接虚拟总线 self.network = canopen.Network() self.network.connect( interface="virtual", channel=self.VIRTUAL_CHANNEL, bitrate=self.BITRATE ) print(f"✅ CANopen 接收端已连接虚拟总线:{self.VIRTUAL_CHANNEL}") self.running = True # 2. 启动监听线程 listen_thread = threading.Thread(target=self._listen_loop, daemon=True) listen_thread.start() print(f"ℹ️ 开始监听虚拟总线消息(节点 ID={self.SLAVE_NODE_ID}),按 Ctrl+C 退出") # 3. 保持脚本运行 while self.running: time.sleep(1) except KeyboardInterrupt: print("\n⚠️ 接收到退出信号,停止监听...") except Exception as e: print(f"❌ 接收端启动失败:{e}") finally: self.running = False self.network.disconnect() print("✅ 接收端已断开虚拟总线连接") def _listen_loop(self): """循环监听虚拟总线消息,并响应 SDO 请求""" while self.running: msg = self.network.bus.recv(timeout=0.1) # 非阻塞接收 if msg: self._process_message(msg) def _process_message(self, msg: can.Message): """解析并处理收到的 CANopen 消息""" arb_id = msg.arbitration_id data = msg.data.hex() print(f"\n📥 收到消息:ID=0x{arb_id:X},数据={data}") # 识别并响应 SDO 请求(0x600 + 从节点ID) if arb_id == 0x600 + self.SLAVE_NODE_ID: print(" → 收到 SDO 读取请求,准备响应...") self._respond_sdo_request(msg) # 识别心跳报文(0x700 + 节点ID) elif 0x700 <= arb_id < 0x800: node_id = arb_id - 0x700 state = msg.data[0] if len(msg.data) > 0 else 0 state_map = {0: "初始化", 5: "运行", 127: "故障"} state_desc = state_map.get(state, f"未知({state})") print(f" → 心跳报文:节点{node_id} 状态={state_desc}") def _respond_sdo_request(self, req_msg: can.Message): """响应 SDO 读取请求(模拟从节点返回设备名称)""" # SDO 响应 ID 规则:0x580 + 从节点ID sdo_resp_id = 0x580 + self.SLAVE_NODE_ID # 模拟返回设备名称 "VirtualCAN"(截断为 4 字节示例) resp_data = b'\x43\x08\x10\x00\x08\x56\x69\x72' # 响应格式 + "Vir..." resp_msg = can.Message( arbitration_id=sdo_resp_id, data=resp_data, is_extended_id=False, channel=self.VIRTUAL_CHANNEL ) self.network.bus.send(resp_msg) print(f"📤 发送 SDO 响应:ID=0x{sdo_resp_id:X},数据={resp_data.hex()}") if __name__ == "__main__": receiver = CANopenReceiver() receiver.start()

3.代码解释

3.1 interface="virtual", -- CAN 总线的通信接口类型

其他类型

  • Windows:interface='pcan'(PCAN 硬件)、interface='kvaser'(Kvaser 硬件);
  • 虚拟总线(跨平台):interface='virtual'(python-can 内置虚拟总线)。
  • interface = ‘socketcan'Linux 系统下 Python 操作 CAN 总线的专属接口类型,依赖 Linux 的 SocketCAN 协议栈,需配合 Linux 的 CAN 接口(物理 / 虚拟)使用
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/18 13:12:38

初始化大模型的不同方式

llama.cpp 「发动机」—— 纯推理库&#xff0c;命令行或 C/C API 调用&#xff0c;什么都要自己拼。 Ollama 「整车」—— 把 llama.cpp 发动机包成 Docker 式服务&#xff0c;一键拉模型、自动启停、REST/OpenAI 兼容&#xff0c;零配置就能用。 维度llama.cppOllama定位底…

作者头像 李华
网站建设 2026/4/23 13:54:26

【数学方法论】

数学里“归纳”只是证明“对所有自然数&#xff08;或更一般的良序集&#xff09;成立”的一种专用方法&#xff1b; 如果把“证明套路”放到整个数学的大工具箱里&#xff0c;可按“直接 / 间接 / 构造 / 反证 / 极限 / 组合 / 计算”等思想做一张全景图。下面给一份“尽量全、…

作者头像 李华
网站建设 2026/4/18 13:30:32

大数据领域Hadoop的云部署实践

从机房到云端&#xff1a;Hadoop集群云部署实战全指南 标题选项&#xff08;3-5个&#xff09; 《告别硬件束缚&#xff1a;Hadoop云集群搭建与高可用实践》《大数据工程师必修课&#xff1a;Hadoop云部署全流程详解》《从0到1&#xff1a;AWS/阿里云Hadoop集群搭建与性能优化…

作者头像 李华
网站建设 2026/4/23 8:16:56

Android15系统中(娱乐框架和车机框架)中对摄像头的朝向是怎么定义的

Android15系统中访问Camera设备有两条框架路径&#xff0c;即最初的CameraService这个手机、平板走的那个框架路径和Automotive车机走的那个框架路径。对于手机这种我们常用的娱乐设备来说我们不默认&#xff0c;系统相机都用过&#xff0c;我们知道相机有前后置之分&#xff0…

作者头像 李华
网站建设 2026/4/23 8:22:55

PDMS二次开发(二十四)关于1.0.6.0版本升级内容的说明

目录1.更新内容介绍2.部分功能说明2.1 材料表和螺栓表独立2.2 报表功能改为导出CSV格式2.3 全新模块CATVIEW2.4 绘制了一套元件图标1.更新内容介绍 报表功能改为导出CSV格式&#xff1b;将螺栓表与管件材料表分离为两个模块&#xff0c;为后期扩展螺栓表功能做准备&#xff1b…

作者头像 李华