27 -> std::shared_ptr<transport::Subscriber> {
29 AWARN <<
"DDSParticipant already released when the subscriber created, "
34 auto subscriber_ptr = std::make_shared<transport::Subscriber>(
35 channel_name, qos, participant_, callback);
37 std::lock_guard<std::mutex> lock(subscriber_mutex_);
38 subscriber_map_.emplace(channel_name, subscriber_ptr);
39 return subscriber_ptr;
43 const std::string& name,
int send_port,
44 eprosima::fastdds::dds::DomainParticipantListener* listener)
47 send_port_(send_port),
50 participant_(nullptr) {}
55 return CreateParticipant(name_, send_port_, listener_);
59 if (shutdown_.exchange(
true)) {
63 publisher_map_.clear();
64 subscriber_map_.clear();
65 if (listener_ !=
nullptr) {
73 -> std::shared_ptr<transport::Publisher> {
75 AWARN <<
"DDSParticipant already released when the publisher created, "
81 std::make_shared<transport::Publisher>(channel_name, qos, participant_);
84 std::lock_guard<std::mutex> lock(publisher_mutex_);
85 publisher_map_.emplace(channel_name, publisher_ptr);
89bool Participant::CreateParticipant(
90 const std::string& name,
int send_port,
91 eprosima::fastdds::dds::DomainParticipantListener* listener) {
92 uint32_t domain_id = 80;
93 const char* val = ::getenv(
"CYBER_DOMAIN_ID");
96 domain_id = std::stoi(val);
97 }
catch (
const std::exception& e) {
98 AERROR <<
"convert domain_id error " << e.what();
103 std::string ip_env(
"127.0.0.1");
104 const char* ip_val = ::getenv(
"CYBER_IP");
105 if (ip_val !=
nullptr) {
107 if (ip_env.empty()) {
108 AERROR <<
"invalid CYBER_IP (an empty string)";
113 auto part_attr_conf = std::make_shared<proto::RtpsParticipantAttr>();
114 auto& global_conf = common::GlobalData::Instance()->Config();
116 if (!global_conf.has_transport_conf() ||
117 !global_conf.transport_conf().has_participant_attr()) {
118 AERROR <<
"No rtps participant attr conf.";
122 part_attr_conf->CopyFrom(global_conf.transport_conf().participant_attr());
125 eprosima::fastdds::dds::WireProtocolConfigQos wire_protocol;
126 wire_protocol.port.domainIDGain =
127 static_cast<uint16_t
>(part_attr_conf->domain_id_gain());
128 wire_protocol.port.portBase =
129 static_cast<uint16_t
>(part_attr_conf->port_base());
130 wire_protocol.builtin.discovery_config.discoveryProtocol =
131 eprosima::fastrtps::rtps::DiscoveryProtocol_t::SIMPLE;
132 wire_protocol.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol =
134 wire_protocol.builtin.discovery_config.m_simpleEDP
135 .use_PublicationReaderANDSubscriptionWriter =
true;
136 wire_protocol.builtin.discovery_config.m_simpleEDP
137 .use_PublicationWriterANDSubscriptionReader =
true;
138 wire_protocol.builtin.discovery_config.leaseDuration.seconds =
139 part_attr_conf->lease_duration();
140 wire_protocol.builtin.discovery_config.leaseDuration_announcementperiod
141 .seconds = part_attr_conf->announcement_period();
142 wire_protocol.builtin.discovery_config.ignoreParticipantFlags =
static_cast<
143 eprosima::fastrtps::rtps::ParticipantFilteringFlags
>(
144 eprosima::fastrtps::rtps::ParticipantFilteringFlags::FILTER_SAME_PROCESS);
147 eprosima::fastrtps::rtps::Locator_t locator;
149 RETURN_VAL_IF(!eprosima::fastrtps::rtps::IPLocator::setIPv4(locator, ip_env),
151 locator.kind = LOCATOR_KIND_UDPv4;
152 wire_protocol.default_unicast_locator_list.push_back(locator);
153 wire_protocol.builtin.metatrafficUnicastLocatorList.push_back(locator);
154 eprosima::fastrtps::rtps::IPLocator::setIPv4(locator, 239, 255, 0, 1);
155 wire_protocol.builtin.metatrafficMulticastLocatorList.push_back(locator);
158 eprosima::fastdds::dds::PropertyPolicyQos properties;
159 eprosima::fastdds::dds::DomainParticipantQos participant_qos;
160 participant_qos.name(this->name_.c_str());
161 participant_qos.wire_protocol(wire_protocol);
162 participant_qos.properties(properties);
166 std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
167 udp_transport->sendBufferSize = 1024 * 1025 * 10;
168 udp_transport->receiveBufferSize = 1024 * 1025 * 10;
169 udp_transport->interfaceWhiteList.push_back(ip_env);
170 participant_qos.transport().user_transports.push_back(udp_transport);
171 participant_qos.transport().use_builtin_transports =
false;
172 AINFO <<
"part name: " << participant_qos.name() <<
", port: " << send_port_;
175 eprosima::fastdds::dds::DomainParticipantFactory::get_instance()
176 ->create_participant(domain_id, participant_qos, listener_,
177 eprosima::fastdds::dds::StatusMask::none());
179 if (type_support_.register_type(participant_) != ReturnCode_t::RETCODE_OK) {
180 AERROR <<
"Register type failed!";
186bool Participant::CheckIPVaild(std::string ip_env) {
187 struct ifaddrs *ifap, *ifa;
188 struct sockaddr_in* sa;
189 char ip_address[INET_ADDRSTRLEN];
190 if (getifaddrs(&ifap) == -1) {
191 AERROR <<
"getifaddrs error";
194 std::vector<std::string> ip_vec;
195 std::stringstream ip_info;
196 ip_info <<
"All ip info: \n";
197 for (ifa = ifap; ifa != NULL; ifa = ifa->ifa_next) {
198 if (ifa->ifa_addr != NULL && ifa->ifa_addr->sa_family == AF_INET) {
199 sa = (
struct sockaddr_in*)ifa->ifa_addr;
200 inet_ntop(AF_INET, &sa->sin_addr, ip_address,
sizeof(ip_address));
201 ip_info <<
" " << ifa->ifa_name <<
" " << ip_address <<
"\n";
202 ip_vec.push_back(ip_address);
205 AINFO << ip_info.str();
208 for (std::string ip_interface : ip_vec) {
209 if (ip_interface == ip_env) {
210 AINFO <<
"Find same the ip interface in host as cyber ip: " << ip_env;
214 AERROR <<
"The same ip interface in host as cyber ip was not found: "
auto CreateSubscriber(const std::string &channel_name, const proto::QosProfile &qos, const rtps::subsciber_callback &callback=nullptr) -> std::shared_ptr< Subscriber >
auto CreatePublisher(const std::string &channel_name, const proto::QosProfile &qos) -> std::shared_ptr< transport::Publisher >
Participant(const std::string &name, int send_port, eprosima::fastdds::dds::DomainParticipantListener *listener=nullptr)
This class represents the TopicDataType of the type UnderlayMessage defined by the user in the IDL fi...
#define RETURN_VAL_IF_NULL(ptr, val)
#define RETURN_VAL_IF(condition, val)
std::function< void(const std::shared_ptr< std::string > &msg_str, uint64_t channel_id, const MessageInfo &msg_info)> subsciber_callback