// Apollo 10.0
// Autonomous Driving Open Platform
// bounded_queue.h
// (documentation-site navigation header; translated from Chinese)
/******************************************************************************
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/

#ifndef CYBER_BASE_BOUNDED_QUEUE_H_
#define CYBER_BASE_BOUNDED_QUEUE_H_

#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <new>
#include <utility>

#include "cyber/base/macros.h"

#include "cyber/base/wait_strategy.h"
32namespace apollo {
33namespace cyber {
34namespace base {
35
36template <typename T>
38 public:
39 using value_type = T;
40 using size_type = uint64_t;
41
42 public:
44 BoundedQueue& operator=(const BoundedQueue& other) = delete;
45 BoundedQueue(const BoundedQueue& other) = delete;
47 bool Init(uint64_t size);
48 bool Init(uint64_t size, WaitStrategy* strategy);
49 bool Enqueue(const T& element);
50 bool Enqueue(T&& element);
51 bool WaitEnqueue(const T& element);
52 bool WaitEnqueue(T&& element);
53 bool Dequeue(T* element);
54 bool WaitDequeue(T* element);
55 uint64_t Size();
56 bool Empty();
59 uint64_t Head() { return head_.load(); }
60 uint64_t Tail() { return tail_.load(); }
61 uint64_t Commit() { return commit_.load(); }
62
63 private:
64 uint64_t GetIndex(uint64_t num);
65
66 alignas(CACHELINE_SIZE) std::atomic<uint64_t> head_ = {0};
67 alignas(CACHELINE_SIZE) std::atomic<uint64_t> tail_ = {1};
68 alignas(CACHELINE_SIZE) std::atomic<uint64_t> commit_ = {1};
69 // alignas(CACHELINE_SIZE) std::atomic<uint64_t> size_ = {0};
70 uint64_t pool_size_ = 0;
71 T* pool_ = nullptr;
72 std::unique_ptr<WaitStrategy> wait_strategy_ = nullptr;
73 volatile bool break_all_wait_ = false;
74};
75
76template <typename T>
78 if (wait_strategy_) {
79 BreakAllWait();
80 }
81 if (pool_) {
82 for (uint64_t i = 0; i < pool_size_; ++i) {
83 pool_[i].~T();
84 }
85 std::free(pool_);
86 }
87}
88
89template <typename T>
90inline bool BoundedQueue<T>::Init(uint64_t size) {
91 return Init(size, new SleepWaitStrategy());
92}
93
94template <typename T>
95bool BoundedQueue<T>::Init(uint64_t size, WaitStrategy* strategy) {
96 // Head and tail each occupy a space
97 pool_size_ = size + 2;
98 pool_ = reinterpret_cast<T*>(std::calloc(pool_size_, sizeof(T)));
99 if (pool_ == nullptr) {
100 return false;
101 }
102 for (uint64_t i = 0; i < pool_size_; ++i) {
103 new (&(pool_[i])) T();
104 }
105 wait_strategy_.reset(strategy);
106 return true;
107}
108
109template <typename T>
110bool BoundedQueue<T>::Enqueue(const T& element) {
111 uint64_t new_tail = 0;
112 uint64_t old_commit = 0;
113 uint64_t old_tail = tail_.load(std::memory_order_acquire);
114 do {
115 new_tail = old_tail + 1;
116 if (GetIndex(new_tail) == GetIndex(head_.load(std::memory_order_acquire))) {
117 return false;
118 }
119 } while (!tail_.compare_exchange_weak(old_tail, new_tail,
120 std::memory_order_acq_rel,
121 std::memory_order_relaxed));
122 pool_[GetIndex(old_tail)] = element;
123 do {
124 old_commit = old_tail;
125 } while (cyber_unlikely(!commit_.compare_exchange_weak(
126 old_commit, new_tail, std::memory_order_acq_rel,
127 std::memory_order_relaxed)));
128 wait_strategy_->NotifyOne();
129 return true;
130}
131
132template <typename T>
133bool BoundedQueue<T>::Enqueue(T&& element) {
134 uint64_t new_tail = 0;
135 uint64_t old_commit = 0;
136 uint64_t old_tail = tail_.load(std::memory_order_acquire);
137 do {
138 new_tail = old_tail + 1;
139 if (GetIndex(new_tail) == GetIndex(head_.load(std::memory_order_acquire))) {
140 return false;
141 }
142 } while (!tail_.compare_exchange_weak(old_tail, new_tail,
143 std::memory_order_acq_rel,
144 std::memory_order_relaxed));
145 pool_[GetIndex(old_tail)] = std::move(element);
146 do {
147 old_commit = old_tail;
148 } while (cyber_unlikely(!commit_.compare_exchange_weak(
149 old_commit, new_tail, std::memory_order_acq_rel,
150 std::memory_order_relaxed)));
151 wait_strategy_->NotifyOne();
152 return true;
153}
154
155template <typename T>
156bool BoundedQueue<T>::Dequeue(T* element) {
157 uint64_t new_head = 0;
158 uint64_t old_head = head_.load(std::memory_order_acquire);
159 do {
160 new_head = old_head + 1;
161 if (new_head == commit_.load(std::memory_order_acquire)) {
162 return false;
163 }
164 *element = pool_[GetIndex(new_head)];
165 } while (!head_.compare_exchange_weak(old_head, new_head,
166 std::memory_order_acq_rel,
167 std::memory_order_relaxed));
168 return true;
169}
170
171template <typename T>
172bool BoundedQueue<T>::WaitEnqueue(const T& element) {
173 while (!break_all_wait_) {
174 if (Enqueue(element)) {
175 return true;
176 }
177 if (wait_strategy_->EmptyWait()) {
178 continue;
179 }
180 // wait timeout
181 break;
182 }
183
184 return false;
185}
186
187template <typename T>
189 while (!break_all_wait_) {
190 if (Enqueue(std::move(element))) {
191 return true;
192 }
193 if (wait_strategy_->EmptyWait()) {
194 continue;
195 }
196 // wait timeout
197 break;
198 }
199
200 return false;
201}
202
203template <typename T>
205 while (!break_all_wait_) {
206 if (Dequeue(element)) {
207 return true;
208 }
209 if (wait_strategy_->EmptyWait()) {
210 continue;
211 }
212 // wait timeout
213 break;
214 }
215
216 return false;
217}
218
219template <typename T>
220inline uint64_t BoundedQueue<T>::Size() {
221 return tail_ - head_ - 1;
222}
223
224template <typename T>
226 return Size() == 0;
227}
228
229template <typename T>
230inline uint64_t BoundedQueue<T>::GetIndex(uint64_t num) {
231 return num - (num / pool_size_) * pool_size_; // faster than %
232}
233
234template <typename T>
236 wait_strategy_.reset(strategy);
237}
238
239template <typename T>
241 break_all_wait_ = true;
242 wait_strategy_->BreakAllWait();
243}
244
245} // namespace base
246} // namespace cyber
247} // namespace apollo
248
249#endif // CYBER_BASE_BOUNDED_QUEUE_H_
// --- Doxygen cross-reference residue (documentation-extraction artifact,
// --- not part of the original header; preserved as comments):
// bool Init(uint64_t size, WaitStrategy *strategy)
// BoundedQueue(const BoundedQueue &other)=delete
// bool WaitEnqueue(const T &element)
// BoundedQueue & operator=(const BoundedQueue &other)=delete
// bool Enqueue(const T &element)
// void SetWaitStrategy(WaitStrategy *WaitStrategy)
// #define cyber_unlikely(x)      — Definition macros.h:30
// #define CACHELINE_SIZE         — Definition macros.h:33
// bool Init(const char *binary_name, const std::string &dag_info) — Definition init.cc:98
// class register implement       — Definition arena_queue.h:37