Apollo 10.0
自动驾驶开放平台
timing_wheel.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2019 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <cmath>
20
21#include "cyber/task/task.h"
22
23namespace apollo {
24namespace cyber {
25
// NOTE(review): the line carrying this function's signature is absent from this
// listing. The body presumably belongs to TimingWheel::Start(), which AddTask()
// invokes when running_ is false - confirm against the original file.
//
// Starts the tick thread unless the wheel is already running. The whole check
// and start sequence executes while running_mutex_ is held.
27 std::lock_guard<std::mutex> lock(running_mutex_);
28 if (!running_) {
29 ADEBUG << "TimeWheel start ok";
30 running_ = true;
// The new thread executes TickFunc() on this object.
31 tick_thread_ = std::thread([this]() { this->TickFunc(); });
// Hand the thread to the scheduler under the name "timer".
32 scheduler::Instance()->SetInnerThreadAttr("timer", &tick_thread_);
33 }
34}
35
// NOTE(review): the line carrying this function's signature is absent from this
// listing. The body looks like the shutdown counterpart of the start routine -
// confirm the name against the original file.
//
// Stops the wheel: while holding running_mutex_, clears running_ and then waits
// for the tick thread to finish if it is joinable. Does nothing when running_
// is already false.
37 std::lock_guard<std::mutex> lock(running_mutex_);
38 if (running_) {
39 running_ = false;
40 if (tick_thread_.joinable()) {
// join() is called with running_mutex_ still held.
41 tick_thread_.join();
42 }
43 }
44}
45
// NOTE(review): the line carrying this function's signature is absent from this
// listing. The body presumably belongs to TimingWheel::Tick(), which the tick
// loop calls once per iteration - confirm against the original file.
//
// Processes the work-wheel bucket at current_work_wheel_index_: every entry is
// visited once under the bucket's mutex, entries whose task can still be locked
// have their callback handed to cyber::Async, and every entry is erased.
47 auto& bucket = work_wheel_[current_work_wheel_index_];
48 {
49 std::lock_guard<std::mutex> lock(bucket.mutex());
50 auto ite = bucket.task_list().begin();
51 while (ite != bucket.task_list().end()) {
// lock() yields an empty handle when the task no longer exists; such entries
// are erased without running anything.
52 auto task = ite->lock();
53 if (task) {
54 ADEBUG << "index: " << current_work_wheel_index_
55 << " timer id: " << task->timer_id_;
// NOTE(review): only a raw pointer to task->callback is captured below, while
// `task` itself goes out of scope at the end of this iteration. If nothing else
// keeps the task alive until the async job runs, the pointer may dangle -
// confirm who owns the task.
56 auto* callback =
57 reinterpret_cast<std::function<void()>*>(&(task->callback));
// The callback is skipped if the wheel is no longer running when the job runs.
58 cyber::Async([this, callback] {
59 if (this->running_) {
60 (*callback)();
61 }
62 });
63 }
// erase() returns the iterator to the next entry, which advances the loop.
64 ite = bucket.task_list().erase(ite);
65 }
66 }
67}
68
69void TimingWheel::AddTask(const std::shared_ptr<TimerTask>& task) {
70 AddTask(task, current_work_wheel_index_);
71}
72
73void TimingWheel::AddTask(const std::shared_ptr<TimerTask>& task,
74 const uint64_t current_work_wheel_index) {
75 if (!running_) {
76 Start();
77 }
78 auto work_wheel_index = current_work_wheel_index +
79 static_cast<uint64_t>(std::ceil(
80 static_cast<double>(task->next_fire_duration_ms) /
81 TIMER_RESOLUTION_MS));
82 if (work_wheel_index >= WORK_WHEEL_SIZE) {
83 auto real_work_wheel_index = GetWorkWheelIndex(work_wheel_index);
84 task->remainder_interval_ms = real_work_wheel_index;
85 auto assistant_ticks = work_wheel_index / WORK_WHEEL_SIZE;
86 if (assistant_ticks == 1 &&
87 real_work_wheel_index < current_work_wheel_index_) {
88 work_wheel_[real_work_wheel_index].AddTask(task);
89 ADEBUG << "add task to work wheel. index :" << real_work_wheel_index;
90 } else {
91 auto assistant_wheel_index = 0;
92 {
93 std::lock_guard<std::mutex> lock(current_assistant_wheel_index_mutex_);
94 assistant_wheel_index = GetAssistantWheelIndex(
95 current_assistant_wheel_index_ + assistant_ticks);
96 assistant_wheel_[assistant_wheel_index].AddTask(task);
97 }
98 ADEBUG << "add task to assistant wheel. index : "
99 << assistant_wheel_index;
100 }
101 } else {
102 work_wheel_[work_wheel_index].AddTask(task);
103 ADEBUG << "add task [" << task->timer_id_
104 << "] to work wheel. index :" << work_wheel_index;
105 }
106}
107
108void TimingWheel::Cascade(const uint64_t assistant_wheel_index) {
109 auto& bucket = assistant_wheel_[assistant_wheel_index];
110 std::lock_guard<std::mutex> lock(bucket.mutex());
111 auto ite = bucket.task_list().begin();
112 while (ite != bucket.task_list().end()) {
113 auto task = ite->lock();
114 if (task) {
115 work_wheel_[task->remainder_interval_ms].AddTask(task);
116 }
117 ite = bucket.task_list().erase(ite);
118 }
119}
120
// NOTE(review): the line carrying this function's signature is absent from this
// listing. The body presumably belongs to TimingWheel::TickFunc(), which the
// tick thread executes - confirm against the original file.
//
// Tick loop: while running_ is true, processes the current bucket, counts the
// tick, sleeps via `rate`, then advances the work-wheel index. Each time that
// index returns to 0 the assistant-wheel index is advanced as well and its
// bucket is cascaded.
122 Rate rate(TIMER_RESOLUTION_MS * 1000000); // ms to ns
123 while (running_) {
124 Tick();
125 // AINFO_EVERY(1000) << "Tick " << TickCount();
126 tick_count_++;
127 rate.Sleep();
128 {
// The index is written only while holding its mutex.
129 std::lock_guard<std::mutex> lock(current_work_wheel_index_mutex_);
130 current_work_wheel_index_ =
131 GetWorkWheelIndex(current_work_wheel_index_ + 1);
132 }
// An index of 0 here means the work-wheel position has just wrapped around.
133 if (current_work_wheel_index_ == 0) {
134 {
135 std::lock_guard<std::mutex> lock(current_assistant_wheel_index_mutex_);
136 current_assistant_wheel_index_ =
137 GetAssistantWheelIndex(current_assistant_wheel_index_ + 1);
138 }
// Cascade() runs after the assistant-index mutex has been released.
139 Cascade(current_assistant_wheel_index_);
140 }
141 }
142}
143
144TimingWheel::TimingWheel() {}
145
146} // namespace cyber
147} // namespace apollo
void AddTask(const std::shared_ptr< TimerTask > &task)
void AddTask(const std::shared_ptr< TimerTask > &task)
void Cascade(const uint64_t assistant_wheel_index)
void SetInnerThreadAttr(const std::string &name, std::thread *thr)
Definition scheduler.cc:90
#define ADEBUG
Definition log.h:41
class register implement
Definition arena_queue.h:37