Apollo 10.0
自动驾驶开放平台
apollo::cyber::transport::RtpsDispatcher类 参考

#include <rtps_dispatcher.h>

类 apollo::cyber::transport::RtpsDispatcher 继承关系图:
apollo::cyber::transport::RtpsDispatcher 的协作图:

Public 成员函数

virtual ~RtpsDispatcher ()
 
void Shutdown () override
 
template<typename MessageT >
void AddListener (const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
 
template<typename MessageT >
void AddListener (const RoleAttributes &self_attr, const RoleAttributes &opposite_attr, const MessageListener< MessageT > &listener)
 
void SetParticipant (const ParticipantPtr &participant)
 
- Public 成员函数 继承自 apollo::cyber::transport::Dispatcher
 Dispatcher ()
 
virtual ~Dispatcher ()
 
template<typename MessageT >
void AddListener (const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
 
template<typename MessageT >
void AddListener (const RoleAttributes &self_attr, const RoleAttributes &opposite_attr, const MessageListener< MessageT > &listener)
 
template<typename MessageT >
void RemoveListener (const RoleAttributes &self_attr)
 
template<typename MessageT >
void RemoveListener (const RoleAttributes &self_attr, const RoleAttributes &opposite_attr)
 
bool HasChannel (uint64_t channel_id)
 

额外继承的成员函数

- Protected 属性 继承自 apollo::cyber::transport::Dispatcher
std::atomic< bool > is_shutdown_
 
AtomicHashMap< uint64_t, ListenerHandlerBasePtr > msg_listeners_
 
base::AtomicRWLock rw_lock_
 

详细描述

在文件 rtps_dispatcher.h 第 49 行定义.

构造及析构函数说明

◆ ~RtpsDispatcher()

apollo::cyber::transport::RtpsDispatcher::~RtpsDispatcher ( )
virtual

在文件 rtps_dispatcher.cc 第 30 行定义.

成员函数说明

◆ AddListener() [1/2]

template<typename MessageT >
void apollo::cyber::transport::RtpsDispatcher::AddListener ( const RoleAttributes &  self_attr,
const MessageListener< MessageT > &  listener 
)

在文件 rtps_dispatcher.h 第 85 行定义.

86 {
87 auto listener_adapter = [listener, self_attr](
88 const std::shared_ptr<std::string>& msg_str,
89 const MessageInfo& msg_info) {
90 auto msg = std::make_shared<MessageT>();
91 RETURN_IF(!message::ParseFromString(*msg_str, msg.get()));
92 uint64_t recv_time = Time::Now().ToNanosecond();
93 uint64_t send_time = msg_info.send_time();
94 if (send_time > recv_time) {
95 AWARN << "The message is received (" << recv_time
96 << ") earlier than the message is sent (" << send_time << ")";
97 } else {
98 uint64_t diff = recv_time - send_time;
99 // sample transport latency in microsecond
100 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
101 self_attr, diff);
102 }
103 statistics::Statistics::Instance()->SetProcStatus(self_attr, recv_time);
104 listener(msg, msg_info);
105 };
106
107 Dispatcher::AddListener<std::string>(self_attr, listener_adapter);
108 AddSubscriber(self_attr);
109}
uint64_t ToNanosecond() const
convert time to nanosecond.
Definition time.cc:83
static Time Now()
get the current time.
Definition time.cc:57
#define RETURN_IF(condition)
Definition log.h:106
#define AWARN
Definition log.h:43
std::enable_if< HasParseFromString< T >::value, bool >::type ParseFromString(const std::string &str, T *message)

◆ AddListener() [2/2]

template<typename MessageT >
void apollo::cyber::transport::RtpsDispatcher::AddListener ( const RoleAttributes &  self_attr,
const RoleAttributes &  opposite_attr,
const MessageListener< MessageT > &  listener 
)

在文件 rtps_dispatcher.h 第 112 行定义.

114 {
115 auto listener_adapter = [listener, self_attr](
116 const std::shared_ptr<std::string>& msg_str,
117 const MessageInfo& msg_info) {
118 auto msg = std::make_shared<MessageT>();
119 RETURN_IF(!message::ParseFromString(*msg_str, msg.get()));
120 uint64_t recv_time = Time::Now().ToNanosecond();
121 uint64_t send_time = msg_info.send_time();
122 if (send_time > recv_time) {
123 AWARN << "The message is received (" << recv_time
124 << ") earlier than the message is sent (" << send_time << ")";
125 } else {
126 uint64_t diff = recv_time - send_time;
127 // sample transport latency in microsecond
128 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
129 self_attr, diff);
130 }
131 statistics::Statistics::Instance()->SetProcStatus(self_attr, recv_time);
132 listener(msg, msg_info);
133 };
134
135 Dispatcher::AddListener<std::string>(self_attr, opposite_attr,
136 listener_adapter);
137 AddSubscriber(self_attr);
138}

◆ SetParticipant()

void apollo::cyber::transport::RtpsDispatcher::SetParticipant ( const ParticipantPtr &  participant)
inline

在文件 rtps_dispatcher.h 第 64 行定义.

64 {
65 participant_ = participant;
66 }

◆ Shutdown()

void apollo::cyber::transport::RtpsDispatcher::Shutdown ( )
override virtual

重载 apollo::cyber::transport::Dispatcher .

在文件 rtps_dispatcher.cc 第 32 行定义.

32 {
33 if (is_shutdown_.exchange(true)) {
34 return;
35 }
36
37 {
38 std::lock_guard<std::mutex> lock(subs_mutex_);
39 for (auto& item : subs_) {
40 item.second = nullptr;
41 }
42 }
43
44 participant_ = nullptr;
45}

该类的文档由以下文件生成: