Implementing a Multi-Level Time Wheel Timer in C++

    Technology 2022-08-05

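    I recently implemented a multi-level time wheel timer. In simple testing, the error becomes fairly large when the clock step is set to 10 ms or less. The latest code is in the TimeWheel repository on GitHub.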

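    Features

    - Implemented in C++11
    - Configurable number of time wheel levels
    - Configurable number of scales (slots) per time wheel
    - Configurable clock step
    - Error is generally within one clock step
    - Recommended clock step: 50 ms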

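    Class overview

    The code consists of three classes: Timer, TimeWheel, and TimeWheelScheduler.

    Timer: the timer class. It wraps the timer task, a std::function<void()>.

    TimeWheel: the time wheel class. Each time wheel has its own scales. When a lower-level wheel completes a full revolution, the wheel above it carries, that is, its current scale advances by one. Each wheel can be configured with N scales, and every scale owns a doubly linked list that holds the timers due at that scale: std::vector<std::list<TimerPtr>>(N).

    TimeWheelScheduler: the time wheel scheduler. It starts a thread that drives the time wheels at a fixed clock step, and it provides the timer-creation interfaces CreateTimerAt, CreateTimerAfter, and CreateTimerEvery.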
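    The carry rule can be illustrated in isolation. The following is only a minimal sketch: MiniWheel and TickN are hypothetical names introduced here, timers are left out entirely, and only the index bookkeeping is shown.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a time wheel: it keeps only the current index
// and a link to the next coarser wheel. Timers are deliberately omitted.
struct MiniWheel {
    uint32_t scales;               // number of scales (slots) on this wheel
    uint32_t index = 0;            // current scale
    MiniWheel* greater = nullptr;  // next coarser wheel, if any

    explicit MiniWheel(uint32_t s) : scales(s) { assert(s > 0); }

    // Advance one scale. On wrap-around, carry into the coarser wheel.
    void Tick() {
        ++index;
        if (index < scales) {
            return;
        }
        index = 0;
        if (greater != nullptr) {
            greater->Tick();
        }
    }
};

// Ticks the finest wheel n times.
inline void TickN(MiniWheel& finest, uint32_t n) {
    for (uint32_t i = 0; i < n; ++i) {
        finest.Tick();
    }
}
```

    With a 60-scale "seconds" wheel linked to a 60-scale "minutes" wheel, ticking the seconds wheel 61 times leaves the seconds index at 1 and the minutes index at 1, much like a clock face.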

    Example

        #include <chrono>
        #include <iostream>
        #include <thread>

        #include "time_wheel_scheduler.h"

        // timetoStr() is a helper that is not shown in this post; it is assumed
        // to return the current time as a printable string.

        int main() {
            // Four-level time wheels: Hour, Minute, Second, Millisecond.
            int timer_step_ms = 50;
            TimeWheelScheduler tws(timer_step_ms);
            // Hour time wheel. 24 scales, 1 scale = 60 * 60 * 1000 ms.
            tws.AppendTimeWheel(24, 60 * 60 * 1000, "HourTimeWheel");
            // Minute time wheel. 60 scales, 1 scale = 60 * 1000 ms.
            tws.AppendTimeWheel(60, 60 * 1000, "MinuteTimeWheel");
            // Second time wheel. 60 scales, 1 scale = 1000 ms.
            tws.AppendTimeWheel(60, 1000, "SecondTimeWheel");
            // Millisecond time wheel. 1000 / timer_step_ms scales, 1 scale = timer_step_ms ms.
            tws.AppendTimeWheel(1000 / timer_step_ms, timer_step_ms, "MillisecondTimeWheel");

            tws.Start();

            tws.CreateTimerAt(GetNowTimestamp() + 10000, []() {
                std::cout << "At now+10s" << std::endl;
            });

            tws.CreateTimerAfter(500, []() {
                std::cout << "After 0.5s" << std::endl;
            });

            std::cout << timetoStr() << std::endl;
            auto timer_id = tws.CreateTimerEvery(5000, []() {
                std::cout << "Every 5s: " << timetoStr() << std::endl;
            });

            tws.CreateTimerEvery(30 * 1000, []() {
                std::cout << "Every 30s: " << timetoStr() << std::endl;
            });

            // Cancel the 5 s repeating timer created above.
            tws.CancelTimer(timer_id);

            std::this_thread::sleep_for(std::chrono::minutes(20));
            tws.Stop();

            return 0;
        }

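    This example builds a four-level time wheel: hour level (the highest wheel), minute level, second level, and millisecond level (the lowest wheel).

    Hour level: one scale is 1 hour, so 24 hours give 24 scales. 1 hour equals 60 * 60 * 1000 ms. tws.AppendTimeWheel(24, 60 * 60 * 1000, "HourTimeWheel");

    Minute level: one scale is 1 minute, so 60 minutes give 60 scales. 1 minute equals 60 * 1000 ms. tws.AppendTimeWheel(60, 60 * 1000, "MinuteTimeWheel");

    Second level: one scale is 1 second, so 60 seconds give 60 scales. 1 second equals 1000 ms. tws.AppendTimeWheel(60, 1000, "SecondTimeWheel");

    Millisecond level: one scale is one clock step, timer_step_ms. Since 1 second equals 1000 ms, the wheel has 1000 / timer_step_ms scales. tws.AppendTimeWheel(1000 / timer_step_ms, timer_step_ms, "MillisecondTimeWheel");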
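    To make the arithmetic behind the four levels concrete, here is a small sketch that splits a delay into per-wheel offsets. WheelOffsets, SplitDelay, and FullSpanMs are hypothetical names used only for illustration. The real AddTimer also takes each wheel's current index into account, so these values are offsets, not final slot numbers.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper type: how many scales a delay spans on each wheel of
// the four-level layout (hour, minute, second, clock step).
struct WheelOffsets {
    int64_t hours;
    int64_t minutes;
    int64_t seconds;
    int64_t steps;  // scales on the millisecond wheel, each step_ms long
};

// Splits delay_ms from the coarsest wheel down to the finest one.
inline WheelOffsets SplitDelay(int64_t delay_ms, int64_t step_ms) {
    assert(delay_ms >= 0 && step_ms > 0);
    const int64_t kSecondMs = 1000;
    const int64_t kMinuteMs = 60 * kSecondMs;
    const int64_t kHourMs = 60 * kMinuteMs;

    WheelOffsets out;
    out.hours = delay_ms / kHourMs;
    delay_ms %= kHourMs;
    out.minutes = delay_ms / kMinuteMs;
    delay_ms %= kMinuteMs;
    out.seconds = delay_ms / kSecondMs;
    delay_ms %= kSecondMs;
    out.steps = delay_ms / step_ms;  // any remainder below one step is dropped
    return out;
}

// Total time covered by one revolution of the 24-scale hour wheel.
inline int64_t FullSpanMs() {
    return 24LL * 60 * 60 * 1000;
}
```

    For instance, with a 50 ms step a delay of 3,723,450 ms splits into 1 hour, 2 minutes, 3 seconds, and 9 steps, and the whole structure covers 86,400,000 ms (24 hours) per revolution of the hour wheel.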

    Code

    timer.h

        #ifndef TIMER_H_
        #define TIMER_H_

        #include <cstdint>
        #include <functional>
        #include <memory>

        typedef std::function<void()> TimerTask;

        class Timer {
        public:
            Timer(uint32_t id, int64_t when_ms, int64_t interval_ms, const TimerTask& task);

            // Invokes the task if one is set.
            void Run();

            uint32_t id() const {
                return id_;
            }

            int64_t when_ms() const {
                return when_ms_;
            }

            bool repeated() const {
                return repeated_;
            }

            // Moves the due time of a repeated timer forward by one interval.
            void UpdateWhenTime() {
                when_ms_ += interval_ms_;
            }

        private:
            uint32_t id_;
            TimerTask task_;
            int64_t when_ms_;
            int64_t interval_ms_;  // Same width as the constructor argument, to avoid narrowing.
            bool repeated_;
        };

        using TimerPtr = std::shared_ptr<Timer>;

        #endif // TIMER_H_

    timer.cpp

        #include "timer.h"

        // Members are initialized in declaration order.
        Timer::Timer(uint32_t id, int64_t when_ms, int64_t interval_ms, const TimerTask& handler)
            : id_(id)
            , task_(handler)
            , when_ms_(when_ms)
            , interval_ms_(interval_ms)
            , repeated_(interval_ms > 0) {
        }

        void Timer::Run() {
            if (task_) {
                task_();
            }
        }

    time_wheel.h

        #ifndef TIME_WHEEL_H_
        #define TIME_WHEEL_H_

        #include <chrono>
        #include <cstdint>
        #include <list>
        #include <memory>
        #include <string>
        #include <vector>

        #include "timer.h"

        class TimeWheel {
        public:
            TimeWheel(uint32_t scales, uint32_t scale_unit_ms, const std::string& name = "");

            uint32_t scale_unit_ms() const {
                return scale_unit_ms_;
            }

            uint32_t scales() const {
                return scales_;
            }

            uint32_t current_index() const {
                return current_index_;
            }

            void set_less_level_tw(TimeWheel* less_level_tw) {
                less_level_tw_ = less_level_tw;
            }

            void set_greater_level_tw(TimeWheel* greater_level_tw) {
                greater_level_tw_ = greater_level_tw;
            }

            int64_t GetCurrentTime() const;

            void AddTimer(TimerPtr timer);

            void Increase();

            std::list<TimerPtr> GetAndClearCurrentSlot();

        private:
            std::string name_;
            uint32_t current_index_;

            // A time wheel can be divided into multiple scales. A scale has N ms.
            uint32_t scales_;
            uint32_t scale_unit_ms_;

            // Every slot corresponds to a scale. Every slot contains the timers.
            std::vector<std::list<TimerPtr>> slots_;

            TimeWheel* less_level_tw_;     // Less scale unit.
            TimeWheel* greater_level_tw_;  // Greater scale unit.
        };

        using TimeWheelPtr = std::shared_ptr<TimeWheel>;

        // Returns the current wall-clock time in milliseconds since the epoch.
        inline int64_t GetNowTimestamp() {
            using namespace std::chrono;
            auto now = system_clock::now().time_since_epoch();
            return duration_cast<milliseconds>(now).count();
        }

        #endif // TIME_WHEEL_H_

    time_wheel.cpp

        #include "time_wheel.h"

        // Members are initialized in declaration order.
        TimeWheel::TimeWheel(uint32_t scales, uint32_t scale_unit_ms, const std::string& name)
            : name_(name)
            , current_index_(0)
            , scales_(scales)
            , scale_unit_ms_(scale_unit_ms)
            , slots_(scales)
            , less_level_tw_(nullptr)
            , greater_level_tw_(nullptr) {
        }

        // Time elapsed on this wheel plus all lower-level wheels, in ms.
        int64_t TimeWheel::GetCurrentTime() const {
            int64_t time = static_cast<int64_t>(current_index_) * scale_unit_ms_;
            if (less_level_tw_ != nullptr) {
                time += less_level_tw_->GetCurrentTime();
            }

            return time;
        }

        void TimeWheel::AddTimer(TimerPtr timer) {
            int64_t less_tw_time = 0;
            if (less_level_tw_ != nullptr) {
                less_tw_time = less_level_tw_->GetCurrentTime();
            }
            int64_t diff = timer->when_ms() + less_tw_time - GetNowTimestamp();

            // If the difference is at least one scale unit, the timer can be added into the current time wheel.
            if (diff >= scale_unit_ms_) {
                size_t n = (current_index_ + diff / scale_unit_ms_) % scales_;
                slots_[n].push_back(timer);
                return;
            }

            // If the difference is less than one scale unit, the timer should be added into the less level time wheel.
            if (less_level_tw_ != nullptr) {
                less_level_tw_->AddTimer(timer);
                return;
            }

            // If the current time wheel is the least level, the timer is added into the current slot.
            // Note: the scheduler has already taken this slot for the current tick, so such a timer
            // is only picked up after one more full revolution of this wheel.
            slots_[current_index_].push_back(timer);
        }

        void TimeWheel::Increase() {
            // Increase the time wheel.
            ++current_index_;
            if (current_index_ < scales_) {
                return;
            }

            // If the time wheel is full, the greater level time wheel should be increased.
            // The timers in the current slot of the greater level time wheel should be moved into
            // the less level time wheel.
            current_index_ = current_index_ % scales_;
            if (greater_level_tw_ != nullptr) {
                greater_level_tw_->Increase();
                std::list<TimerPtr> slot = greater_level_tw_->GetAndClearCurrentSlot();
                for (const TimerPtr& timer : slot) {
                    AddTimer(timer);
                }
            }
        }

        std::list<TimerPtr> TimeWheel::GetAndClearCurrentSlot() {
            // Swapping with an empty list leaves the slot empty and hands its timers to the caller.
            std::list<TimerPtr> slot;
            slot.swap(slots_[current_index_]);
            return slot;
        }
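    The slot selection in AddTimer comes down to one modular expression. As a worked example, the sketch below lifts that expression into a free function; SlotIndex is a hypothetical name that does not exist in the code above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical free function mirroring the slot formula used in AddTimer.
// diff_ms is how far ahead the timer is due, as seen by this wheel.
inline std::size_t SlotIndex(uint32_t current_index, int64_t diff_ms,
                             uint32_t scale_unit_ms, uint32_t scales) {
    assert(scale_unit_ms > 0 && scales > 0 && diff_ms >= 0);
    const int64_t steps = diff_ms / scale_unit_ms;  // whole scales to skip ahead
    return static_cast<std::size_t>((current_index + steps) % scales);
}
```

    On a 60-scale second wheel (scale unit 1000 ms) whose current index is 58, a timer due 5000 ms ahead lands in slot (58 + 5) % 60 = 3, so the index wraps around the end of the wheel.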

    time_wheel_scheduler.h

        #ifndef TIME_WHEEL_SCHEDULER_H_
        #define TIME_WHEEL_SCHEDULER_H_

        #include <cstdint>
        #include <mutex>
        #include <string>
        #include <thread>
        #include <unordered_set>
        #include <vector>

        #include "time_wheel.h"

        class TimeWheelScheduler {
        public:
            explicit TimeWheelScheduler(uint32_t timer_step_ms = 50);

            // Return timer id. Return 0 if the timer creation fails.
            uint32_t CreateTimerAt(int64_t when_ms, const TimerTask& handler);
            uint32_t CreateTimerAfter(int64_t delay_ms, const TimerTask& handler);
            uint32_t CreateTimerEvery(int64_t interval_ms, const TimerTask& handler);

            void CancelTimer(uint32_t timer_id);

            bool Start();
            void Stop();

            // Wheels must be appended from the greatest level down to the least level.
            void AppendTimeWheel(uint32_t scales, uint32_t scale_unit_ms, const std::string& name = "");

        private:
            void Run();

            TimeWheelPtr GetGreatestTimeWheel();
            TimeWheelPtr GetLeastTimeWheel();

        private:
            std::mutex mutex_;
            std::thread thread_;

            bool stop_;

            // Ids of cancelled timers; they are skipped when their slot comes due.
            std::unordered_set<uint32_t> cancel_timer_ids_;

            uint32_t timer_step_ms_;
            std::vector<TimeWheelPtr> time_wheels_;
        };

        #endif // TIME_WHEEL_SCHEDULER_H_

    time_wheel_scheduler.cpp

        #include "time_wheel_scheduler.h"

        #include <chrono>
        #include <list>

        // Global counter used to generate timer ids.
        static uint32_t s_inc_id = 1;

        TimeWheelScheduler::TimeWheelScheduler(uint32_t timer_step_ms)
            : stop_(false)
            , timer_step_ms_(timer_step_ms) {
        }

        bool TimeWheelScheduler::Start() {
            // Clock steps below 50 ms are rejected.
            if (timer_step_ms_ < 50) {
                return false;
            }

            if (time_wheels_.empty()) {
                return false;
            }

            thread_ = std::thread(&TimeWheelScheduler::Run, this);

            return true;
        }

        void TimeWheelScheduler::Run() {
            while (true) {
                std::this_thread::sleep_for(std::chrono::milliseconds(timer_step_ms_));

                std::lock_guard<std::mutex> lock(mutex_);
                if (stop_) {
                    break;
                }

                // Advance the least level wheel by one step and take the timers that are now due.
                TimeWheelPtr least_time_wheel = GetLeastTimeWheel();
                least_time_wheel->Increase();
                std::list<TimerPtr> slot = least_time_wheel->GetAndClearCurrentSlot();
                for (const TimerPtr& timer : slot) {
                    // Skip (and forget) timers that were cancelled.
                    auto it = cancel_timer_ids_.find(timer->id());
                    if (it != cancel_timer_ids_.end()) {
                        cancel_timer_ids_.erase(it);
                        continue;
                    }

                    timer->Run();
                    // Repeated timers are rescheduled starting from the greatest wheel.
                    if (timer->repeated()) {
                        timer->UpdateWhenTime();
                        GetGreatestTimeWheel()->AddTimer(timer);
                    }
                }
            }
        }

        void TimeWheelScheduler::Stop() {
            {
                std::lock_guard<std::mutex> lock(mutex_);
                stop_ = true;
            }

            // Joining a thread that was never started would throw, so check first.
            if (thread_.joinable()) {
                thread_.join();
            }
        }

        TimeWheelPtr TimeWheelScheduler::GetGreatestTimeWheel() {
            if (time_wheels_.empty()) {
                return TimeWheelPtr();
            }

            return time_wheels_.front();
        }

        TimeWheelPtr TimeWheelScheduler::GetLeastTimeWheel() {
            if (time_wheels_.empty()) {
                return TimeWheelPtr();
            }

            return time_wheels_.back();
        }

        void TimeWheelScheduler::AppendTimeWheel(uint32_t scales, uint32_t scale_unit_ms, const std::string& name) {
            TimeWheelPtr time_wheel = std::make_shared<TimeWheel>(scales, scale_unit_ms, name);
            if (time_wheels_.empty()) {
                time_wheels_.push_back(time_wheel);
                return;
            }

            // Link the new wheel below the wheel that was appended last.
            TimeWheelPtr greater_time_wheel = time_wheels_.back();
            greater_time_wheel->set_less_level_tw(time_wheel.get());
            time_wheel->set_greater_level_tw(greater_time_wheel.get());
            time_wheels_.push_back(time_wheel);
        }

        uint32_t TimeWheelScheduler::CreateTimerAt(int64_t when_ms, const TimerTask& handler) {
            if (time_wheels_.empty()) {
                return 0;
            }

            std::lock_guard<std::mutex> lock(mutex_);
            ++s_inc_id;
            GetGreatestTimeWheel()->AddTimer(std::make_shared<Timer>(s_inc_id, when_ms, 0, handler));

            return s_inc_id;
        }

        uint32_t TimeWheelScheduler::CreateTimerAfter(int64_t delay_ms, const TimerTask& handler) {
            int64_t when = GetNowTimestamp() + delay_ms;
            return CreateTimerAt(when, handler);
        }

        uint32_t TimeWheelScheduler::CreateTimerEvery(int64_t interval_ms, const TimerTask& handler) {
            if (time_wheels_.empty()) {
                return 0;
            }

            std::lock_guard<std::mutex> lock(mutex_);
            ++s_inc_id;
            int64_t when = GetNowTimestamp() + interval_ms;
            GetGreatestTimeWheel()->AddTimer(std::make_shared<Timer>(s_inc_id, when, interval_ms, handler));

            return s_inc_id;
        }

        void TimeWheelScheduler::CancelTimer(uint32_t timer_id) {
            // Cancellation is lazy: the id is recorded and checked when the timer comes due.
            std::lock_guard<std::mutex> lock(mutex_);
            cancel_timer_ids_.insert(timer_id);
        }
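    One possible contributor to the error at small clock steps is that Run() sleeps for a fixed duration on every iteration, so the time spent running timers and any sleep overshoot add up from tick to tick. The sketch below shows a different technique that the code above does not use: sleeping until absolute deadlines on std::chrono::steady_clock. RunFixedRate is a hypothetical helper, and whether this would actually reduce the measured error in this project is an assumption that has not been verified here.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// Hypothetical helper: calls on_tick `ticks` times, one call per `step`.
// Each wake-up targets an absolute deadline (start + i * step), so a late
// wake-up on one tick does not push back the deadlines of later ticks.
inline void RunFixedRate(std::chrono::milliseconds step, uint32_t ticks,
                         const std::function<void(uint32_t)>& on_tick) {
    assert(step.count() > 0);
    using Clock = std::chrono::steady_clock;
    const Clock::time_point start = Clock::now();
    for (uint32_t i = 1; i <= ticks; ++i) {
        std::this_thread::sleep_until(start + step * i);
        on_tick(i);  // receives the 1-based tick number
    }
}
```

    A call such as RunFixedRate(std::chrono::milliseconds(10), 5, callback) invokes the callback five times and returns no earlier than 50 ms after it was called. An individual tick can still fire late, but the lateness of one tick is not carried over into the next deadline.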