Linux网络编程 | 高性能定时器 :时间轮、时间堆

    科技2024-08-10  62

    文章目录

    时间轮时间堆


    在上一篇博客中我实现了一个基于排序链表的定时器容器,但是其存在一个缺点——随着定时器越来越多,添加定时器的效率也会越来越低。 而下面的两个高效定时器——时间轮、时间堆,会完美的解决这个问题。

    时间轮

    为了解决排序链表的缺点,时间轮运用了哈希的思想,将定时器散列到不同的链表上,这样每条链表上的定时器数目就明显的少于全部存在一条链表上的情况,此时的插入操作效率就不会收到定时器数量的影响

    在图上所示的时间轮中,指针每次会指向轮子上的一个槽,并且会顺时针移动,每次移动则会指向下一个槽。其中每次移动的时间间隔就是心搏时间si,由于时间轮有N个槽,所以转动一周的时间就是N * si。

    每一个槽都指向一个链表,每条链表之间设定的定时时间都是si的整数倍,通过利用这个关系来将定时器映射到不同的槽中。并且由于结构呈换环状,即使定时时间大于一圈N * si,也只需要让其在对应转动的圈数生效即可。

    移动的槽数 = 定时时间 / 转动间隔 插入槽 = (当前槽 + (移动的槽数 % 槽总数) % 槽总数);

    所以对于时间轮来说,要提高定时精度就需要使得转动间隔足够小。而要提高效率则要求槽数尽量的多。

    #ifndef __TIMER_WHEEL_H__ #define __TIMER_WHEEL_H__ #include<time.h> #include<stdio.h> #include<netinet/in.h> const int MAX_BUFFER_SIZE = 1024; const int SLOT_COUNT = 60; const int SLOT_INERTVAL = 1; class tw_timer; //用户数据 struct client_data { sockaddr_in addr; int sock_fd; char buff[MAX_BUFFER_SIZE]; tw_timer* timer; }; //定时器类 class tw_timer { public: tw_timer(int rot, int ts) : _rotation(rot) , _time_slot(ts) , _next(nullptr) , _prev(nullptr) {} int _rotation; //旋转的圈数 int _time_slot; //记录在哪一个槽中 void (*fun)(client_data*); //处理函数 client_data* _user_data; //用户参数 tw_timer* _next; tw_timer* _prev; }; //定时器链表,带头尾双向链表,定时器以升序排序 class timer_wheel { public: timer_wheel() : cur_slot(0) { //初始化每个槽的头节点 for(int i = 0; i < SLOT_COUNT; i++) { _slots[i] = nullptr; } } ~timer_wheel() { for(int i = 0; i < SLOT_COUNT; i++) { //删除每一个槽中的所有节点 tw_timer* cur = _slots[i]; while(cur) { tw_timer* next = cur->_next; delete cur; cur = next; } } } //防拷贝 timer_wheel(const timer_wheel&) = delete; timer_wheel& operator=(const timer_wheel&) = delete; //根据超时时间新建定时器并插入时间轮中 tw_timer* add_timer(int time_out) { //如果超时时间为负数则直接返回 if(time_out < 0) { return nullptr; } int ticks = 0; //移动多少个槽时触发 //如果超时时间小于一个时间间隔,则槽数取整为1 if(time_out < SLOT_INERTVAL) { ticks = 1; } else { //计算移动的槽数 ticks = time_out / SLOT_INERTVAL; } int rotation = ticks / SLOT_COUNT; //计算插入的定时器移动多少圈后会被触发 int time_slot = (cur_slot + (ticks % SLOT_COUNT) % SLOT_COUNT); //计算其应该插入的槽位 tw_timer* timer = new tw_timer(rotation, time_slot); //如果要插入的槽为空,则成为该槽的头节点 if(_slots[time_slot] == nullptr) { _slots[time_slot] = timer; } //否则头插进入该槽中 else { timer->_next = _slots[time_slot]; _slots[time_slot]->_prev = timer; _slots[time_slot] = timer; } return timer; } //删除指定定时器 void del_timer(tw_timer* timer) { if(timer == nullptr) { return; } int time_slot = timer->_time_slot; //如果该定时器为槽的头节点,则让下一个节点成为新的头节点 if(timer == _slots[time_slot]) { _slots[time_slot] = _slots[time_slot]->_next; if(_slots[time_slot]) { _slots[time_slot]->_prev = nullptr; } delete timer; timer = nullptr; } //此时槽为中间节点,正常的链表删除操作即可 else { timer->_prev->_next = timer->_next; if(timer->_next) { timer->_next->_prev = timer->_prev; } delete timer; timer = nullptr; } } //处理当前槽的定时事件,并使时间轮转动一个槽 void tick() { tw_timer* cur = _slots[cur_slot]; while(cur) { //如果不在本轮进行处理,则轮数减一后跳过 if(cur->_rotation > 0) { --cur->_rotation; cur = cur->_next; } //本轮需要处理的定时器,执行定时任务后将其删除 else { cur->fun(cur->_user_data); //如果删除的是头节点 if(cur == _slots[cur_slot]) { _slots[cur_slot] = cur->_next; if(_slots[cur_slot]) { _slots[cur_slot]->_prev = nullptr; } delete cur; cur = _slots[cur_slot]; } //删除的是中间节点 else { cur->_prev->_next = cur->_next; if(cur->_next) { cur->_next->_prev = cur->_prev; } tw_timer* next = cur->_next; delete cur; cur = next; } } } //本槽处理完成,时间轮转动一个槽位 cur_slot = (cur_slot + 1) % SLOT_COUNT; } private: tw_timer* _slots[SLOT_COUNT]; //时间轮的槽,每个槽的元素为一个无序定时器链表 int cur_slot; //当前指向的槽 }; #endif // !__TIMER_WHEEL_H__

    时间复杂度 添加节点:O(1) 删除节点:O(1) 执行定时任务:O(n)

    虽然执行定时任务的时间复杂度为O(n),但是当我们使用多个轮子来实现时间轮时,时间复杂度会接近于O(1)


    时间堆

    在我们前面讨论的定时器链表、时间轮都是以固定的时间间隔来调用到时检测函数tick来检测是否到期,然后执行到期定时器的回调函数,这样的容器存在一个严重的缺点,就是定时不够精确

    为了解决这个缺点,在设计定时器容器的时候可以采用另外一种思路,将所有定时器中超时时间最小的定时器的超时时间设置为时间间隔。一旦tick被调用超时时间最小的定时器必然到期,对其进行处理。接着我们再从剩余的定时器中找出超时时间最小的,继续以上逻辑。

    通过这种方法,就可以实现准确的定时。而这种数据结构,恰好和我们之前学过的相同,所以我们又将这种以最小堆实现的定时器容器称为时间堆

    关于堆的基本操作在这里就不赘述了,如果不了解的可以参考我的往期博客 数据结构与算法 | 堆

    #ifndef __TIMER_HEAP_H__ #define __TIMER_HEAP_H__ #include<time.h> #include<iostream> #include<netinet/in.h> const int MAX_BUFFER_SIZE = 1024; class heap_timer; //用户数据 struct client_data { sockaddr_in addr; int sock_fd; char buff[MAX_BUFFER_SIZE]; heap_timer* timer; }; //定时器类 class heap_timer { public: heap_timer(int delay) { _expire = time(nullptr) + delay; } time_t _expire; //到期时间 void (*fun)(client_data*); //处理函数 client_data* _user_data; //用户参数 }; //定时器链表,带头尾双向链表,定时器以升序排序 class timer_heap { public: timer_heap(int capacity = 10) throw (std::exception) : _capacity(capacity) , _size(0) { _array = new heap_timer* [_capacity]; //空间申请失败则抛出异常 if(_array == nullptr) { throw std::exception(); } //初始化数组 for(int i = 0; i < _capacity; i++) { _array[i] = nullptr; } } //使用定时器数组初始化 timer_heap(heap_timer** array, int capacity, int size) throw (std::exception) : _capacity(capacity) , _size(size) { _array = new heap_timer* [_capacity]; //容量小于大小时抛出异常 if(capacity < size) { throw std::exception(); } //空间申请失败则抛出异常 if(_array == nullptr) { throw std::exception(); } //拷贝数据 for(int i = 0; i < _size; i++) { _array[i] = array[i]; } //初始化剩余空间 for(int i = _size; i < _capacity; i++) { _array[i] = nullptr; } //从尾部开始调整堆 for(int i = (_size - 2) / 2; i >= 0; i--) { adjust_down(i); } } ~timer_heap() { for(int i = 0; i < _capacity; i++) { delete _array[i]; _array[i] = nullptr; } delete[] _array; _array = nullptr; } //防拷贝 timer_heap(const timer_heap&) = delete; timer_heap& operator=(const timer_heap&) = delete; //将定时器插入时间堆中 void push(heap_timer* timer) throw ( std::exception ) { if(timer == nullptr) { return; } //如果容量满了则扩容 if(_size == _capacity) { reserve(_capacity * 2); //申请两倍的空间 } //直接在尾部插入,然后向上调整即可 _array[_size] = timer; ++_size; adjust_up(_size - 1); } //删除指定定时器。 void del_timer(heap_timer* timer) { if(timer == nullptr) { return; } //为了保证堆的结构不被破坏,这里并不会实际将他删除,而是将执行函数清空的伪删除操作。 timer->fun = nullptr; } //获取堆顶元素 heap_timer* top() const { if(empty()) { return nullptr; } return _array[0]; } //出堆 void pop() { //交换堆顶堆尾后直接从首部向下调整即可 std::swap(_array[0], _array[_size - 1]); --_size; adjust_down(0); } //判断时间堆是否为空 bool empty() const { return _size == 0; } void reserve(int capacity) throw (std::exception) { //如果新容量没有之前的大, 则没必要扩容 if(capacity <= _capacity) { return; } //开辟新空间 heap_timer** temp = new heap_timer* [capacity]; if(temp == nullptr) { throw std::exception(); } //拷贝原数据 for(int i = 0; i < _size; i++) { temp[i] = _array[i]; } //初始化剩余空间 for(int i = _size; i < capacity; i++) { temp[i] = nullptr; } delete[] _array; //删除原空间 _array = temp; //更新新空间 _capacity = capacity; } //以堆顶为基准执行定时事件 void tick() { time_t cur_time = time(nullptr); while(empty()) { if(_array[0] == nullptr) { return; } //如果堆顶没有超时,则剩下的不可能超时 if(_array[0]->_expire > cur_time) { break; } //如果执行任务为空,则说明被伪删除,直接出堆即可 if(_array[0]->fun != nullptr) { _array[0]->fun(_array[0]->_user_data); //执行定时任务 } pop(); //处理完定时任务后出堆 } } private: //向下调整算法 void adjust_down(int root) { int parent = root; int child = root * 2 + 1; while(child < parent) { //选出子节点较小的那个 if(child + 1 < _size && _array[child] > _array[child + 1]) { ++child; } //如果父节点比子节点大则进行交换,如果不大于则说明此时处理已完毕 if(_array[parent] > _array[child]) { std::swap(_array[parent], _array[child]); } else { break; } //继续往下更新 parent = child; child = parent * 2 + 1; } } //向上调整算法 void adjust_up(int root) { int child = root; int parent = (child - 1) / 2; while(child > 0) { if(_array[parent] > _array[child]) { std::swap(_array[parent], _array[child]); } else { break; } //往上继续更新 child = parent; parent = (child - 1) / 2; } } heap_timer** _array; //数组 int _capacity; //数据容量 int _size; //当前数据个数 }; #endif // !__TIMER_HEAP_H__

    时间复杂度 添加节点:O(logN) 删除节点:O(1) 执行定时任务:O(1)

    Processed: 0.014, SQL: 8