Apollo 10.0
自动驾驶开放平台
participant.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2024 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
17
18#include <vector>
19
20namespace apollo {
21namespace cyber {
22namespace transport {
23
24auto Participant::CreateSubscriber(const std::string& channel_name,
25 const proto::QosProfile& qos,
26 const rtps::subsciber_callback& callback)
27 -> std::shared_ptr<transport::Subscriber> {
28 if (!participant_) {
29 AWARN << "DDSParticipant already released when the subscriber created, "
30 "channel:"
31 << channel_name;
32 return nullptr;
33 }
34 auto subscriber_ptr = std::make_shared<transport::Subscriber>(
35 channel_name, qos, participant_, callback);
36 RETURN_VAL_IF(!subscriber_ptr->Init(), nullptr);
37 std::lock_guard<std::mutex> lock(subscriber_mutex_);
38 subscriber_map_.emplace(channel_name, subscriber_ptr);
39 return subscriber_ptr;
40}
41
43 const std::string& name, int send_port,
44 eprosima::fastdds::dds::DomainParticipantListener* listener)
45 : shutdown_(false),
46 name_(name),
47 send_port_(send_port),
48 listener_(listener),
49 type_support_(new UnderlayMessageType()),
50 participant_(nullptr) {}
51
53
55 return CreateParticipant(name_, send_port_, listener_);
56}
57
59 if (shutdown_.exchange(true)) {
60 return;
61 }
62
63 publisher_map_.clear();
64 subscriber_map_.clear();
65 if (listener_ != nullptr) {
66 delete listener_;
67 listener_ = nullptr;
68 }
69}
70
71auto Participant::CreatePublisher(const std::string& channel_name,
72 const proto::QosProfile& qos)
73 -> std::shared_ptr<transport::Publisher> {
74 if (!participant_) {
75 AWARN << "DDSParticipant already released when the publisher created, "
76 "channel:"
77 << channel_name;
78 return nullptr;
79 }
80 auto publisher_ptr =
81 std::make_shared<transport::Publisher>(channel_name, qos, participant_);
82
83 RETURN_VAL_IF(!publisher_ptr->Init(), nullptr);
84 std::lock_guard<std::mutex> lock(publisher_mutex_);
85 publisher_map_.emplace(channel_name, publisher_ptr);
86 return publisher_ptr;
87}
88
89bool Participant::CreateParticipant(
90 const std::string& name, int send_port,
91 eprosima::fastdds::dds::DomainParticipantListener* listener) {
92 uint32_t domain_id = 80;
93 const char* val = ::getenv("CYBER_DOMAIN_ID");
94 if (val != nullptr) {
95 try {
96 domain_id = std::stoi(val);
97 } catch (const std::exception& e) {
98 AERROR << "convert domain_id error " << e.what();
99 return false;
100 }
101 }
102
103 std::string ip_env("127.0.0.1");
104 const char* ip_val = ::getenv("CYBER_IP");
105 if (ip_val != nullptr) {
106 ip_env = ip_val;
107 if (ip_env.empty()) {
108 AERROR << "invalid CYBER_IP (an empty string)";
109 return false;
110 }
111 }
112
113 auto part_attr_conf = std::make_shared<proto::RtpsParticipantAttr>();
114 auto& global_conf = common::GlobalData::Instance()->Config();
115
116 if (!global_conf.has_transport_conf() ||
117 !global_conf.transport_conf().has_participant_attr()) {
118 AERROR << "No rtps participant attr conf.";
119 return false;
120 }
121
122 part_attr_conf->CopyFrom(global_conf.transport_conf().participant_attr());
123
124 // set wire protocol
125 eprosima::fastdds::dds::WireProtocolConfigQos wire_protocol;
126 wire_protocol.port.domainIDGain =
127 static_cast<uint16_t>(part_attr_conf->domain_id_gain());
128 wire_protocol.port.portBase =
129 static_cast<uint16_t>(part_attr_conf->port_base());
130 wire_protocol.builtin.discovery_config.discoveryProtocol =
131 eprosima::fastrtps::rtps::DiscoveryProtocol_t::SIMPLE;
132 wire_protocol.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol =
133 true;
134 wire_protocol.builtin.discovery_config.m_simpleEDP
135 .use_PublicationReaderANDSubscriptionWriter = true;
136 wire_protocol.builtin.discovery_config.m_simpleEDP
137 .use_PublicationWriterANDSubscriptionReader = true;
138 wire_protocol.builtin.discovery_config.leaseDuration.seconds =
139 part_attr_conf->lease_duration();
140 wire_protocol.builtin.discovery_config.leaseDuration_announcementperiod
141 .seconds = part_attr_conf->announcement_period();
142 wire_protocol.builtin.discovery_config.ignoreParticipantFlags = static_cast<
143 eprosima::fastrtps::rtps::ParticipantFilteringFlags>(
144 eprosima::fastrtps::rtps::ParticipantFilteringFlags::FILTER_SAME_PROCESS);
145
146 // set transport locator
147 eprosima::fastrtps::rtps::Locator_t locator;
148 locator.port = 0;
149 RETURN_VAL_IF(!eprosima::fastrtps::rtps::IPLocator::setIPv4(locator, ip_env),
150 false);
151 locator.kind = LOCATOR_KIND_UDPv4;
152 wire_protocol.default_unicast_locator_list.push_back(locator);
153 wire_protocol.builtin.metatrafficUnicastLocatorList.push_back(locator);
154 eprosima::fastrtps::rtps::IPLocator::setIPv4(locator, 239, 255, 0, 1);
155 wire_protocol.builtin.metatrafficMulticastLocatorList.push_back(locator);
156
157 // set participant qos
158 eprosima::fastdds::dds::PropertyPolicyQos properties;
159 eprosima::fastdds::dds::DomainParticipantQos participant_qos;
160 participant_qos.name(this->name_.c_str());
161 participant_qos.wire_protocol(wire_protocol);
162 participant_qos.properties(properties);
163
164 // UDP
165 auto udp_transport =
166 std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
167 udp_transport->sendBufferSize = 1024 * 1025 * 10;
168 udp_transport->receiveBufferSize = 1024 * 1025 * 10;
169 udp_transport->interfaceWhiteList.push_back(ip_env);
170 participant_qos.transport().user_transports.push_back(udp_transport);
171 participant_qos.transport().use_builtin_transports = false;
172 AINFO << "part name: " << participant_qos.name() << ", port: " << send_port_;
173
174 participant_ =
175 eprosima::fastdds::dds::DomainParticipantFactory::get_instance()
176 ->create_participant(domain_id, participant_qos, listener_,
177 eprosima::fastdds::dds::StatusMask::none());
178 RETURN_VAL_IF_NULL(participant_, false);
179 if (type_support_.register_type(participant_) != ReturnCode_t::RETCODE_OK) {
180 AERROR << "Register type failed!";
181 return false;
182 }
183 return true;
184}
185
186bool Participant::CheckIPVaild(std::string ip_env) {
187 struct ifaddrs *ifap, *ifa;
188 struct sockaddr_in* sa;
189 char ip_address[INET_ADDRSTRLEN];
190 if (getifaddrs(&ifap) == -1) {
191 AERROR << "getifaddrs error";
192 return false;
193 }
194 std::vector<std::string> ip_vec;
195 std::stringstream ip_info;
196 ip_info << "All ip info: \n";
197 for (ifa = ifap; ifa != NULL; ifa = ifa->ifa_next) {
198 if (ifa->ifa_addr != NULL && ifa->ifa_addr->sa_family == AF_INET) {
199 sa = (struct sockaddr_in*)ifa->ifa_addr;
200 inet_ntop(AF_INET, &sa->sin_addr, ip_address, sizeof(ip_address));
201 ip_info << " " << ifa->ifa_name << " " << ip_address << "\n";
202 ip_vec.push_back(ip_address);
203 }
204 }
205 AINFO << ip_info.str();
206 freeifaddrs(ifap);
207
208 for (std::string ip_interface : ip_vec) {
209 if (ip_interface == ip_env) {
210 AINFO << "Find same the ip interface in host as cyber ip: " << ip_env;
211 return true;
212 }
213 }
214 AERROR << "The same ip interface in host as cyber ip was not found: "
215 << ip_env;
216 return false;
217}
218
219} // namespace transport
220} // namespace cyber
221} // namespace apollo
auto CreateSubscriber(const std::string &channel_name, const proto::QosProfile &qos, const rtps::subsciber_callback &callback=nullptr) -> std::shared_ptr< Subscriber >
auto CreatePublisher(const std::string &channel_name, const proto::QosProfile &qos) -> std::shared_ptr< transport::Publisher >
Participant(const std::string &name, int send_port, eprosima::fastdds::dds::DomainParticipantListener *listener=nullptr)
This class represents the TopicDataType of the type UnderlayMessage defined by the user in the IDL fi...
#define RETURN_VAL_IF_NULL(ptr, val)
Definition log.h:98
#define RETURN_VAL_IF(condition, val)
Definition log.h:114
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
#define AWARN
Definition log.h:43
std::function< void(const std::shared_ptr< std::string > &msg_str, uint64_t channel_id, const MessageInfo &msg_info)> subsciber_callback
Definition common_type.h:30
class register implement
Definition arena_queue.h:37