Apollo 10.0
自动驾驶开放平台
rtps_dispatcher.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include "cyber/common/log.h"
23
24namespace apollo {
25namespace cyber {
26namespace transport {
27
28RtpsDispatcher::RtpsDispatcher() : participant_(nullptr) {}
29
30RtpsDispatcher::~RtpsDispatcher() { Shutdown(); }
31
32void RtpsDispatcher::Shutdown() {
33 if (is_shutdown_.exchange(true)) {
34 return;
35 }
36
37 {
38 std::lock_guard<std::mutex> lock(subs_mutex_);
39 for (auto& item : subs_) {
40 item.second = nullptr;
41 }
42 }
43
44 participant_ = nullptr;
45}
46
47void RtpsDispatcher::AddSubscriber(const RoleAttributes& self_attr) {
48 if (participant_ == nullptr) {
49 AWARN << "please set participant firstly.";
50 return;
51 }
52
53 uint64_t channel_id = self_attr.channel_id();
54 std::lock_guard<std::mutex> lock(subs_mutex_);
55 // subscriber exsit
56 if (subs_.count(channel_id) > 0) {
57 return;
58 }
59
60 auto listener_adapter =
61 [this, self_attr](const std::shared_ptr<std::string>& msg_str,
62 uint64_t channel_id, const MessageInfo& msg_info) {
63 statistics::Statistics::Instance()->AddRecvCount(self_attr,
64 msg_info.seq_num());
65 statistics::Statistics::Instance()->SetTotalMsgsStatus(
66 self_attr, msg_info.seq_num());
67 this->OnMessage(channel_id, msg_str, msg_info);
68 };
69
70 auto& qos = self_attr.qos_profile();
71 auto subscriber_ptr = participant_->CreateSubscriber(self_attr.channel_name(),
72 qos, listener_adapter);
73 RETURN_IF_NULL(subscriber_ptr);
74
75 subs_[channel_id] = subscriber_ptr;
76}
77
78void RtpsDispatcher::OnMessage(uint64_t channel_id,
79 const std::shared_ptr<std::string>& msg_str,
80 const MessageInfo& msg_info) {
81 if (is_shutdown_.load()) {
82 return;
83 }
84
85 ListenerHandlerBasePtr* handler_base = nullptr;
86 if (msg_listeners_.Get(channel_id, &handler_base)) {
87 auto handler =
88 std::dynamic_pointer_cast<ListenerHandler<std::string>>(*handler_base);
89 handler->Run(msg_str, msg_info);
90 }
91}
92
93} // namespace transport
94} // namespace cyber
95} // namespace apollo
#define RETURN_IF_NULL(ptr)
Definition log.h:90
#define AWARN
Definition log.h:43
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr
class register implement
Definition arena_queue.h:37