最近实现了一个多级时间轮定时器。经过简单的测试发现,如果将时钟步长设置在10ms以内,定时误差会比较大。最新代码见 GitHub 上的 TimeWheel 项目。
该例创建了一个四级的时间轮:小时级(高级时间轮),分钟级,秒级,毫秒级(低级时间轮)。
小时级:以1小时为1个刻度,总共24小时则24个刻度。1小时等于60*60*1000毫秒。tws.AppendTimeWheel(24, 60 * 60 * 1000, "HourTimeWheel");。
分钟级:以1分钟为1个刻度,总共60分钟则60个刻度。1分钟等于60*1000毫秒。tws.AppendTimeWheel(60, 60 * 1000, "MinuteTimeWheel");。
秒级:以1秒为1个刻度,总共60秒则60个刻度。1秒等于1000毫秒。tws.AppendTimeWheel(60, 1000, "SecondTimeWheel");。
毫秒级:以1个时钟步长timer_step_ms为1个刻度,1秒等于1000毫秒,则共有1000/timer_step_ms个刻度。tws.AppendTimeWheel(1000 / timer_step_ms, timer_step_ms, "MillisecondTimeWheel");。
timer.h
#ifndef TIMER_H_
#define TIMER_H_

#include <cstdint>
#include <functional>
#include <memory>

// Callback executed when a timer fires.
using TimerTask = std::function<void()>;

// A single scheduled task: an id, an absolute expiry time in milliseconds,
// an optional repeat interval and the callback to execute.
class Timer {
public:
  // id          - identifier later used to look the timer up for cancellation.
  // when_ms     - absolute expiry time in milliseconds.
  // interval_ms - repeat period in milliseconds.
  // task        - callback; an empty std::function is allowed.
  Timer(uint32_t id, int64_t when_ms, int64_t interval_ms, const TimerTask& task);

  // Invokes the stored callback.
  void Run();

  uint32_t id() const {
    return id_;
  }

  int64_t when_ms() const {
    return when_ms_;
  }

  bool repeated() const {
    return repeated_;
  }

  // Advances the expiry time by one interval (used when re-arming a repeated timer).
  void UpdateWhenTime() {
    when_ms_ += interval_ms_;
  }

private:
  uint32_t id_;
  TimerTask task_;
  int64_t when_ms_;
  // Kept as int64_t to match the constructor argument: a 32-bit field would
  // silently truncate long intervals and turn negative ones into huge
  // positive periods.
  int64_t interval_ms_;
  bool repeated_;
};

using TimerPtr = std::shared_ptr<Timer>;

#endif  // TIMER_H_

// timer.cpp
#include "timer.h"

// Builds a timer that expires at `when_ms`.
//
// The members are initialised in their declaration order (id_, task_,
// when_ms_, interval_ms_, repeated_), which is the order the compiler uses
// regardless of how the list is written.
//
// `repeated_` is derived from the constructor argument rather than from the
// stored member, so the result does not depend on the width of the member:
// only a strictly positive interval makes the timer repeat.
Timer::Timer(uint32_t id, int64_t when_ms, int64_t interval_ms, const TimerTask& handler)
    : id_(id)
    , task_(handler)
    , when_ms_(when_ms)
    , interval_ms_(interval_ms)
    , repeated_(interval_ms > 0) {
}

// Executes the callback; a timer created with an empty callback does nothing.
void Timer::Run() {
  if (task_) {
    task_();
  }
}

// time_wheel.h
#ifndef TIME_WHEEL_H_ #define TIME_WHEEL_H_ #include <chrono> #include <string> #include <memory> #include <vector> #include <list> #include "timer.h" class TimeWheel { public: TimeWheel(uint32_t scales, uint32_t scale_unit_ms, const std::string& name = ""); uint32_t scale_unit_ms() const { return scale_unit_ms_; } uint32_t scales() const { return scales_; } uint32_t current_index() const { return current_index_; } void set_less_level_tw(TimeWheel* less_level_tw) { less_level_tw_ = less_level_tw; } void set_greater_level_tw(TimeWheel* greater_level_tw) { greater_level_tw_ = greater_level_tw; } int64_t GetCurrentTime() const; void AddTimer(TimerPtr timer); void Increase(); std::list<TimerPtr> GetAndClearCurrentSlot(); private: std::string name_; uint32_t current_index_; // A time wheel can be devided into multiple scales. A scals has N ms. uint32_t scales_; uint32_t scale_unit_ms_; // Every slot corresponds to a scale. Every slot contains the timers. std::vector<std::list<TimerPtr>> slots_; TimeWheel* less_level_tw_; // Less scale unit. TimeWheel* greater_level_tw_; // Greater scale unit. }; using TimeWheelPtr = std::shared_ptr<TimeWheel>; inline int64_t GetNowTimestamp() { using namespace std::chrono; auto now = system_clock::now().time_since_epoch(); return duration_cast<milliseconds>(now).count(); } #endif // TIME_WHEEL_H_time_wheel.cpp
#include "time_wheel.h"

#include <utility>

// Creates a wheel with `scales` empty slots of `scale_unit_ms` milliseconds
// each. The initializer list follows the member declaration order.
TimeWheel::TimeWheel(uint32_t scales, uint32_t scale_unit_ms, const std::string& name)
    : name_(name)
    , current_index_(0)
    , scales_(scales)
    , scale_unit_ms_(scale_unit_ms)
    , slots_(scales)
    , less_level_tw_(nullptr)
    , greater_level_tw_(nullptr) {
}

// Returns the time (ms) represented by this wheel's position plus the
// positions of every less-level wheel beneath it.
int64_t TimeWheel::GetCurrentTime() const {
  // Widen before multiplying: index * unit may not fit into 32 bits.
  int64_t time = static_cast<int64_t>(current_index_) * scale_unit_ms_;
  if (less_level_tw_ != nullptr) {
    time += less_level_tw_->GetCurrentTime();
  }

  return time;
}

// Files `timer` into the slot matching its remaining time, or delegates to the
// less-level wheel when the remaining time is shorter than one scale here.
void TimeWheel::AddTimer(TimerPtr timer) {
  int64_t less_tw_time = 0;
  if (less_level_tw_ != nullptr) {
    less_tw_time = less_level_tw_->GetCurrentTime();
  }

  // Remaining time, offset by what the less-level wheels have already
  // advanced within the current scale of this wheel.
  const int64_t diff = timer->when_ms() + less_tw_time - GetNowTimestamp();

  // If the difference is at least one scale unit, the timer belongs to this wheel.
  // NOTE(review): the slot index wraps modulo `scales_`, so a delay longer
  // than one full revolution of the greatest wheel lands in an earlier slot.
  if (diff >= scale_unit_ms_) {
    const size_t n = (current_index_ + diff / scale_unit_ms_) % scales_;
    slots_[n].push_back(std::move(timer));
    return;
  }

  // If the difference is less than one scale unit, pass it to the less-level wheel.
  if (less_level_tw_ != nullptr) {
    less_level_tw_->AddTimer(std::move(timer));
    return;
  }

  // This is the least-level wheel: keep the timer in the current slot.
  slots_[current_index_].push_back(std::move(timer));
}

// Advances the wheel by one slot. On wrap-around the greater-level wheel is
// advanced as well and the timers of its new current slot are redistributed
// starting from this wheel.
void TimeWheel::Increase() {
  ++current_index_;
  if (current_index_ < scales_) {
    return;
  }

  // The wheel completed a revolution.
  current_index_ = current_index_ % scales_;
  if (greater_level_tw_ != nullptr) {
    greater_level_tw_->Increase();
    std::list<TimerPtr> slot = greater_level_tw_->GetAndClearCurrentSlot();
    for (TimerPtr& timer : slot) {
      AddTimer(std::move(timer));
    }
  }
}

// Returns the timers of the current slot and leaves that slot empty.
std::list<TimerPtr> TimeWheel::GetAndClearCurrentSlot() {
  // swap() guarantees the slot is empty afterwards; a moved-from list is only
  // guaranteed to be in a valid but unspecified state.
  std::list<TimerPtr> slot;
  slot.swap(slots_[current_index_]);
  return slot;
}

// time_wheel_scheduler.h
#ifndef TIME_WHEEL_SCHEDULER_H_ #define TIME_WHEEL_SCHEDULER_H_ #include <mutex> #include <vector> #include <thread> #include <unordered_set> #include "time_wheel.h" class TimeWheelScheduler { public: explicit TimeWheelScheduler(uint32_t timer_step_ms = 50); // Return timer id. Return 0 if the timer creation fails. uint32_t CreateTimerAt(int64_t when_ms, const TimerTask& handler); uint32_t CreateTimerAfter(int64_t delay_ms, const TimerTask& handler); uint32_t CreateTimerEvery(int64_t interval_ms, const TimerTask& handler); void CancelTimer(uint32_t timer_id); bool Start(); void Stop(); void AppendTimeWheel(uint32_t scales, uint32_t scale_unit_ms, const std::string& name = ""); private: void Run(); TimeWheelPtr GetGreatestTimeWheel(); TimeWheelPtr GetLeastTimeWheel(); private: std::mutex mutex_; std::thread thread_; bool stop_; std::unordered_set<uint32_t> cancel_timer_ids_; uint32_t timer_step_ms_; std::vector<TimeWheelPtr> time_wheels_; }; #endif // TIME_WHEEL_SCHEDULER_H_time_wheel_scheduler.cpp
#include "time_wheel_scheduler.h"

#include <chrono>
#include <list>
#include <memory>

// Last timer id handed out. The counter is file-wide, i.e. shared by every
// scheduler instance, and is only modified while a scheduler's mutex is held.
static uint32_t s_inc_id = 1;

// Returns the next timer id, skipping 0 because 0 is reserved for "creation failed".
static uint32_t NextTimerId() {
  ++s_inc_id;
  if (s_inc_id == 0) {
    ++s_inc_id;
  }
  return s_inc_id;
}

// The initializer list follows the member declaration order.
TimeWheelScheduler::TimeWheelScheduler(uint32_t timer_step_ms)
    : stop_(false)
    , timer_step_ms_(timer_step_ms) {
}

// Launches the background thread.
// Returns false when the step is below 50 ms, when no wheel has been appended,
// or when the scheduler is already running.
bool TimeWheelScheduler::Start() {
  if (timer_step_ms_ < 50) {
    return false;
  }

  if (time_wheels_.empty()) {
    return false;
  }

  // Assigning to a joinable std::thread would terminate the process.
  if (thread_.joinable()) {
    return false;
  }

  {
    // Allow a restart after a previous Stop().
    std::lock_guard<std::mutex> lock(mutex_);
    stop_ = false;
  }

  thread_ = std::thread(&TimeWheelScheduler::Run, this);
  return true;
}

// Background loop: once per step, advance the least-level wheel, run the
// timers that became due and re-arm the repeated ones.
void TimeWheelScheduler::Run() {
  const std::chrono::milliseconds step(timer_step_ms_);

  // Sleep until an absolute deadline instead of for a relative duration so
  // that the time spent processing a tick does not accumulate as drift.
  std::chrono::steady_clock::time_point next_tick = std::chrono::steady_clock::now();

  while (true) {
    next_tick += step;
    std::this_thread::sleep_until(next_tick);

    std::list<TimerPtr> due;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (stop_) {
        break;
      }

      TimeWheelPtr least_time_wheel = GetLeastTimeWheel();
      least_time_wheel->Increase();
      std::list<TimerPtr> slot = least_time_wheel->GetAndClearCurrentSlot();
      for (TimerPtr& timer : slot) {
        // A cancelled timer is dropped and its cancellation record consumed.
        auto it = cancel_timer_ids_.find(timer->id());
        if (it != cancel_timer_ids_.end()) {
          cancel_timer_ids_.erase(it);
          continue;
        }

        due.push_back(std::move(timer));
      }
    }

    // Run the callbacks without holding the mutex: a callback may call
    // CreateTimer*() or CancelTimer(), which lock the same non-recursive mutex.
    for (const TimerPtr& timer : due) {
      timer->Run();
    }

    {
      std::lock_guard<std::mutex> lock(mutex_);
      for (const TimerPtr& timer : due) {
        if (timer->repeated()) {
          timer->UpdateWhenTime();
          GetGreatestTimeWheel()->AddTimer(timer);
        }
      }
    }
  }
}

