Reactor 是一种事件驱动的 I/O 多路复用架构(如 epoll + 非阻塞 socket + 回调分发),一般用于高并发服务端。只在 I/O 事件就绪时才执行对应回调,无事可做时阻塞在 epoll_wait,CPU 不会空转,相比轮询占用极低。同时 Reactor 解耦了事件检测和业务处理,主循环只负责分发事件,具体逻辑全部交给回调,在操作系统 I/O 层和业务层之间起到事件分发中间层的作用。
从代码实现角度来看,整体分为三部分:结构体,主循环和回调函数。本文均为个人观点,如有错误请纠正。
1.结构体:
typedefint(*RCALLBACK)(intfd);structconn{intfd;charrbuffer[BUFFER_LENGTH];intrlength;charwbuffer[BUFFER_LENGTH];intwlength;RCALLBACK send_callback;union{RCALLBACK recv_callback;RCALLBACK accept_callback;}r_action;};structconnconn_list[CONNECTION_SIZE]={0};内部分别是socket的fd,读写缓冲区,读写的长度和回调函数。
2.主循环
intinit_server(unsignedshortport){//初始化部分intsockfd=socket(AF_INET,SOCK_STREAM,0);structsockaddr_inservaddr;servaddr.sin_family=AF_INET;servaddr.sin_addr.s_addr=htons(INADDR_ANY);servaddr.sin_port=htons(port);if(-1==bind(sockfd,(structsockaddr*)&servaddr,sizeof(structsockaddr_in))){printf("bind error\n");return-1;}listen(sockfd,10);returnsockfd;}intmain(intargc,charconst*argv[]){unsignedshortport=2000;intsockfd=init_server(port);epfd=epoll_create(1);conn_list[sockfd].fd=sockfd;conn_list[sockfd].r_action.accept_callback=accept_cb;set_event(sockfd,EPOLLIN,1);while(1){structepoll_eventevents[1024]={0};intnready=epoll_wait(epfd,&events,1024,-1);inti=0;for(i=0;i<nready;i++){intconnfd=events[i].data.fd;if(events[i].events&EPOLLIN){conn_list[connfd].r_action.recv_callback(connfd);}if(events[i].events&EPOLLOUT){conn_list[connfd].send_callback(connfd);}}}return0;}初始化函数的内容是socket连接部分——创建socket,绑定,监听这三步,并返回socket连接的sockfd;
主循环中采用epoll来循环接收判断,在while循环中死等事件,事件来了就找对应的回调去处理,处理完继续等。
3.回调函数
//设置事件intset_event(intfd,intevent,intflag){if(flag){structepoll_eventev;ev.data.fd=fd;ev.events=event;epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);}else{structepoll_eventev;ev.data.fd=fd;ev.events=event;epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev);}}//注册事件//这部分代码可以直接放在accept_cb里。我为了代码简洁就把这部分单独拿出来了intevent_register(intfd,intevent){if(fd<0)return-1;conn_list[fd].fd=fd;conn_list[fd].r_action.recv_callback=recv_cb;conn_list[fd].send_callback=send_cb;memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH);conn_list[fd].rlength=0;memset(conn_list[fd].wbuffer,0,BUFFER_LENGTH);conn_list[fd].wlength=0;set_event(fd,event,1);}//accept回调函数intaccept_cb(intfd){structsockaddr_inclient_addr;socklen_tlen=sizeof(client_addr);intclientfd=accept(fd,(structsockaddr*)&client_addr,&len);printf("accept finishd: %d\n",clientfd);if(clientfd<0)return-1;event_register(clientfd,EPOLLIN);return0;}//recv回调函数intrecv_cb(intfd){intcount=recv(fd,conn_list[fd].rbuffer,BUFFER_LENGTH,0);if(count==0){printf("client disconnection: %d\n",fd);close(fd);epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);// un finishedreturn0;}conn_list[fd].rlength=count;printf("Recv: %s\n",conn_list[fd].rbuffer);conn_list[fd].wlength=conn_list[fd].rlength=count;memcpy(conn_list[fd].wbuffer,conn_list[fd].rbuffer,conn_list[fd].wlength);set_event(fd,EPOLLOUT,0);returncount;}//semd回调函数intsend_cb(intfd){intcount=send(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);set_event(fd,EPOLLIN,0);returncount;}(1).accept_cb:
内部调用accept,完成socket连接,并注册事件。
(2).event_register
作用是把一个 fd注册到conn_list里,设置它的回调函数,清空读写缓冲器,重置读写长度,设置为可读。
这里我为了代码简洁性单独写了这个函数。它内部的代码也可以直接放在acceept_cb里,没什么区别。
(3).set_event
作用是设置事件类型。根据传进来的flag判断。非零则将传进来的fd添加进epoll监听队列中,类型是传进来的event。零则是对传进来的fd进行修改。
(4).recv_cb
内部调用recv来接收数据,并把数据及其长度写入写缓冲区和长度,便于后续调用send,并设置事件为可写使下一次循环进入send_cb。
(5).send_cb
内部调用send发送数据,并将事件置成可读便于后续接收消息
4.总结
Reactor 本质是通过主循环不断等待就绪的 fd,然后根据事件类型(可读/可写/新连接)执行对应的回调函数。下面是整体代码:
#include<sys/socket.h>#include<netinet/in.h>#include<stdio.h>#include<error.h>#include<pthread.h>#include<string.h>#include<sys/select.h>#include<poll.h>#include<sys/epoll.h>#defineBUFFER_LENGTH1024#defineCONNECTION_SIZE1048576intsend_cb(intfd);intaccept_cb(intfd,intevent);intrecv_cb(intfd);typedefint(*RCALLBACK)(intfd);intepfd=0;structconn{intfd;charrbuffer[BUFFER_LENGTH];intrlength;charwbuffer[BUFFER_LENGTH];intwlength;RCALLBACK send_callback;union{RCALLBACK recv_callback;RCALLBACK accept_callback;}r_action;};structconnconn_list[CONNECTION_SIZE]={0};intset_event(intfd,intevent,intflag){if(flag){// no 0structepoll_eventev;ev.data.fd=fd;ev.events=event;epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);}else{structepoll_eventev;ev.data.fd=fd;ev.events=event;epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev);}}intaccept_cb(intfd){structsockaddr_inclient_addr;socklen_tlen=sizeof(client_addr);intclientfd=accept(fd,(structsockaddr*)&client_addr,&len);printf("accept finishd: %d\n",clientfd);if(clientfd<0)return-1;event_register(clientfd,EPOLLIN);return0;}intrecv_cb(intfd){intcount=recv(fd,conn_list[fd].rbuffer,BUFFER_LENGTH,0);if(count==0){printf("client disconnection: %d\n",fd);close(fd);epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);// un finishedreturn0;}conn_list[fd].rlength=count;printf("Recv: %s\n",conn_list[fd].rbuffer);conn_list[fd].wlength=conn_list[fd].rlength=count;memcpy(conn_list[fd].wbuffer,conn_list[fd].rbuffer,conn_list[fd].wlength);set_event(fd,EPOLLOUT,0);returncount;}intsend_cb(intfd){intcount=send(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);set_event(fd,EPOLLIN,0);returncount;}intinit_server(unsignedshortport){//chu shi hua serverintsockfd=socket(AF_INET,SOCK_STREAM,0);structsockaddr_inservaddr;servaddr.sin_family=AF_INET;servaddr.sin_addr.s_addr=htons(INADDR_ANY);servaddr.sin_port=htons(port);if(-1==bind(sockfd,(structsockaddr*)&servaddr,sizeof(structsockaddr_in))){printf("bind error\n");return-1;}listen(sockfd,10);returnsockfd;}intevent_register(intfd,intevent){if(fd<0)return-1;conn_list[fd].fd=fd;conn_list[fd].r_action.recv_callback=recv_cb;conn_list[fd].send_callback=send_cb;memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH);conn_list[fd].rlength=0;memset(conn_list[fd].wbuffer,0,BUFFER_LENGTH);conn_list[fd].wlength=0;set_event(fd,event,1);}intmain(intargc,charconst*argv[]){unsignedshortport=2000;intsockfd=init_server(port);epfd=epoll_create(1);conn_list[sockfd].fd=sockfd;conn_list[sockfd].r_action.accept_callback=accept_cb;set_event(sockfd,EPOLLIN,1);while(1){structepoll_eventevents[1024]={0};intnready=epoll_wait(epfd,&events,1024,-1);inti=0;for(i=0;i<nready;i++){intconnfd=events[i].data.fd;if(events[i].events&EPOLLIN){conn_list[connfd].r_action.recv_callback(connfd);}if(events[i].events&EPOLLOUT){conn_list[connfd].send_callback(connfd);}}}return0;}零声社区资源链接:https:github.com/0voice