Apollo 10.0
自动驾驶开放平台
arena_queue.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2024 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_BASE_ARENA_QUEUE_H_
18#define CYBER_BASE_ARENA_QUEUE_H_
19
20#include <unistd.h>
21
22#include <algorithm>
23#include <atomic>
24#include <cstdint>
25#include <cstdlib>
26#include <deque>
27#include <iostream>
28#include <memory>
29#include <utility>
30#include <vector>
31
32#include <google/protobuf/arena.h>
33
34#include "cyber/base/macros.h"
36
37namespace apollo {
38namespace cyber {
39namespace base {
40
41template <typename T>
43 public:
44 using value_type = T;
45 using size_type = uint64_t;
46
47 public:
49 ArenaQueue& operator=(const ArenaQueue& other) = delete;
50 ArenaQueue(const ArenaQueue& other) = delete;
52 bool Init(uint64_t size);
53 bool Init(uint64_t size, google::protobuf::Arena* arena);
54
55 T* AddBack();
56 T* PopFront();
57 T* GetBack();
58 T* GetFront();
59
60 uint64_t Size();
61 bool Empty();
62 uint64_t Head() { return head_.load(); }
63 uint64_t Tail() { return tail_.load(); }
64 uint64_t Commit() { return commit_.load(); }
65 bool NextIndex(uint64_t& index) {
66 if (Empty()) {
67 return false;
68 }
69 if (arena_) {
70 if (GetIndex(index) < Tail() - 1) {
71 index = GetIndex(index + 1);
72 return true;
73 }
74 return false;
75 } else {
76 if (index < Size() - 1) {
77 index = index + 1;
78 return true;
79 }
80 return false;
81 }
82 }
83 bool GetHeadIndex(uint64_t& index) {
84 if (Empty()) {
85 return false;
86 }
87 if (arena_) {
88 index = GetIndex(head_ + 1);
89 return true;
90 } else {
91 index = 0;
92 return true;
93 }
94 }
95 bool GetTailIndex(uint64_t& index) {
96 if (Empty()) {
97 return false;
98 }
99 if (arena_) {
100 index = GetIndex(tail_ - 1);
101 return true;
102 } else {
103 index = Size() - 1;
104 return true;
105 }
106 }
107 bool GetEleByIndex(uint64_t i, T*& ptr) {
108 if (Empty()) {
109 return false;
110 }
111 if (arena_) {
112 ptr = pool_[GetIndex(i)];
113 return true;
114 } else {
115 if (i > Size() - 1) {
116 return false;
117 }
118 ptr = &normal_queue[i];
119 return true;
120 }
121 }
122 bool IsArenaEnable() { return arena_; }
123
124 private:
125 uint64_t GetIndex(uint64_t num);
126
127 alignas(CACHELINE_SIZE) std::atomic<uint64_t> head_ = {0};
128 alignas(CACHELINE_SIZE) std::atomic<uint64_t> tail_ = {1};
129 alignas(CACHELINE_SIZE) std::atomic<uint64_t> commit_ = {1};
130
131 uint64_t pool_size_ = 0;
132 std::vector<T*> pool_;
133 bool arena_;
134 std::deque<T> normal_queue;
135};
136
137template <typename T>
139
140template <typename T>
141inline bool ArenaQueue<T>::Init(uint64_t size) {
142 arena_ = false;
143 return true;
144}
145
146template <typename T>
147inline bool ArenaQueue<T>::Init(uint64_t size, google::protobuf::Arena* arena) {
148 pool_size_ = size + 2;
149 if (pool_.size() == pool_size_) {
150 return true;
151 }
152 pool_.clear();
153 for (uint64_t i = 0; i < pool_size_; ++i) {
154 pool_.push_back(google::protobuf::Arena::CreateMessage<T>(arena));
155 }
156 arena_ = true;
157 return true;
158}
159
160template <typename T>
162 if (Empty()) {
163 return nullptr;
164 }
165 if (arena_) {
166 return pool_[GetIndex(tail_ - 1)];
167 } else {
168 return &normal_queue.back();
169 }
170}
171
172template <typename T>
174 if (Empty()) {
175 return nullptr;
176 }
177 if (arena_) {
178 return pool_[GetIndex(head_ + 1)];
179 } else {
180 return &normal_queue.front();
181 }
182}
183
184template <typename T>
186 if (arena_) {
187 uint64_t new_tail = 0;
188 uint64_t old_commit = 0;
189 uint64_t old_tail = tail_.load(std::memory_order_acquire);
190 do {
191 new_tail = old_tail + 1;
192 if (GetIndex(new_tail) ==
193 GetIndex(head_.load(std::memory_order_acquire))) {
194 return nullptr;
195 }
196 } while (!tail_.compare_exchange_weak(old_tail, new_tail,
197 std::memory_order_acq_rel,
198 std::memory_order_relaxed));
199 do {
200 old_commit = old_tail;
201 } while (cyber_unlikely(!commit_.compare_exchange_weak(
202 old_commit, new_tail, std::memory_order_acq_rel,
203 std::memory_order_relaxed)));
204 return pool_[GetIndex(old_tail)];
205 } else {
206 T instance;
207 normal_queue.push_back(instance);
208 return &normal_queue.back();
209 }
210}
211
212template <typename T>
214 if (Empty()) {
215 return nullptr;
216 }
217 if (arena_) {
218 uint64_t new_head = 0;
219 uint64_t old_head = head_.load(std::memory_order_acquire);
220 do {
221 new_head = old_head + 1;
222 if (new_head == commit_.load(std::memory_order_acquire)) {
223 return nullptr;
224 }
225 } while (!head_.compare_exchange_weak(old_head, new_head,
226 std::memory_order_acq_rel,
227 std::memory_order_relaxed));
228 return pool_[GetIndex(new_head)];
229 } else {
230 normal_queue.pop_front();
231 return nullptr;
232 }
233}
234
235template <typename T>
236inline uint64_t ArenaQueue<T>::Size() {
237 if (arena_) {
238 return tail_ - head_ - 1;
239 } else {
240 return normal_queue.size();
241 }
242}
243
244template <typename T>
245inline bool ArenaQueue<T>::Empty() {
246 if (arena_) {
247 return Size() == 0;
248 } else {
249 return normal_queue.empty();
250 }
251}
252
253template <typename T>
254inline uint64_t ArenaQueue<T>::GetIndex(uint64_t num) {
255 return num - (num / pool_size_) * pool_size_; // faster than %
256}
257
258} // namespace base
259} // namespace cyber
260} // namespace apollo
261
262#endif // CYBER_BASE_ARENA_QUEUE_H_
bool GetEleByIndex(uint64_t i, T *&ptr)
ArenaQueue & operator=(const ArenaQueue &other)=delete
bool GetHeadIndex(uint64_t &index)
Definition arena_queue.h:83
ArenaQueue(const ArenaQueue &other)=delete
bool GetTailIndex(uint64_t &index)
Definition arena_queue.h:95
bool NextIndex(uint64_t &index)
Definition arena_queue.h:65
#define cyber_unlikely(x)
Definition macros.h:30
#define CACHELINE_SIZE
Definition macros.h:33
class register implement
Definition arena_queue.h:37