28RtpsDispatcher::RtpsDispatcher() : participant_(nullptr) {}
30RtpsDispatcher::~RtpsDispatcher() { Shutdown(); }
// Shuts the dispatcher down: sets is_shutdown_ to true, resets the mapped
// value of every entry in subs_ to nullptr after taking subs_mutex_, and
// assigns nullptr to participant_.
// NOTE(review): the embedded numbering jumps (33 -> 38, 40 -> 44), so this
// listing appears to omit lines (likely the body of the first `if` and the
// closing braces) — confirm against the full file.
32void RtpsDispatcher::Shutdown() {
// exchange(true) stores true and yields the value held before the call, so
// this branch is taken only when the flag was already set (presumably a
// std::atomic<bool>; the branch body is not visible here — TODO confirm).
33 if (is_shutdown_.exchange(
true)) {
// subs_mutex_ is acquired right before subs_ is walked.
38 std::lock_guard<std::mutex> lock(subs_mutex_);
39 for (
auto& item : subs_) {
// Reset the stored value of each entry; the keys stay in the container.
40 item.second =
nullptr;
// Reset the participant pointer as well.
44 participant_ =
nullptr;
// Creates a subscriber for the channel described by self_attr and stores it
// in subs_ under channel_id.
//
// self_attr: role attributes; the visible code reads channel_name() from it
//            and passes it to the statistics calls.
//
// NOTE(review): the embedded numbering has gaps (49 -> 54, 56 -> 60,
// 63 -> 65, 67 -> 71, 72 -> 75). The declarations of `channel_id` and `qos`,
// the bodies of the two guard `if`s, the second argument of AddRecvCount and
// the end of the lambda are not visible in this listing — confirm against the
// full file.
47void RtpsDispatcher::AddSubscriber(
const RoleAttributes& self_attr) {
// A participant is needed for CreateSubscriber() below; warn when it is unset.
48 if (participant_ ==
nullptr) {
49 AWARN <<
"please set participant firstly.";
// subs_mutex_ is taken before subs_ is queried and modified.
54 std::lock_guard<std::mutex> lock(subs_mutex_);
// True when subs_ already holds an entry for channel_id (branch body not
// visible here).
56 if (subs_.count(channel_id) > 0) {
// Callback handed to CreateSubscriber(). It captures `this` and a copy of
// self_attr. Its `channel_id` parameter shadows the outer variable of the
// same name, so the value forwarded to OnMessage() is the one supplied by
// the caller of the callback.
60 auto listener_adapter =
61 [
this, self_attr](
const std::shared_ptr<std::string>& msg_str,
62 uint64_t channel_id,
const MessageInfo& msg_info) {
// Update the Statistics singleton for this role before dispatching.
63 statistics::Statistics::Instance()->AddRecvCount(self_attr,
65 statistics::Statistics::Instance()->SetTotalMsgsStatus(
66 self_attr, msg_info.seq_num());
// Forward the message to this dispatcher's OnMessage().
67 this->OnMessage(channel_id, msg_str, msg_info);
// Create the subscriber from the channel name, `qos` and the callback above.
71 auto subscriber_ptr = participant_->CreateSubscriber(self_attr.
channel_name(),
72 qos, listener_adapter);
// Store the subscriber in subs_, keyed by channel_id.
75 subs_[channel_id] = subscriber_ptr;
// Handles one received message: looks up the handler stored in
// msg_listeners_ for channel_id and runs it with the payload and metadata.
//
// channel_id: key used for the msg_listeners_ lookup.
// msg_str:    message payload, passed through to the handler.
// msg_info:   message metadata, passed through to the handler.
//
// NOTE(review): the embedded numbering has gaps (81 -> 86, 86 -> 88); the
// body of the shutdown check and the declarations of `handler_base` and
// `handler` are not visible in this listing — confirm against the full file.
78void RtpsDispatcher::OnMessage(uint64_t channel_id,
79 const std::shared_ptr<std::string>& msg_str,
80 const MessageInfo& msg_info) {
// Checks the shutdown flag first (branch body not visible here).
81 if (is_shutdown_.load()) {
// Proceed only when Get() reports success for this channel.
86 if (msg_listeners_.Get(channel_id, &handler_base)) {
// Downcast the stored base handler to the std::string-typed handler.
// NOTE(review): dynamic_pointer_cast yields an empty pointer on a type
// mismatch and no null check is visible before Run() — confirm.
88 std::dynamic_pointer_cast<ListenerHandler<std::string>>(*handler_base);
89 handler->Run(msg_str, msg_info);
#define RETURN_IF_NULL(ptr)
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr
optional QosProfile qos_profile
optional string channel_name
optional uint64 channel_id