基于时间轮实现毫秒级定时器
一、为什么需要时间轮
在服务器开发中,定时器经常用于处理连接超时、心跳检测、资源清理、缓存刷新等任务
如果使用排序链表管理定时器,每次添加新定时器都要按照过期时间找到插入位置,定时器数量多时插入效率会下降
时间轮的核心思想是:
把定时器按照过期时间分散到不同槽中,用空间换时间,降低插入成本
可以把时间轮理解成一个环形数组,数组中的每个位置叫做一个槽,也就是slot
每个槽后面挂一条定时器链表,时间轮指针每隔一段时间向后移动一格,这一次移动叫做一次tick
二、时间轮中的几个核心概念
时间轮主要有两个参数:
N:时间轮中槽的数量SI:相邻两个槽之间的时间间隔,也叫槽间隔
如果时间轮有N个槽,每个槽间隔是SI,那么时间轮转一圈的时间就是N * SI
例如N = 60,SI = 1s,那么时间轮转一圈就是60s
添加一个定时器时,假设当前槽是cur_slot,定时器超时时间是timeout
需要先计算它经过多少个 tick 到期:ticks = timeout / SI
然后计算它应该放到哪个槽中:ts = (cur_slot + ticks) % N
如果定时器超时时间超过一圈,还需要记录它要等待几圈:rotation = ticks / N
所以一个定时器节点中通常要保存两个关键字段:
time_slot_:表示定时器在哪个槽中rotation_:表示还需要转多少圈才真正到期
三、时间轮实现
1 定时器节点设计
我的定时器节点是tw_timer
核心代码如下:
classtw_timer{public:tw_timer(introtation,inttime_slot):rotation_(rotation),time_slot_(time_slot),next_(nullptr),prev_(nullptr){}public:introtation_;inttime_slot_;std::function<void()>cb_func;tw_timer*next_;tw_timer*prev_;};这个节点中主要保存了:
rotation_:还需要转多少圈time_slot_:当前定时器所在的槽cb_func:定时器到期后的回调函数next_和prev_:用于组成双向链表
这里使用std::function<void()>保存回调,比普通函数指针更灵活,可以直接绑定 lambda、普通函数或者std::bind
2 时间轮结构设计
我的时间轮类中,核心成员如下:
static const int N = 100
static const int SI = 1
vector<tw_timer *> slots_
int cur_slot_
其中slots_是一个数组,每个元素都是一个定时器链表的头指针
cur_slot_表示当前时间轮指针指向哪个槽
在我的实现中,N = 100,SI = 1,表示时间轮有 100 个槽,每个槽的时间间隔是 1 个时间单位
3 添加和触发定时器
添加定时器的核心逻辑是:
tw_timer*add_timer(inttimeout){if(timeout<0){returnnullptr;}intticks;if(timeout<SI){ticks=1;}else{ticks=timeout/SI;}introtation=ticks/N;intts=(ticks+cur_slot_)%N;tw_timer*new_timer=newtw_timer(rotation,ts);if(!slots_[ts]){slots_[ts]=new_timer;}else{new_timer->next_=slots_[ts];slots_[ts]->prev_=new_timer;slots_[ts]=new_timer;}returnnew_timer;}这段代码的逻辑很清晰:
先根据
timeout和SI计算ticks再通过
rotation = ticks / N计算需要等待几圈通过
ts = (ticks + cur_slot_) % N计算应该插入哪个槽最后把定时器头插到对应槽的链表中
头插法的好处是插入效率高,添加一个定时器的复杂度接近O(1)
时间轮推进时,会调用tick()
voidtick(){tw_timer*timer=slots_[cur_slot_];while(timer){if(timer->rotation_>0){timer->rotation_--;timer=timer->next_;continue;}timer->cb_func();timer=del_timer(timer);}cur_slot_=(cur_slot_+1)%N;}每次tick()只处理当前槽上的链表
如果rotation_ > 0,说明这个定时器虽然在当前槽,但还没有真正到期,只需要执行rotation_--
如果rotation_ == 0,说明定时器到期,执行cb_func(),然后从链表中删除这个节点
四、timerfd + epoll 如何驱动时间轮
时间轮本身只是一个数据结构,它不会自己转动,需要外部周期性触发tick()
我的测试代码中使用timerfd作为时间源,并把timerfd加入epoll
核心流程是:
使用
timerfd_create创建定时器 fd使用
timerfd_settime设置周期触发时间把
timerfd加入epoll当
timerfd可读时,读取触发次数根据触发次数调用对应次数的
wheel->tick()
// 创建 timerfd,每 100ms 触发一次inttimerfd=create_timerfd(100);if(timerfd==-1){close(epollfd);return-1;}// 加入 epollepoll_addfd(epollfd,timerfd);epoll_event events[1024];while(true){intnum=epoll_wait(epollfd,events,1024,-1);if(num<0){if(errno==EINTR){continue;}perror("epoll_wait");break;}for(inti=0;i<num;++i){intsockfd=events[i].data.fd;if(sockfd==timerfd&&(events[i].events&EPOLLIN)){uint64_texpirations=0;// expirations表示从上次读取到现在,timerfd一共触发了多少次ssize_t ret=read(timerfd,&expirations,sizeof(expirations));if(ret!=sizeof(expirations)){continue;}for(uint64_tj=0;j<expirations;++j){wheel->tick();}}}}五、一个需要注意的问题:SI 和 timerfd 间隔要统一
我的时间轮中写的是SI = 1
但测试代码里创建timerfd时使用的是create_timerfd(100),也就是每100ms触发一次
这时要注意单位问题
如果我把SI = 1理解成 1 秒,那么timerfd应该每1000ms触发一次
如果timerfd每100ms调用一次tick(),那么add_timer(10)实际上不是 10 秒后触发,而是 10 个 tick 后触发,也就是大约 1 秒后触发
所以实际写项目时最好统一单位,比如:
规定
SI = 100,表示一个槽间隔是 100ms所有
timeout都用毫秒表示timerfd也按照 100ms 周期触发
这样时间含义更清楚,不容易混乱
六、关于时间轮中 N 的意义
我对N的理解是:N 类似哈希表中桶的数量
时间轮中的每个槽就像哈希表中的一个桶,定时器会根据过期时间被映射到不同的槽中
如果N太小,槽的数量少,定时器就容易集中到同一个槽上,导致某些槽后面的链表很长
这样虽然添加定时器还是接近O(1),但当时间轮指针转到这个槽时,需要遍历很长的链表,执行效率会下降