Apollo 10.0
自动驾驶开放平台
subscriber.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2024 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
17
18namespace apollo {
19namespace cyber {
20namespace transport {
21
22Subscriber::Subscriber(const std::string& name, const proto::QosProfile& qos,
23 eprosima::fastdds::dds::DomainParticipant* participant,
24 const rtps::subsciber_callback& callback)
25 : channel_name_(name),
26 qos_(qos),
27 shutdown_(false),
28 callback_(callback),
29 subscriber_listener_(nullptr),
30 participant_(participant),
31 subscriber_(nullptr),
32 topic_(nullptr),
33 reader_(nullptr) {}
34
36
38 eprosima::fastdds::dds::SubscriberQos sub_qos;
40 !QosFiller::FillInSubQos(this->channel_name_, this->qos_, &sub_qos),
41 false);
42 subscriber_ = participant_->create_subscriber(sub_qos);
43 if (subscriber_ == nullptr) {
44 AINFO << "something went wrong while creating the fastdds subscriber...";
45 return false;
46 }
47
48 if (!EnsureCreateTopic(this->channel_name_)) {
49 AINFO << "something went wrong while creating the fastdds topic...";
50 return false;
51 }
52
53 eprosima::fastdds::dds::DataReaderQos reader_qos;
55 !QosFiller::FillInReaderQos(this->channel_name_, this->qos_, &reader_qos),
56 false);
57 subscriber_listener_ = new dispatcher::SubscriberListener(this->callback_);
58
59 reader_ =
60 subscriber_->create_datareader(topic_, reader_qos, subscriber_listener_);
61 if (reader_ == nullptr) {
62 AINFO << "something went wrong while creating the fastdds datareader...";
63 return false;
64 }
65
66 ADEBUG << "dds reader: " << reader_ << ", subscriber: " << this
67 << ", reader guid: " << reader_->guid()
68 << ", channel: " << this->channel_name_;
69 return true;
70}
71
73 RETURN_IF(shutdown_.exchange(true));
74
75 if (subscriber_ != nullptr && reader_ != nullptr) {
76 subscriber_->delete_datareader(reader_);
77 reader_ = nullptr;
78 }
79 if (participant_ != nullptr && subscriber_ != nullptr) {
80 if (participant_->delete_subscriber(subscriber_) ==
81 eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK) {
82 subscriber_ = nullptr;
83 } else {
84 AERROR << channel_name_ << ": Failed to delete the subscriber.";
85 }
86 }
87 if (participant_ != nullptr && topic_ != nullptr) {
88 participant_->delete_topic(topic_);
89 topic_ = nullptr;
90 }
91
92 if (subscriber_listener_ != nullptr) {
93 delete subscriber_listener_;
94 }
95}
96
97bool Subscriber::EnsureCreateTopic(const std::string& channel_name) {
98 topic_ = dynamic_cast<eprosima::fastdds::dds::Topic*>(
99 participant_->lookup_topicdescription(channel_name));
100 if (topic_ == nullptr) {
101 eprosima::fastdds::dds::TopicQos topic_qos;
103 !QosFiller::FillInTopicQos(channel_name, this->qos_, &topic_qos),
104 false);
105 topic_ =
106 participant_->create_topic(channel_name, "UnderlayMessage", topic_qos);
107 }
108 return (topic_ != nullptr);
109}
110
111} // namespace transport
112} // namespace cyber
113} // namespace apollo
static bool FillInTopicQos(const std::string &channel_name, const proto::QosProfile &qos, eprosima::fastdds::dds::TopicQos *topic_qos)
static bool FillInSubQos(const std::string &channel_name, const proto::QosProfile &qos, eprosima::fastdds::dds::SubscriberQos *sub_qos)
static bool FillInReaderQos(const std::string &channel_name, const proto::QosProfile &qos, eprosima::fastdds::dds::DataReaderQos *reader_qos)
Subscriber(const std::string &name, const proto::QosProfile &qos, eprosima::fastdds::dds::DomainParticipant *participant, const rtps::subsciber_callback &callback)
Definition subscriber.cc:22
#define RETURN_VAL_IF(condition, val)
Definition log.h:114
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
#define RETURN_IF(condition)
Definition log.h:106
#define AINFO
Definition log.h:42
std::function< void(const std::shared_ptr< std::string > &msg_str, uint64_t channel_id, const MessageInfo &msg_info)> subsciber_callback
Definition common_type.h:30
class register implement
Definition arena_queue.h:37