Apollo 10.0
自动驾驶开放平台
blocker_manager.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_BLOCKER_BLOCKER_MANAGER_H_
18#define CYBER_BLOCKER_BLOCKER_MANAGER_H_
19
20#include <memory>
21#include <mutex>
22#include <string>
23#include <unordered_map>
24
26
27namespace apollo {
28namespace cyber {
29namespace blocker {
30
32 public:
33 using BlockerMap =
34 std::unordered_map<std::string, std::shared_ptr<BlockerBase>>;
35
36 virtual ~BlockerManager();
37
38 static const std::shared_ptr<BlockerManager>& Instance() {
39 static auto instance =
40 std::shared_ptr<BlockerManager>(new BlockerManager());
41 return instance;
42 }
43
44 template <typename T>
45 bool Publish(const std::string& channel_name,
46 const typename Blocker<T>::MessagePtr& msg);
47
48 template <typename T>
49 bool Publish(const std::string& channel_name,
50 const typename Blocker<T>::MessageType& msg);
51
52 template <typename T>
53 bool Subscribe(const std::string& channel_name, size_t capacity,
54 const std::string& callback_id,
55 const typename Blocker<T>::Callback& callback);
56
57 template <typename T>
58 bool Unsubscribe(const std::string& channel_name,
59 const std::string& callback_id);
60
61 template <typename T>
62 std::shared_ptr<Blocker<T>> GetBlocker(const std::string& channel_name);
63
64 template <typename T>
65 std::shared_ptr<Blocker<T>> GetOrCreateBlocker(const BlockerAttr& attr);
66
67 void Observe();
68 void Reset();
69
70 private:
72 BlockerManager(const BlockerManager&) = delete;
73 BlockerManager& operator=(const BlockerManager&) = delete;
74
75 BlockerMap blockers_;
76 std::mutex blocker_mutex_;
77};
78
79template <typename T>
80bool BlockerManager::Publish(const std::string& channel_name,
81 const typename Blocker<T>::MessagePtr& msg) {
82 auto blocker = GetOrCreateBlocker<T>(BlockerAttr(channel_name));
83 if (blocker == nullptr) {
84 return false;
85 }
86 blocker->Publish(msg);
87 return true;
88}
89
90template <typename T>
91bool BlockerManager::Publish(const std::string& channel_name,
92 const typename Blocker<T>::MessageType& msg) {
93 auto blocker = GetOrCreateBlocker<T>(BlockerAttr(channel_name));
94 if (blocker == nullptr) {
95 return false;
96 }
97 blocker->Publish(msg);
98 return true;
99}
100
101template <typename T>
102bool BlockerManager::Subscribe(const std::string& channel_name, size_t capacity,
103 const std::string& callback_id,
104 const typename Blocker<T>::Callback& callback) {
105 auto blocker = GetOrCreateBlocker<T>(BlockerAttr(capacity, channel_name));
106 if (blocker == nullptr) {
107 return false;
108 }
109 return blocker->Subscribe(callback_id, callback);
110}
111
112template <typename T>
113bool BlockerManager::Unsubscribe(const std::string& channel_name,
114 const std::string& callback_id) {
115 auto blocker = GetBlocker<T>(channel_name);
116 if (blocker == nullptr) {
117 return false;
118 }
119 return blocker->Unsubscribe(callback_id);
120}
121
122template <typename T>
123std::shared_ptr<Blocker<T>> BlockerManager::GetBlocker(
124 const std::string& channel_name) {
125 std::shared_ptr<Blocker<T>> blocker = nullptr;
126 {
127 std::lock_guard<std::mutex> lock(blocker_mutex_);
128 auto search = blockers_.find(channel_name);
129 if (search != blockers_.end()) {
130 blocker = std::dynamic_pointer_cast<Blocker<T>>(search->second);
131 }
132 }
133 return blocker;
134}
135
136template <typename T>
137std::shared_ptr<Blocker<T>> BlockerManager::GetOrCreateBlocker(
138 const BlockerAttr& attr) {
139 std::shared_ptr<Blocker<T>> blocker = nullptr;
140 {
141 std::lock_guard<std::mutex> lock(blocker_mutex_);
142 auto search = blockers_.find(attr.channel_name);
143 if (search != blockers_.end()) {
144 blocker = std::dynamic_pointer_cast<Blocker<T>>(search->second);
145 } else {
146 blocker = std::make_shared<Blocker<T>>(attr);
147 blockers_[attr.channel_name] = blocker;
148 }
149 }
150 return blocker;
151}
152
153} // namespace blocker
154} // namespace cyber
155} // namespace apollo
156
157#endif // CYBER_BLOCKER_BLOCKER_MANAGER_H_
bool Publish(const std::string &channel_name, const typename Blocker< T >::MessagePtr &msg)
static const std::shared_ptr< BlockerManager > & Instance()
std::unordered_map< std::string, std::shared_ptr< BlockerBase > > BlockerMap
std::shared_ptr< Blocker< T > > GetOrCreateBlocker(const BlockerAttr &attr)
bool Unsubscribe(const std::string &channel_name, const std::string &callback_id)
bool Subscribe(const std::string &channel_name, size_t capacity, const std::string &callback_id, const typename Blocker< T >::Callback &callback)
std::shared_ptr< Blocker< T > > GetBlocker(const std::string &channel_name)
std::function< void(const MessagePtr &)> Callback
Definition blocker.h:71
std::shared_ptr< T > MessagePtr
Definition blocker.h:69
class register implement
Definition arena_queue.h:37