// Asks the background thread to exit and waits for it. Safe to call when the
// scheduler was never started or has already been stopped.
void TimeWheelScheduler::Stop() {
  {
    std::lock_guard<std::mutex> lock(mutex_);
    stop_ = true;
  }

  // join() on a thread that is not joinable throws std::system_error.
  if (thread_.joinable()) {
    thread_.join();
  }
}

// Returns the first appended wheel, or an empty pointer when there is none.
TimeWheelPtr TimeWheelScheduler::GetGreatestTimeWheel() {
  if (time_wheels_.empty()) {
    return TimeWheelPtr();
  }

  return time_wheels_.front();
}

// Returns the last appended wheel, or an empty pointer when there is none.
TimeWheelPtr TimeWheelScheduler::GetLeastTimeWheel() {
  if (time_wheels_.empty()) {
    return TimeWheelPtr();
  }

  return time_wheels_.back();
}

// Appends a wheel and links it as the less-level wheel of the previously
// appended one.
// NOTE(review): this does not take mutex_, so it is presumably meant to be
// called before Start() - confirm with callers.
void TimeWheelScheduler::AppendTimeWheel(uint32_t scales, uint32_t scale_unit_ms, const std::string& name) {
  TimeWheelPtr time_wheel = std::make_shared<TimeWheel>(scales, scale_unit_ms, name);
  if (time_wheels_.empty()) {
    time_wheels_.push_back(time_wheel);
    return;
  }

  TimeWheelPtr greater_time_wheel = time_wheels_.back();
  greater_time_wheel->set_less_level_tw(time_wheel.get());
  time_wheel->set_greater_level_tw(greater_time_wheel.get());
  time_wheels_.push_back(time_wheel);
}

// Schedules `handler` to run once at the absolute time `when_ms`.
// Returns the timer id, or 0 when no time wheel has been appended.
uint32_t TimeWheelScheduler::CreateTimerAt(int64_t when_ms, const TimerTask& handler) {
  std::lock_guard<std::mutex> lock(mutex_);
  if (time_wheels_.empty()) {
    return 0;
  }

  const uint32_t id = NextTimerId();
  GetGreatestTimeWheel()->AddTimer(std::make_shared<Timer>(id, when_ms, 0, handler));
  return id;
}

// Schedules `handler` to run once, `delay_ms` milliseconds from now.
// Returns the timer id, or 0 on failure.
uint32_t TimeWheelScheduler::CreateTimerAfter(int64_t delay_ms, const TimerTask& handler) {
  const int64_t when = GetNowTimestamp() + delay_ms;
  return CreateTimerAt(when, handler);
}

// Schedules `handler` to run every `interval_ms` milliseconds, starting one
// interval from now. Returns the timer id, or 0 when no time wheel has been
// appended or the interval is not positive.
uint32_t TimeWheelScheduler::CreateTimerEvery(int64_t interval_ms, const TimerTask& handler) {
  // A repeating timer needs a positive period.
  if (interval_ms <= 0) {
    return 0;
  }

  std::lock_guard<std::mutex> lock(mutex_);
  if (time_wheels_.empty()) {
    return 0;
  }

  const uint32_t id = NextTimerId();
  const int64_t when = GetNowTimestamp() + interval_ms;
  GetGreatestTimeWheel()->AddTimer(std::make_shared<Timer>(id, when, interval_ms, handler));
  return id;
}

// Records `timer_id` as cancelled. The record is consumed when a timer with
// that id next becomes due; ids that never become due stay in the set.
void TimeWheelScheduler::CancelTimer(uint32_t timer_id) {
  std::lock_guard<std::mutex> lock(mutex_);
  cancel_timer_ids_.insert(timer_id);
}