Apollo 10.0
自动驾驶开放平台
apollo::cyber::TimingWheel类 参考

#include <timing_wheel.h>

apollo::cyber::TimingWheel 的协作图:

Public 成员函数

 ~TimingWheel ()
 
void Start ()
 
void Shutdown ()
 
void Tick ()
 
void AddTask (const std::shared_ptr< TimerTask > &task)
 
void AddTask (const std::shared_ptr< TimerTask > &task, const uint64_t current_work_wheel_index)
 
void Cascade (const uint64_t assistant_wheel_index)
 
void TickFunc ()
 
uint64_t TickCount () const
 

详细描述

在文件 timing_wheel.h42 行定义.

构造及析构函数说明

◆ ~TimingWheel()

apollo::cyber::TimingWheel::~TimingWheel ( )
inline

在文件 timing_wheel.h44 行定义.

44 {
45 if (running_) {
46 Shutdown();
47 }
48 }

成员函数说明

◆ AddTask() [1/2]

void apollo::cyber::TimingWheel::AddTask ( const std::shared_ptr< TimerTask > &  task)

在文件 timing_wheel.cc69 行定义.

69 {
70 AddTask(task, current_work_wheel_index_);
71}
void AddTask(const std::shared_ptr< TimerTask > &task)

◆ AddTask() [2/2]

void apollo::cyber::TimingWheel::AddTask ( const std::shared_ptr< TimerTask > &  task,
const uint64_t  current_work_wheel_index 
)

在文件 timing_wheel.cc73 行定义.

74 {
75 if (!running_) {
76 Start();
77 }
78 auto work_wheel_index = current_work_wheel_index +
79 static_cast<uint64_t>(std::ceil(
80 static_cast<double>(task->next_fire_duration_ms) /
81 TIMER_RESOLUTION_MS));
82 if (work_wheel_index >= WORK_WHEEL_SIZE) {
83 auto real_work_wheel_index = GetWorkWheelIndex(work_wheel_index);
84 task->remainder_interval_ms = real_work_wheel_index;
85 auto assistant_ticks = work_wheel_index / WORK_WHEEL_SIZE;
86 if (assistant_ticks == 1 &&
87 real_work_wheel_index < current_work_wheel_index_) {
88 work_wheel_[real_work_wheel_index].AddTask(task);
89 ADEBUG << "add task to work wheel. index :" << real_work_wheel_index;
90 } else {
91 auto assistant_wheel_index = 0;
92 {
93 std::lock_guard<std::mutex> lock(current_assistant_wheel_index_mutex_);
94 assistant_wheel_index = GetAssistantWheelIndex(
95 current_assistant_wheel_index_ + assistant_ticks);
96 assistant_wheel_[assistant_wheel_index].AddTask(task);
97 }
98 ADEBUG << "add task to assistant wheel. index : "
99 << assistant_wheel_index;
100 }
101 } else {
102 work_wheel_[work_wheel_index].AddTask(task);
103 ADEBUG << "add task [" << task->timer_id_
104 << "] to work wheel. index :" << work_wheel_index;
105 }
106}
void AddTask(const std::shared_ptr< TimerTask > &task)
#define ADEBUG
Definition log.h:41

◆ Cascade()

void apollo::cyber::TimingWheel::Cascade ( const uint64_t  assistant_wheel_index)

在文件 timing_wheel.cc108 行定义.

108 {
109 auto& bucket = assistant_wheel_[assistant_wheel_index];
110 std::lock_guard<std::mutex> lock(bucket.mutex());
111 auto ite = bucket.task_list().begin();
112 while (ite != bucket.task_list().end()) {
113 auto task = ite->lock();
114 if (task) {
115 work_wheel_[task->remainder_interval_ms].AddTask(task);
116 }
117 ite = bucket.task_list().erase(ite);
118 }
119}

◆ Shutdown()

void apollo::cyber::TimingWheel::Shutdown ( )

在文件 timing_wheel.cc36 行定义.

36 {
37 std::lock_guard<std::mutex> lock(running_mutex_);
38 if (running_) {
39 running_ = false;
40 if (tick_thread_.joinable()) {
41 tick_thread_.join();
42 }
43 }
44}

◆ Start()

void apollo::cyber::TimingWheel::Start ( )

在文件 timing_wheel.cc26 行定义.

26 {
27 std::lock_guard<std::mutex> lock(running_mutex_);
28 if (!running_) {
29 ADEBUG << "TimeWheel start ok";
30 running_ = true;
31 tick_thread_ = std::thread([this]() { this->TickFunc(); });
32 scheduler::Instance()->SetInnerThreadAttr("timer", &tick_thread_);
33 }
34}
void SetInnerThreadAttr(const std::string &name, std::thread *thr)
Definition scheduler.cc:90

◆ Tick()

void apollo::cyber::TimingWheel::Tick ( )

在文件 timing_wheel.cc46 行定义.

46 {
47 auto& bucket = work_wheel_[current_work_wheel_index_];
48 {
49 std::lock_guard<std::mutex> lock(bucket.mutex());
50 auto ite = bucket.task_list().begin();
51 while (ite != bucket.task_list().end()) {
52 auto task = ite->lock();
53 if (task) {
54 ADEBUG << "index: " << current_work_wheel_index_
55 << " timer id: " << task->timer_id_;
56 auto* callback =
57 reinterpret_cast<std::function<void()>*>(&(task->callback));
58 cyber::Async([this, callback] {
59 if (this->running_) {
60 (*callback)();
61 }
62 });
63 }
64 ite = bucket.task_list().erase(ite);
65 }
66 }
67}

◆ TickCount()

uint64_t apollo::cyber::TimingWheel::TickCount ( ) const
inline

在文件 timing_wheel.h65 行定义.

65{ return tick_count_; }

◆ TickFunc()

void apollo::cyber::TimingWheel::TickFunc ( )

在文件 timing_wheel.cc121 行定义.

121 {
122 Rate rate(TIMER_RESOLUTION_MS * 1000000); // ms to ns
123 while (running_) {
124 Tick();
125 // AINFO_EVERY(1000) << "Tick " << TickCount();
126 tick_count_++;
127 rate.Sleep();
128 {
129 std::lock_guard<std::mutex> lock(current_work_wheel_index_mutex_);
130 current_work_wheel_index_ =
131 GetWorkWheelIndex(current_work_wheel_index_ + 1);
132 }
133 if (current_work_wheel_index_ == 0) {
134 {
135 std::lock_guard<std::mutex> lock(current_assistant_wheel_index_mutex_);
136 current_assistant_wheel_index_ =
137 GetAssistantWheelIndex(current_assistant_wheel_index_ + 1);
138 }
139 Cascade(current_assistant_wheel_index_);
140 }
141 }
142}
void Cascade(const uint64_t assistant_wheel_index)

该类的文档由以下文件生成: