// Select the work-wheel bucket at current_work_wheel_index_ and hold that
// bucket's mutex while its task list is walked.
47 auto& bucket = work_wheel_[current_work_wheel_index_];
49 std::lock_guard<std::mutex> lock(bucket.mutex());
50 auto ite = bucket.task_list().begin();
51 while (ite != bucket.task_list().end()) {
// Each list entry is lock()ed to obtain `task` (presumably a std::weak_ptr
// being promoted -- confirm against the bucket's element type).
// NOTE(review): `task` is dereferenced below; make sure a null result from
// lock() is checked before these uses.
52 auto task = ite->lock();
54 ADEBUG <<
"index: " << current_work_wheel_index_
55 <<
" timer id: " << task->timer_id_;
// NOTE(review): the address of task->callback is reinterpreted as a
// std::function<void()>*. This is only well-defined if the member really has
// that type -- confirm against the task's declaration.
57 reinterpret_cast<std::function<
void()
>*>(&(task->callback));
// Hand the work to cyber::Async; the lambda captures `this` and `callback`.
58 cyber::Async([
this, callback] {
// The entry is erased from the bucket and `ite` is reassigned from the value
// erase() returns.
64 ite = bucket.task_list().erase(ite);
74 const uint64_t current_work_wheel_index) {
// Target slot = caller-supplied index +
// ceil(task->next_fire_duration_ms / TIMER_RESOLUTION_MS); the division is
// done in double so partial ticks round up.
78 auto work_wheel_index = current_work_wheel_index +
79 static_cast<uint64_t
>(std::ceil(
80 static_cast<double>(task->next_fire_duration_ms) /
81 TIMER_RESOLUTION_MS));
// The target slot lies at or beyond WORK_WHEEL_SIZE.
82 if (work_wheel_index >= WORK_WHEEL_SIZE) {
// NOTE(review): despite its name, remainder_interval_ms is assigned the value
// returned by GetWorkWheelIndex() here (a wheel index), not a duration.
83 auto real_work_wheel_index = GetWorkWheelIndex(work_wheel_index);
84 task->remainder_interval_ms = real_work_wheel_index;
// Integer division of the unwrapped slot by the work-wheel size.
85 auto assistant_ticks = work_wheel_index / WORK_WHEEL_SIZE;
// NOTE(review): this compares against the member current_work_wheel_index_
// (trailing underscore), not the parameter current_work_wheel_index, and no
// lock on the member is visible here -- confirm this is intended.
86 if (assistant_ticks == 1 &&
87 real_work_wheel_index < current_work_wheel_index_) {
// Exactly one wrap and the wrapped slot is below the current member index:
// the task goes straight into the work wheel at the wrapped slot.
88 work_wheel_[real_work_wheel_index].
AddTask(task);
89 ADEBUG <<
"add task to work wheel. index :" << real_work_wheel_index;
// NOTE(review): `auto` with the literal 0 deduces int; the result of
// GetAssistantWheelIndex() is converted to int on assignment if it returns a
// wider type -- confirm.
91 auto assistant_wheel_index = 0;
// The assistant-wheel index mutex is taken before current_assistant_wheel_index_
// is read to compute the destination assistant-wheel slot.
93 std::lock_guard<std::mutex> lock(current_assistant_wheel_index_mutex_);
94 assistant_wheel_index = GetAssistantWheelIndex(
95 current_assistant_wheel_index_ + assistant_ticks);
96 assistant_wheel_[assistant_wheel_index].
AddTask(task);
98 ADEBUG <<
"add task to assistant wheel. index : "
99 << assistant_wheel_index;
// The task is stored in the work wheel at the unwrapped index (presumably the
// branch taken when work_wheel_index < WORK_WHEEL_SIZE -- confirm).
102 work_wheel_[work_wheel_index].
AddTask(task);
103 ADEBUG <<
"add task [" << task->timer_id_
104 <<
"] to work wheel. index :" << work_wheel_index;
// Take the assistant-wheel bucket at assistant_wheel_index and hold that
// bucket's mutex while its task list is walked.
109 auto& bucket = assistant_wheel_[assistant_wheel_index];
110 std::lock_guard<std::mutex> lock(bucket.mutex());
111 auto ite = bucket.task_list().begin();
112 while (ite != bucket.task_list().end()) {
// Each list entry is lock()ed to obtain `task` (presumably a std::weak_ptr
// being promoted -- confirm against the bucket's element type).
113 auto task = ite->lock();
// The task is added to the work-wheel slot whose index was stored in
// task->remainder_interval_ms.
// NOTE(review): this AddTask call happens while the assistant bucket's mutex
// is held; whether it also locks the destination bucket is not visible here.
115 work_wheel_[task->remainder_interval_ms].
AddTask(task);
// The entry is erased from the assistant bucket and `ite` is reassigned from
// the value erase() returns.
117 ite = bucket.task_list().erase(ite);
// A Rate is constructed from TIMER_RESOLUTION_MS * 1000000 (presumably a
// milliseconds-to-nanoseconds conversion -- confirm the unit Rate expects).
122 Rate rate(TIMER_RESOLUTION_MS * 1000000);
// Advance the work-wheel index by one while holding its mutex; the new value
// is whatever GetWorkWheelIndex() returns for the incremented index.
129 std::lock_guard<std::mutex> lock(current_work_wheel_index_mutex_);
130 current_work_wheel_index_ =
131 GetWorkWheelIndex(current_work_wheel_index_ + 1);
// When the work-wheel index is 0 after the advance, the assistant-wheel index
// is advanced by one as well, under its own mutex.
133 if (current_work_wheel_index_ == 0) {
135 std::lock_guard<std::mutex> lock(current_assistant_wheel_index_mutex_);
136 current_assistant_wheel_index_ =
137 GetAssistantWheelIndex(current_assistant_wheel_index_ + 1);
// Cascade is invoked with the updated assistant-wheel index.
// NOTE(review): the lock scopes are not visible here -- confirm whether this
// read of current_assistant_wheel_index_ happens under the mutex.
139 Cascade(current_assistant_wheel_index_);