Apollo 10.0
自动驾驶开放平台
unbounded_queue.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_BASE_UNBOUNDED_QUEUE_H_
18#define CYBER_BASE_UNBOUNDED_QUEUE_H_
19
20#include <unistd.h>
21
22#include <atomic>
23#include <cstdint>
24#include <memory>
25
26namespace apollo {
27namespace cyber {
28namespace base {
29
30template <typename T>
32 public:
33 UnboundedQueue() { Reset(); }
34 UnboundedQueue& operator=(const UnboundedQueue& other) = delete;
35 UnboundedQueue(const UnboundedQueue& other) = delete;
36
37 ~UnboundedQueue() { Destroy(); }
38
39 void Clear() {
40 Destroy();
41 Reset();
42 }
43
44 void Enqueue(const T& element) {
45 auto node = new Node();
46 node->data = element;
47 Node* old_tail = tail_.load();
48
49 while (true) {
50 if (tail_.compare_exchange_strong(old_tail, node)) {
51 old_tail->next = node;
52 old_tail->release();
53 size_.fetch_add(1);
54 break;
55 }
56 }
57 }
58
59 bool Dequeue(T* element) {
60 Node* old_head = head_.load();
61 Node* head_next = nullptr;
62 do {
63 head_next = old_head->next;
64
65 if (head_next == nullptr) {
66 return false;
67 }
68 } while (!head_.compare_exchange_strong(old_head, head_next));
69 *element = head_next->data;
70 size_.fetch_sub(1);
71 old_head->release();
72 return true;
73 }
74
75 size_t Size() { return size_.load(); }
76
77 bool Empty() { return size_.load() == 0; }
78
79 private:
80 struct Node {
81 T data;
82 std::atomic<uint32_t> ref_count;
83 Node* next = nullptr;
84 Node() { ref_count.store(2); }
85 void release() {
86 ref_count.fetch_sub(1);
87 if (ref_count.load() == 0) {
88 delete this;
89 }
90 }
91 };
92
93 void Reset() {
94 auto node = new Node();
95 head_.store(node);
96 tail_.store(node);
97 size_.store(0);
98 }
99
100 void Destroy() {
101 auto ite = head_.load();
102 Node* tmp = nullptr;
103 while (ite != nullptr) {
104 tmp = ite->next;
105 delete ite;
106 ite = tmp;
107 }
108 }
109
110 std::atomic<Node*> head_;
111 std::atomic<Node*> tail_;
112 std::atomic<size_t> size_;
113};
114
115} // namespace base
116} // namespace cyber
117} // namespace apollo
118
119#endif // CYBER_BASE_UNBOUNDED_QUEUE_H_
Definition node.h:31
Node is the fundamental building block of Cyber RT.
Definition node.h:44
UnboundedQueue(const UnboundedQueue &other)=delete
UnboundedQueue & operator=(const UnboundedQueue &other)=delete
class register implement
Definition arena_queue.h:37