Apollo 10.0
自动驾驶开放平台
blocker.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_BLOCKER_BLOCKER_H_
18#define CYBER_BLOCKER_BLOCKER_H_
19
20#include <cstddef>
21#include <functional>
22#include <list>
23#include <memory>
24#include <mutex>
25#include <string>
26#include <unordered_map>
27#include <vector>
28
29namespace apollo {
30namespace cyber {
31namespace blocker {
32
34 public:
35 virtual ~BlockerBase() = default;
36
37 virtual void Reset() = 0;
38 virtual void ClearObserved() = 0;
39 virtual void ClearPublished() = 0;
40 virtual void Observe() = 0;
41 virtual bool IsObservedEmpty() const = 0;
42 virtual bool IsPublishedEmpty() const = 0;
43 virtual bool Unsubscribe(const std::string& callback_id) = 0;
44
45 virtual size_t capacity() const = 0;
46 virtual void set_capacity(size_t capacity) = 0;
47 virtual const std::string& channel_name() const = 0;
48};
49
52 explicit BlockerAttr(const std::string& channel)
53 : capacity(10), channel_name(channel) {}
54 BlockerAttr(size_t cap, const std::string& channel)
55 : capacity(cap), channel_name(channel) {}
58
59 size_t capacity;
60 std::string channel_name;
61};
62
63template <typename T>
64class Blocker : public BlockerBase {
65 friend class BlockerManager;
66
67 public:
68 using MessageType = T;
69 using MessagePtr = std::shared_ptr<T>;
70 using MessageQueue = std::list<MessagePtr>;
71 using Callback = std::function<void(const MessagePtr&)>;
72 using CallbackMap = std::unordered_map<std::string, Callback>;
73 using Iterator = typename std::list<std::shared_ptr<T>>::const_iterator;
74
75 explicit Blocker(const BlockerAttr& attr);
76 virtual ~Blocker();
77
78 void Publish(const MessageType& msg);
79 void Publish(const MessagePtr& msg);
80
81 void ClearObserved() override;
82 void ClearPublished() override;
83 void Observe() override;
84 bool IsObservedEmpty() const override;
85 bool IsPublishedEmpty() const override;
86
87 bool Subscribe(const std::string& callback_id, const Callback& callback);
88 bool Unsubscribe(const std::string& callback_id) override;
89
90 const MessageType& GetLatestObserved() const;
91 const MessagePtr GetLatestObservedPtr() const;
92 const MessagePtr GetOldestObservedPtr() const;
94
95 Iterator ObservedBegin() const;
96 Iterator ObservedEnd() const;
97
98 size_t capacity() const override;
99 void set_capacity(size_t capacity) override;
100 const std::string& channel_name() const override;
101
102 private:
103 void Reset() override;
104 void Enqueue(const MessagePtr& msg);
105 void Notify(const MessagePtr& msg);
106
107 BlockerAttr attr_;
108 MessageQueue observed_msg_queue_;
109 MessageQueue published_msg_queue_;
110 mutable std::mutex msg_mutex_;
111
112 CallbackMap published_callbacks_;
113 mutable std::mutex cb_mutex_;
114
115 MessageType dummy_msg_;
116};
117
118template <typename T>
119Blocker<T>::Blocker(const BlockerAttr& attr) : attr_(attr), dummy_msg_() {}
120
121template <typename T>
123 published_msg_queue_.clear();
124 observed_msg_queue_.clear();
125 published_callbacks_.clear();
126}
127
128template <typename T>
130 Publish(std::make_shared<MessageType>(msg));
131}
132
133template <typename T>
135 Enqueue(msg);
136 Notify(msg);
137}
138
139template <typename T>
140void Blocker<T>::Reset() {
141 {
142 std::lock_guard<std::mutex> lock(msg_mutex_);
143 observed_msg_queue_.clear();
144 published_msg_queue_.clear();
145 }
146 {
147 std::lock_guard<std::mutex> lock(cb_mutex_);
148 published_callbacks_.clear();
149 }
150}
151
152template <typename T>
154 std::lock_guard<std::mutex> lock(msg_mutex_);
155 observed_msg_queue_.clear();
156}
157
158template <typename T>
160 std::lock_guard<std::mutex> lock(msg_mutex_);
161 published_msg_queue_.clear();
162}
163
164template <typename T>
166 std::lock_guard<std::mutex> lock(msg_mutex_);
167 observed_msg_queue_ = published_msg_queue_;
168}
169
170template <typename T>
172 std::lock_guard<std::mutex> lock(msg_mutex_);
173 return observed_msg_queue_.empty();
174}
175
176template <typename T>
178 std::lock_guard<std::mutex> lock(msg_mutex_);
179 return published_msg_queue_.empty();
180}
181
182template <typename T>
183bool Blocker<T>::Subscribe(const std::string& callback_id,
184 const Callback& callback) {
185 std::lock_guard<std::mutex> lock(cb_mutex_);
186 if (published_callbacks_.find(callback_id) != published_callbacks_.end()) {
187 return false;
188 }
189 published_callbacks_[callback_id] = callback;
190 return true;
191}
192
193template <typename T>
194bool Blocker<T>::Unsubscribe(const std::string& callback_id) {
195 std::lock_guard<std::mutex> lock(cb_mutex_);
196 return published_callbacks_.erase(callback_id) != 0;
197}
198
199template <typename T>
201 std::lock_guard<std::mutex> lock(msg_mutex_);
202 if (observed_msg_queue_.empty()) {
203 return dummy_msg_;
204 }
205 return *observed_msg_queue_.front();
206}
207
208template <typename T>
210 std::lock_guard<std::mutex> lock(msg_mutex_);
211 if (observed_msg_queue_.empty()) {
212 return nullptr;
213 }
214 return observed_msg_queue_.front();
215}
216
217template <typename T>
219 std::lock_guard<std::mutex> lock(msg_mutex_);
220 if (observed_msg_queue_.empty()) {
221 return nullptr;
222 }
223 return observed_msg_queue_.back();
224}
225
226template <typename T>
228 std::lock_guard<std::mutex> lock(msg_mutex_);
229 if (published_msg_queue_.empty()) {
230 return nullptr;
231 }
232 return published_msg_queue_.front();
233}
234
235template <typename T>
237 return observed_msg_queue_.begin();
238}
239
240template <typename T>
242 return observed_msg_queue_.end();
243}
244
245template <typename T>
246size_t Blocker<T>::capacity() const {
247 return attr_.capacity;
248}
249
250template <typename T>
251void Blocker<T>::set_capacity(size_t capacity) {
252 std::lock_guard<std::mutex> lock(msg_mutex_);
253 attr_.capacity = capacity;
254 while (published_msg_queue_.size() > capacity) {
255 published_msg_queue_.pop_back();
256 }
257}
258
259template <typename T>
260const std::string& Blocker<T>::channel_name() const {
261 return attr_.channel_name;
262}
263
264template <typename T>
265void Blocker<T>::Enqueue(const MessagePtr& msg) {
266 if (attr_.capacity == 0) {
267 return;
268 }
269 std::lock_guard<std::mutex> lock(msg_mutex_);
270 published_msg_queue_.push_front(msg);
271 while (published_msg_queue_.size() > attr_.capacity) {
272 published_msg_queue_.pop_back();
273 }
274}
275
276template <typename T>
277void Blocker<T>::Notify(const MessagePtr& msg) {
278 std::lock_guard<std::mutex> lock(cb_mutex_);
279 for (const auto& item : published_callbacks_) {
280 item.second(msg);
281 }
282}
283
284} // namespace blocker
285} // namespace cyber
286} // namespace apollo
287
288#endif // CYBER_BLOCKER_BLOCKER_H_
virtual const std::string & channel_name() const =0
virtual bool IsPublishedEmpty() const =0
virtual size_t capacity() const =0
virtual bool Unsubscribe(const std::string &callback_id)=0
virtual void set_capacity(size_t capacity)=0
virtual bool IsObservedEmpty() const =0
Blocker(const BlockerAttr &attr)
Definition blocker.h:119
size_t capacity() const override
Definition blocker.h:246
void ClearObserved() override
Definition blocker.h:153
typename std::list< std::shared_ptr< T > >::const_iterator Iterator
Definition blocker.h:73
bool Unsubscribe(const std::string &callback_id) override
Definition blocker.h:194
std::list< MessagePtr > MessageQueue
Definition blocker.h:70
void ClearPublished() override
Definition blocker.h:159
Iterator ObservedEnd() const
Definition blocker.h:241
bool IsObservedEmpty() const override
Definition blocker.h:171
void Publish(const MessageType &msg)
Definition blocker.h:129
bool Subscribe(const std::string &callback_id, const Callback &callback)
Definition blocker.h:183
const MessagePtr GetOldestObservedPtr() const
Definition blocker.h:218
const MessagePtr GetLatestPublishedPtr() const
Definition blocker.h:227
const MessageType & GetLatestObserved() const
Definition blocker.h:200
std::function< void(const MessagePtr &)> Callback
Definition blocker.h:71
const MessagePtr GetLatestObservedPtr() const
Definition blocker.h:209
bool IsPublishedEmpty() const override
Definition blocker.h:177
std::unordered_map< std::string, Callback > CallbackMap
Definition blocker.h:72
std::shared_ptr< T > MessagePtr
Definition blocker.h:69
const std::string & channel_name() const override
Definition blocker.h:260
void set_capacity(size_t capacity) override
Definition blocker.h:251
Iterator ObservedBegin() const
Definition blocker.h:236
class register implement
Definition arena_queue.h:37
BlockerAttr(const BlockerAttr &attr)
Definition blocker.h:56
BlockerAttr(size_t cap, const std::string &channel)
Definition blocker.h:54
BlockerAttr(const std::string &channel)
Definition blocker.h:52