Apollo 10.0
自动驾驶开放平台
reader_base.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_NODE_READER_BASE_H_
18#define CYBER_NODE_READER_BASE_H_
19
20#include <atomic>
21#include <memory>
22#include <string>
23#include <unordered_map>
24#include <vector>
25
26#include "cyber/common/macros.h"
27#include "cyber/common/util.h"
30
31namespace apollo {
32namespace cyber {
33
37
47 public:
48 explicit ReaderBase(const proto::RoleAttributes& role_attr)
49 : role_attr_(role_attr), init_(false) {}
50 virtual ~ReaderBase() {}
51
58 virtual bool Init() = 0;
59
63 virtual void Shutdown() = 0;
64
68 virtual void ClearData() = 0;
69
73 virtual void Observe() = 0;
74
81 virtual bool Empty() const = 0;
82
89 virtual bool HasReceived() const = 0;
90
96 virtual double GetDelaySec() const = 0;
97
103 virtual uint32_t PendingQueueSize() const = 0;
104
111 virtual bool HasWriter() { return false; }
112
118 virtual void GetWriters(std::vector<proto::RoleAttributes>* writers) {}
119
125 const std::string& GetChannelName() const {
126 return role_attr_.channel_name();
127 }
128
134 uint64_t ChannelId() const { return role_attr_.channel_id(); }
135
142 return role_attr_.qos_profile();
143 }
144
151 bool IsInit() const { return init_.load(); }
152
153 protected:
155 std::atomic<bool> init_;
156};
157
167template <typename MessageT>
169 public:
170 ~ReceiverManager() { receiver_map_.clear(); }
171
178 auto GetReceiver(const proto::RoleAttributes& role_attr) ->
179 typename std::shared_ptr<transport::Receiver<MessageT>>;
180
181 private:
182 std::unordered_map<std::string,
183 typename std::shared_ptr<transport::Receiver<MessageT>>>
184 receiver_map_;
185 std::mutex receiver_map_mutex_;
186
188};
189
195template <typename MessageT>
196ReceiverManager<MessageT>::ReceiverManager() {}
197
198template <typename MessageT>
200 const proto::RoleAttributes& role_attr) ->
201 typename std::shared_ptr<transport::Receiver<MessageT>> {
202 std::lock_guard<std::mutex> lock(receiver_map_mutex_);
203 // because multi reader for one channel will write datacache multi times,
204 // so reader for datacache we use map to keep one instance for per channel
205 const std::string& channel_name = role_attr.channel_name();
206 if (receiver_map_.count(channel_name) == 0) {
207 receiver_map_[channel_name] =
208 transport::Transport::Instance()->CreateReceiver<MessageT>(
209 role_attr, [](const std::shared_ptr<MessageT>& msg,
210 const transport::MessageInfo& msg_info,
211 const proto::RoleAttributes& reader_attr) {
212 (void)msg_info;
213 (void)reader_attr;
214 PerfEventCache::Instance()->AddTransportEvent(
215 TransPerf::DISPATCH, reader_attr.channel_id(),
216 msg_info.seq_num());
218 reader_attr.channel_id(), msg);
219 PerfEventCache::Instance()->AddTransportEvent(
220 TransPerf::NOTIFY, reader_attr.channel_id(),
221 msg_info.seq_num());
222 });
223 }
224 return receiver_map_[channel_name];
225}
226
227} // namespace cyber
228} // namespace apollo
229
230#endif // CYBER_NODE_READER_BASE_H_
Base Class for Reader Reader is identified by one apollo::cyber::proto::RoleAttribute,...
Definition reader_base.h:46
virtual bool HasWriter()
Query is there any writer that publish the subscribed channel
std::atomic< bool > init_
bool IsInit() const
Query whether the Reader is initialized
virtual void Shutdown()=0
Shutdown the Reader object
virtual void Observe()=0
Get stored data
virtual bool HasReceived() const =0
Query whether we have received data since last clear
const std::string & GetChannelName() const
Get Reader's Channel name
virtual bool Init()=0
Init the Reader object
virtual void GetWriters(std::vector< proto::RoleAttributes > *writers)
Get all writers pushlish the channel we subscribes
virtual void ClearData()=0
Clear local data
const proto::QosProfile & QosProfile() const
Get qos profile.
uint64_t ChannelId() const
Get Reader's Channel id
virtual bool Empty() const =0
Query whether the Reader has data to be handled
ReaderBase(const proto::RoleAttributes &role_attr)
Definition reader_base.h:48
virtual double GetDelaySec() const =0
Get time interval of since last receive message
proto::RoleAttributes role_attr_
virtual uint32_t PendingQueueSize() const =0
Get the value of pending queue size
One Channel is related to one Receiver.
auto GetReceiver(const proto::RoleAttributes &role_attr) -> typename std::shared_ptr< transport::Receiver< MessageT > >
Get the Receiver object
bool Dispatch(const uint64_t channel_id, const std::shared_ptr< T > &msg)
#define DECLARE_SINGLETON(classname)
Definition macros.h:52
class register implement
Definition arena_queue.h:37