文章目录
- 核心方案:使用 zmq_poller
- 1. 创建 poller
- 2. 添加 socket 到 poller
- 3. 等待事件
- 4. 处理事件
- 完整示例
- 监听多个 SUB socket
- 高级用法
- 1. 动态管理 socket
- 2. 非阻塞模式
- 3. 超时设置
- 最佳实践
- 适用场景
- 总结
当需要连接多个 socket 并同时监听消息时,使用 ZeroMQ 的
zmq_poller是最佳选择。核心方案:使用 zmq_poller
1. 创建 poller
void*poller=zmq_poller_new();2. 添加 socket 到 poller
// 添加多个 socketzmq_poller_add(poller,socket1,NULL);zmq_poller_add(poller,socket2,NULL);// 可以添加更多 socket...3. 等待事件
zmq_pollitem_t items[2];// 也可以动态分配items[0].socket=socket1;items[0].fd=0;items[0].events=ZMQ_POLLIN;items[1].socket=socket2;items[1].fd=0;items[1].events=ZMQ_POLLIN;// 等待事件,timeout 为 -1 表示无限等待intrc=zmq_poll(items,2,-1);if(rc==-1){// 处理错误}4. 处理事件
// 检查哪个 socket 有事件if(items[0].revents&ZMQ_POLLIN){// socket1 有消息charbuffer[1024];zmq_recv(socket1,buffer,sizeof(buffer),0);printf("Received from socket1: %s\n",buffer);}if(items[1].revents&ZMQ_POLLIN){// socket2 有消息charbuffer[1024];zmq_recv(socket2,buffer,sizeof(buffer),0);printf("Received from socket2: %s\n",buffer);}完整示例
监听多个 SUB socket
#include<zmq.h>#include<stdio.h>intmain(){void*ctx=zmq_ctx_new();// 创建多个 SUB socketvoid*sub1=zmq_socket(ctx,ZMQ_SUB);zmq_connect(sub1,"tcp://localhost:5555");zmq_setsockopt(sub1,ZMQ_SUBSCRIBE,"",0);// 订阅所有消息void*sub2=zmq_socket(ctx,ZMQ_SUB);zmq_connect(sub2,"tcp://localhost:5556");zmq_setsockopt(sub2,ZMQ_SUBSCRIBE,"",0);// 准备 poll itemszmq_pollitem_t items[]={{sub1,0,ZMQ_POLLIN,0},{sub2,0,ZMQ_POLLIN,0}};while(1){intrc=zmq_poll(items,2,-1);if(rc==-1)break;// 中断if(items[0].revents&ZMQ_POLLIN){charbuffer[1024];zmq_recv(sub1,buffer,sizeof(buffer),0);printf("From publisher 1: %s\n",buffer);}if(items[1].revents&ZMQ_POLLIN){charbuffer[1024];zmq_recv(sub2,buffer,sizeof(buffer),0);printf("From publisher 2: %s\n",buffer);}}zmq_close(sub1);zmq_close(sub2);zmq_ctx_term(ctx);return0;}高级用法
1. 动态管理 socket
// 动态添加 socketvoidadd_socket_to_poller(void*poller,void*socket){zmq_poller_add(poller,socket,NULL);}// 移除 socketvoidremove_socket_from_poller(void*poller,void*socket){zmq_poller_remove(poller,socket);}2. 非阻塞模式
// 非阻塞,立即返回intrc=zmq_poll(items,2,0);if(rc==0){// 无事件,继续其他工作}3. 超时设置
// 1000ms 超时intrc=zmq_poll(items,2,1000);if(rc==0){// 超时,做其他事情}最佳实践
- 使用 zmq_poller:单线程处理多 socket,避免线程开销
- 合理设置超时:根据应用需求调整
- 错误处理:检查所有 ZeroMQ 函数的返回值
- 资源管理:正确关闭所有 socket 和 poller
- 事件驱动:基于事件处理,提高响应速度
适用场景
- 多订阅:同时订阅多个 publisher
- 多客户端:服务器同时处理多个客户端连接
- 混合模式:同时处理不同类型的 socket(如 SUB + REP)
- 资源监控:同时监听多个数据源
总结
连接多个 socket 并监听消息的最佳方案:
- 使用
zmq_poller管理多个 socket - 一次等待多个 socket 的事件
- 根据事件来源处理不同 socket 的消息
- 单线程处理,避免线程复杂性
这种方式比多线程更高效,代码更简洁,是 ZeroMQ 推荐的多 socket 管理方式!