news 2026/6/16 1:46:29

基于 epoll 的简易 Reactor 网络模型实现

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
基于 epoll 的简易 Reactor 网络模型实现

Reactor 是一种事件驱动的 I/O 多路复用架构(如 epoll + 非阻塞 socket + 回调分发),一般用于高并发服务端。只在 I/O 事件就绪时才执行对应回调,无事可做时阻塞在 epoll_wait,CPU 不会空转,相比轮询占用极低。同时 Reactor 解耦了事件检测和业务处理,主循环只负责分发事件,具体逻辑全部交给回调,在操作系统 I/O 层和业务层之间起到事件分发中间层的作用。
从代码实现角度来看,整体分为三部分:结构体,主循环和回调函数。本文均为个人观点,如有错误请纠正。
1.结构体:

typedefint(*RCALLBACK)(intfd);structconn{intfd;charrbuffer[BUFFER_LENGTH];intrlength;charwbuffer[BUFFER_LENGTH];intwlength;RCALLBACK send_callback;union{RCALLBACK recv_callback;RCALLBACK accept_callback;}r_action;};structconnconn_list[CONNECTION_SIZE]={0};

内部分别是socket的fd,读写缓冲区,读写的长度和回调函数。
2.主循环

intinit_server(unsignedshortport){//初始化部分intsockfd=socket(AF_INET,SOCK_STREAM,0);structsockaddr_inservaddr;servaddr.sin_family=AF_INET;servaddr.sin_addr.s_addr=htons(INADDR_ANY);servaddr.sin_port=htons(port);if(-1==bind(sockfd,(structsockaddr*)&servaddr,sizeof(structsockaddr_in))){printf("bind error\n");return-1;}listen(sockfd,10);returnsockfd;}intmain(intargc,charconst*argv[]){unsignedshortport=2000;intsockfd=init_server(port);epfd=epoll_create(1);conn_list[sockfd].fd=sockfd;conn_list[sockfd].r_action.accept_callback=accept_cb;set_event(sockfd,EPOLLIN,1);while(1){structepoll_eventevents[1024]={0};intnready=epoll_wait(epfd,&events,1024,-1);inti=0;for(i=0;i<nready;i++){intconnfd=events[i].data.fd;if(events[i].events&EPOLLIN){conn_list[connfd].r_action.recv_callback(connfd);}if(events[i].events&EPOLLOUT){conn_list[connfd].send_callback(connfd);}}}return0;}

初始化函数的内容是socket连接部分——创建socket,绑定,监听这三步,并返回socket连接的sockfd;
主循环中采用epoll来循环接收判断,在while循环中死等事件,事件来了就找对应的回调去处理,处理完继续等。
3.回调函数

//设置事件intset_event(intfd,intevent,intflag){if(flag){structepoll_eventev;ev.data.fd=fd;ev.events=event;epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);}else{structepoll_eventev;ev.data.fd=fd;ev.events=event;epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev);}}//注册事件//这部分代码可以直接放在accept_cb里。我为了代码简洁就把这部分单独拿出来了intevent_register(intfd,intevent){if(fd<0)return-1;conn_list[fd].fd=fd;conn_list[fd].r_action.recv_callback=recv_cb;conn_list[fd].send_callback=send_cb;memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH);conn_list[fd].rlength=0;memset(conn_list[fd].wbuffer,0,BUFFER_LENGTH);conn_list[fd].wlength=0;set_event(fd,event,1);}//accept回调函数intaccept_cb(intfd){structsockaddr_inclient_addr;socklen_tlen=sizeof(client_addr);intclientfd=accept(fd,(structsockaddr*)&client_addr,&len);printf("accept finishd: %d\n",clientfd);if(clientfd<0)return-1;event_register(clientfd,EPOLLIN);return0;}//recv回调函数intrecv_cb(intfd){intcount=recv(fd,conn_list[fd].rbuffer,BUFFER_LENGTH,0);if(count==0){printf("client disconnection: %d\n",fd);close(fd);epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);// un finishedreturn0;}conn_list[fd].rlength=count;printf("Recv: %s\n",conn_list[fd].rbuffer);conn_list[fd].wlength=conn_list[fd].rlength=count;memcpy(conn_list[fd].wbuffer,conn_list[fd].rbuffer,conn_list[fd].wlength);set_event(fd,EPOLLOUT,0);returncount;}//semd回调函数intsend_cb(intfd){intcount=send(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);set_event(fd,EPOLLIN,0);returncount;}

(1).accept_cb:
内部调用accept,完成socket连接,并注册事件。
(2).event_register
作用是把一个 fd注册到conn_list里,设置它的回调函数,清空读写缓冲器,重置读写长度,设置为可读。
这里我为了代码简洁性单独写了这个函数。它内部的代码也可以直接放在acceept_cb里,没什么区别。
(3).set_event
作用是设置事件类型。根据传进来的flag判断。非零则将传进来的fd添加进epoll监听队列中,类型是传进来的event。零则是对传进来的fd进行修改。
(4).recv_cb
内部调用recv来接收数据,并把数据及其长度写入写缓冲区和长度,便于后续调用send,并设置事件为可写使下一次循环进入send_cb。
(5).send_cb
内部调用send发送数据,并将事件置成可读便于后续接收消息
4.总结
Reactor 本质是通过主循环不断等待就绪的 fd,然后根据事件类型(可读/可写/新连接)执行对应的回调函数。下面是整体代码:

#include<sys/socket.h>#include<netinet/in.h>#include<stdio.h>#include<error.h>#include<pthread.h>#include<string.h>#include<sys/select.h>#include<poll.h>#include<sys/epoll.h>#defineBUFFER_LENGTH1024#defineCONNECTION_SIZE1048576intsend_cb(intfd);intaccept_cb(intfd,intevent);intrecv_cb(intfd);typedefint(*RCALLBACK)(intfd);intepfd=0;structconn{intfd;charrbuffer[BUFFER_LENGTH];intrlength;charwbuffer[BUFFER_LENGTH];intwlength;RCALLBACK send_callback;union{RCALLBACK recv_callback;RCALLBACK accept_callback;}r_action;};structconnconn_list[CONNECTION_SIZE]={0};intset_event(intfd,intevent,intflag){if(flag){// no 0structepoll_eventev;ev.data.fd=fd;ev.events=event;epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);}else{structepoll_eventev;ev.data.fd=fd;ev.events=event;epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev);}}intaccept_cb(intfd){structsockaddr_inclient_addr;socklen_tlen=sizeof(client_addr);intclientfd=accept(fd,(structsockaddr*)&client_addr,&len);printf("accept finishd: %d\n",clientfd);if(clientfd<0)return-1;event_register(clientfd,EPOLLIN);return0;}intrecv_cb(intfd){intcount=recv(fd,conn_list[fd].rbuffer,BUFFER_LENGTH,0);if(count==0){printf("client disconnection: %d\n",fd);close(fd);epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);// un finishedreturn0;}conn_list[fd].rlength=count;printf("Recv: %s\n",conn_list[fd].rbuffer);conn_list[fd].wlength=conn_list[fd].rlength=count;memcpy(conn_list[fd].wbuffer,conn_list[fd].rbuffer,conn_list[fd].wlength);set_event(fd,EPOLLOUT,0);returncount;}intsend_cb(intfd){intcount=send(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);set_event(fd,EPOLLIN,0);returncount;}intinit_server(unsignedshortport){//chu shi hua serverintsockfd=socket(AF_INET,SOCK_STREAM,0);structsockaddr_inservaddr;servaddr.sin_family=AF_INET;servaddr.sin_addr.s_addr=htons(INADDR_ANY);servaddr.sin_port=htons(port);if(-1==bind(sockfd,(structsockaddr*)&servaddr,sizeof(structsockaddr_in))){printf("bind error\n");return-1;}listen(sockfd,10);returnsockfd;}intevent_register(intfd,intevent){if(fd<0)return-1;conn_list[fd].fd=fd;conn_list[fd].r_action.recv_callback=recv_cb;conn_list[fd].send_callback=send_cb;memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH);conn_list[fd].rlength=0;memset(conn_list[fd].wbuffer,0,BUFFER_LENGTH);conn_list[fd].wlength=0;set_event(fd,event,1);}intmain(intargc,charconst*argv[]){unsignedshortport=2000;intsockfd=init_server(port);epfd=epoll_create(1);conn_list[sockfd].fd=sockfd;conn_list[sockfd].r_action.accept_callback=accept_cb;set_event(sockfd,EPOLLIN,1);while(1){structepoll_eventevents[1024]={0};intnready=epoll_wait(epfd,&events,1024,-1);inti=0;for(i=0;i<nready;i++){intconnfd=events[i].data.fd;if(events[i].events&EPOLLIN){conn_list[connfd].r_action.recv_callback(connfd);}if(events[i].events&EPOLLOUT){conn_list[connfd].send_callback(connfd);}}}return0;}

零声社区资源链接:https:github.com/0voice

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/16 1:41:49

MSC8251 DDR控制器错误检测与中断处理机制深度解析

1. 项目概述在嵌入式系统&#xff0c;尤其是那些运行在恶劣环境或对可靠性有严苛要求的领域&#xff08;如工业控制、通信基站、医疗设备&#xff09;&#xff0c;内存的稳定性直接决定了整个系统的生死。DDR SDRAM作为系统的主内存&#xff0c;其数据完整性是系统可靠性的基石…

作者头像 李华
网站建设 2026/6/16 1:28:44

LLM API 网关设计:限流、熔断与多模型路由的工程实践

LLM API 网关设计&#xff1a;限流、熔断与多模型路由的工程实践 一、大模型调用洪峰&#xff1a;当 API 网关成为系统咽喉 大模型服务的调用模式与传统微服务有本质区别。一次 LLM 推理请求的延迟通常在 2-30 秒&#xff0c;Token 生成期间服务端持续占用 GPU 资源。当业务侧并…

作者头像 李华
网站建设 2026/6/16 1:28:43

Spring Boot 集成 LLM:从 API 调用到生产级封装的最佳实践

Spring Boot 集成 LLM&#xff1a;从 API 调用到生产级封装的最佳实践 一、大模型集成的工程陷阱&#xff1a;远不止调一个 API Spring Boot 集成 LLM 的第一步通常是引入一个 HTTP 客户端&#xff0c;调用大模型的 Chat Completion API&#xff0c;拿到响应返回给前端。这个&q…

作者头像 李华
网站建设 2026/6/16 1:27:56

SSL证书过期致业务宕机?企业证书管理三大痛点与自动化方案

2023年某电商平台双十一期间证书过期&#xff0c;直接造成千万元级别经济损失。SSL证书普遍仅一年有效期&#xff0c;纯人工运维管理极易出现疏漏。 三大证书管理困境 困境一&#xff1a;证书资产分散 证书散落于各大云平台、数据中心、CDN节点&#xff0c;无法精准统计存量证书…

作者头像 李华