Apollo 10.0
自动驾驶开放平台
apollo::cyber::transport::IntraDispatcher类 参考

#include <intra_dispatcher.h>

类 apollo::cyber::transport::IntraDispatcher 继承关系图:
apollo::cyber::transport::IntraDispatcher 的协作图:

Public 成员函数

virtual ~IntraDispatcher ()
 
template<typename MessageT >
void OnMessage (uint64_t channel_id, const std::shared_ptr< MessageT > &message, const MessageInfo &message_info)
 
template<typename MessageT >
void AddListener (const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
 
template<typename MessageT >
void AddListener (const RoleAttributes &self_attr, const RoleAttributes &opposite_attr, const MessageListener< MessageT > &listener)
 
template<typename MessageT >
void RemoveListener (const RoleAttributes &self_attr)
 
template<typename MessageT >
void RemoveListener (const RoleAttributes &self_attr, const RoleAttributes &opposite_attr)
 
- Public 成员函数 继承自 apollo::cyber::transport::Dispatcher
 Dispatcher ()
 
virtual ~Dispatcher ()
 
virtual void Shutdown ()
 
template<typename MessageT >
void AddListener (const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
 
template<typename MessageT >
void AddListener (const RoleAttributes &self_attr, const RoleAttributes &opposite_attr, const MessageListener< MessageT > &listener)
 
template<typename MessageT >
void RemoveListener (const RoleAttributes &self_attr)
 
template<typename MessageT >
void RemoveListener (const RoleAttributes &self_attr, const RoleAttributes &opposite_attr)
 
bool HasChannel (uint64_t channel_id)
 

额外继承的成员函数

- Protected 属性 继承自 apollo::cyber::transport::Dispatcher
std::atomic< bool > is_shutdown_
 
AtomicHashMap< uint64_t, ListenerHandlerBasePtr > msg_listeners_
 
base::AtomicRWLock rw_lock_
 

详细描述

在文件 intra_dispatcher.h 第 253 行定义.

构造及析构函数说明

◆ ~IntraDispatcher()

apollo::cyber::transport::IntraDispatcher::~IntraDispatcher ( )
virtual

在文件 intra_dispatcher.cc 第 25 行定义.

25{}

成员函数说明

◆ AddListener() [1/2]

template<typename MessageT >
void apollo::cyber::transport::IntraDispatcher::AddListener ( const RoleAttributes &  self_attr,
const MessageListener< MessageT > &  listener 
)

在文件 intra_dispatcher.h 第 347 行定义.

348 {
349 if (is_shutdown_.load()) {
350 return;
351 }
352
353 auto channel_id = self_attr.channel_id();
354 std::string message_type = message::GetMessageName<MessageT>();
355 uint64_t self_id = self_attr.id();
356
357 bool created =
358 chain_->AddListener(self_id, channel_id, message_type, listener);
359
360 auto handler = GetHandler<MessageT>(self_attr.channel_id());
361 if (handler && created) {
362 auto listener_wrapper =
363 [this, self_id, channel_id, message_type, self_attr](
364 const std::shared_ptr<MessageT>& message,
365 const MessageInfo& message_info) {
366 auto recv_time = Time::Now().ToMicrosecond();
367 statistics::Statistics::Instance()->SetProcStatus(self_attr, recv_time);
368 this->chain_->Run<MessageT>(self_id, channel_id, message_type, message,
369 message_info);
370 };
371 handler->Connect(self_id, listener_wrapper);
372 }
373}
uint64_t ToMicrosecond() const
convert time to microsecond (us).
Definition time.cc:85
static Time Now()
get the current time.
Definition time.cc:57

◆ AddListener() [2/2]

template<typename MessageT >
void apollo::cyber::transport::IntraDispatcher::AddListener ( const RoleAttributes &  self_attr,
const RoleAttributes &  opposite_attr,
const MessageListener< MessageT > &  listener 
)

在文件 intra_dispatcher.h 第 376 行定义.

378 {
379 if (is_shutdown_.load()) {
380 return;
381 }
382
383 auto channel_id = self_attr.channel_id();
384 std::string message_type = message::GetMessageName<MessageT>();
385 uint64_t self_id = self_attr.id();
386 uint64_t oppo_id = opposite_attr.id();
387
388 bool created =
389 chain_->AddListener(self_id, oppo_id, channel_id, message_type, listener);
390
391 auto handler = GetHandler<MessageT>(self_attr.channel_id());
392 if (handler && created) {
393 auto listener_wrapper =
394 [this, self_id, oppo_id, channel_id, message_type, self_attr](
395 const std::shared_ptr<MessageT>& message,
396 const MessageInfo& message_info) {
397 auto recv_time = Time::Now().ToMicrosecond();
398 statistics::Statistics::Instance()->SetProcStatus(self_attr, recv_time);
399 this->chain_->Run<MessageT>(self_id, oppo_id, channel_id, message_type,
400 message, message_info);
401 };
402 handler->Connect(self_id, oppo_id, listener_wrapper);
403 }
404}

◆ OnMessage()

template<typename MessageT >
void apollo::cyber::transport::IntraDispatcher::OnMessage ( uint64_t  channel_id,
const std::shared_ptr< MessageT > &  message,
const MessageInfo &  message_info 
)

在文件 intra_dispatcher.h 第 287 行定义.

289 {
290 if (is_shutdown_.load()) {
291 return;
292 }
293 ListenerHandlerBasePtr* handler_base = nullptr;
294 ADEBUG << "intra on message, channel:"
295 << common::GlobalData::GetChannelById(channel_id);
296 if (msg_listeners_.Get(channel_id, &handler_base)) {
297 auto handler =
298 std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
299 if (handler) {
300 handler->Run(message, message_info);
301 } else {
302 auto msg_size = message::FullByteSize(*message);
303 if (msg_size < 0) {
304 AERROR << "Failed to get message size. channel["
305 << common::GlobalData::GetChannelById(channel_id) << "]";
306 return;
307 }
308 std::string msg;
309 msg.resize(msg_size);
310 if (message::SerializeToHC(*message, const_cast<char*>(msg.data()),
311 msg_size)) {
312 (*handler_base)->RunFromString(msg, message_info);
313 } else {
314 AERROR << "Failed to serialize message. channel["
315 << common::GlobalData::GetChannelById(channel_id) << "]";
316 }
317 }
318 }
319}
static std::string GetChannelById(uint64_t id)
AtomicHashMap< uint64_t, ListenerHandlerBasePtr > msg_listeners_
Definition dispatcher.h:83
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
std::enable_if< HasSerializeToArray< T >::value, bool >::type SerializeToHC(const T &message, void *data, int size)
int FullByteSize(const T &message)
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr

◆ RemoveListener() [1/2]

template<typename MessageT >
void apollo::cyber::transport::IntraDispatcher::RemoveListener ( const RoleAttributes &  self_attr)

在文件 intra_dispatcher.h 第 407 行定义.

407 {
408 if (is_shutdown_.load()) {
409 return;
410 }
411 Dispatcher::RemoveListener<MessageT>(self_attr);
412 chain_->RemoveListener<MessageT>(self_attr.id(), self_attr.channel_id(),
413 message::GetMessageName<MessageT>());
414}

◆ RemoveListener() [2/2]

template<typename MessageT >
void apollo::cyber::transport::IntraDispatcher::RemoveListener ( const RoleAttributes &  self_attr,
const RoleAttributes &  opposite_attr 
)

在文件 intra_dispatcher.h 第 417 行定义.

418 {
419 if (is_shutdown_.load()) {
420 return;
421 }
422 Dispatcher::RemoveListener<MessageT>(self_attr, opposite_attr);
423 chain_->RemoveListener<MessageT>(self_attr.id(), opposite_attr.id(),
424 self_attr.channel_id(),
425 message::GetMessageName<MessageT>());
426}

该类的文档由以下文件生成: