17#ifndef CYBER_NODE_READER_BASE_H_
18#define CYBER_NODE_READER_BASE_H_
23#include <unordered_map>
// Default implementation with an empty body: it does nothing with `writers`.
// Declared virtual, so a derived class can override it.
118 virtual void GetWriters(std::vector<proto::RoleAttributes>* writers) {}
167template <
typename MessageT>
179 typename std::shared_ptr<transport::Receiver<MessageT>>;
182 std::unordered_map<std::string,
183 typename std::shared_ptr<transport::Receiver<MessageT>>>
185 std::mutex receiver_map_mutex_;
// Out-of-line definition of the ReceiverManager<MessageT> default constructor.
// The body is empty; no work is done here.
195template <
typename MessageT>
196ReceiverManager<MessageT>::ReceiverManager() {}
198template <
typename MessageT>
201 typename std::shared_ptr<transport::Receiver<MessageT>> {
202 std::lock_guard<std::mutex> lock(receiver_map_mutex_);
205 const std::string& channel_name = role_attr.channel_name();
206 if (receiver_map_.count(channel_name) == 0) {
207 receiver_map_[channel_name] =
208 transport::Transport::Instance()->CreateReceiver<MessageT>(
209 role_attr, [](
const std::shared_ptr<MessageT>& msg,
214 PerfEventCache::Instance()->AddTransportEvent(
215 TransPerf::DISPATCH, reader_attr.channel_id(),
218 reader_attr.channel_id(), msg);
219 PerfEventCache::Instance()->AddTransportEvent(
220 TransPerf::NOTIFY, reader_attr.channel_id(),
224 return receiver_map_[channel_name];
Base Class for Reader. Reader is identified by one apollo::cyber::proto::RoleAttribute,...
virtual bool HasWriter()
Query whether there is any writer that publishes the subscribed channel
std::atomic< bool > init_
bool IsInit() const
Query whether the Reader is initialized
virtual void Shutdown()=0
Shutdown the Reader object
virtual void Observe()=0
Get stored data
virtual bool HasReceived() const =0
Query whether we have received data since last clear
const std::string & GetChannelName() const
Get Reader's Channel name
virtual bool Init()=0
Init the Reader object
virtual void GetWriters(std::vector< proto::RoleAttributes > *writers)
Get all writers that publish the channel we subscribe to
virtual void ClearData()=0
Clear local data
const proto::QosProfile & QosProfile() const
Get qos profile.
uint64_t ChannelId() const
Get Reader's Channel id
virtual bool Empty() const =0
Query whether the Reader has data to be handled
ReaderBase(const proto::RoleAttributes &role_attr)
virtual double GetDelaySec() const =0
Get the time interval since the last received message
proto::RoleAttributes role_attr_
virtual uint32_t PendingQueueSize() const =0
Get the value of pending queue size
One Channel is related to one Receiver.
auto GetReceiver(const proto::RoleAttributes &role_attr) -> typename std::shared_ptr< transport::Receiver< MessageT > >
Get the Receiver object
bool Dispatch(const uint64_t channel_id, const std::shared_ptr< T > &msg)
#define DECLARE_SINGLETON(classname)
optional QosProfile qos_profile
optional string channel_name
optional uint64 channel_id