Apollo 10.0
自动驾驶开放平台
multicast_notifier.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <arpa/inet.h>
20#include <fcntl.h>
21#include <poll.h>
22#include <sys/socket.h>
23#include <sys/types.h>
24#include <unistd.h>
25#include <cstring>
26#include <string>
27
29#include "cyber/common/log.h"
30
31namespace apollo {
32namespace cyber {
33namespace transport {
34
35using common::GlobalData;
36
37MulticastNotifier::MulticastNotifier() {
38 if (!Init()) {
39 Shutdown();
40 }
41}
42
44
46 if (is_shutdown_.exchange(true)) {
47 return;
48 }
49
50 if (notify_fd_ != -1) {
51 close(notify_fd_);
52 notify_fd_ = -1;
53 }
54 memset(&notify_addr_, 0, sizeof(notify_addr_));
55
56 if (listen_fd_ != -1) {
57 close(listen_fd_);
58 listen_fd_ = -1;
59 }
60 memset(&listen_addr_, 0, sizeof(listen_addr_));
61}
62
64 if (is_shutdown_.load()) {
65 return false;
66 }
67
68 std::string info_str;
69 info.SerializeTo(&info_str);
70 ssize_t nbytes =
71 sendto(notify_fd_, info_str.c_str(), info_str.size(), 0,
72 (struct sockaddr*)&notify_addr_, sizeof(notify_addr_));
73 return nbytes > 0;
74}
75
76bool MulticastNotifier::Listen(int timeout_ms, ReadableInfo* info) {
77 if (is_shutdown_.load()) {
78 return false;
79 }
80
81 if (info == nullptr) {
82 AERROR << "info nullptr.";
83 return false;
84 }
85
86 struct pollfd fds;
87 fds.fd = listen_fd_;
88 fds.events = POLLIN;
89 int ready_num = poll(&fds, 1, timeout_ms);
90 if (ready_num > 0) {
91 char buf[32] = {0}; // larger than ReadableInfo::kSize
92 ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
93 if (nbytes == -1) {
94 AERROR << "fail to recvfrom, " << strerror(errno);
95 return false;
96 }
97 return info->DeserializeFrom(buf, nbytes);
98 } else if (ready_num == 0) {
99 ADEBUG << "timeout, no readableinfo.";
100 } else {
101 if (errno == EINTR) {
102 AINFO << "poll was interrupted.";
103 } else {
104 AERROR << "fail to poll, " << strerror(errno);
105 }
106 }
107 return false;
108}
109
110bool MulticastNotifier::Init() {
111 std::string mcast_ip("239.255.0.100");
112 uint16_t mcast_port = 8888;
113
114 auto& g_conf = GlobalData::Instance()->Config();
115 if (g_conf.has_transport_conf() && g_conf.transport_conf().has_shm_conf() &&
116 g_conf.transport_conf().shm_conf().has_shm_locator()) {
117 auto& locator = g_conf.transport_conf().shm_conf().shm_locator();
118 mcast_ip = locator.ip();
119 mcast_port = static_cast<uint16_t>(locator.port());
120 }
121
122 ADEBUG << "multicast notifier ip: " << mcast_ip;
123 ADEBUG << "multicast notifier port: " << mcast_port;
124
125 notify_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
126 if (notify_fd_ == -1) {
127 AERROR << "fail to create notify fd, " << strerror(errno);
128 return false;
129 }
130
131 memset(&notify_addr_, 0, sizeof(notify_addr_));
132 notify_addr_.sin_family = AF_INET;
133 notify_addr_.sin_addr.s_addr = inet_addr(mcast_ip.c_str());
134 notify_addr_.sin_port = htons(mcast_port);
135
136 listen_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
137 if (listen_fd_ == -1) {
138 AERROR << "fail to create listen fd, " << strerror(errno);
139 return false;
140 }
141
142 if (fcntl(listen_fd_, F_SETFL, O_NONBLOCK) == -1) {
143 AERROR << "fail to set listen fd nonblock, " << strerror(errno);
144 return false;
145 }
146
147 memset(&listen_addr_, 0, sizeof(listen_addr_));
148 listen_addr_.sin_family = AF_INET;
149 listen_addr_.sin_addr.s_addr = htonl(INADDR_ANY);
150 listen_addr_.sin_port = htons(mcast_port);
151
152 int yes = 1;
153 if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
154 AERROR << "fail to setsockopt SO_REUSEADDR, " << strerror(errno);
155 return false;
156 }
157
158 if (bind(listen_fd_, (struct sockaddr*)&listen_addr_, sizeof(listen_addr_)) <
159 0) {
160 AERROR << "fail to bind addr, " << strerror(errno);
161 return false;
162 }
163
164 int loop = 1;
165 if (setsockopt(listen_fd_, IPPROTO_IP, IP_MULTICAST_LOOP, &loop,
166 sizeof(loop)) < 0) {
167 AERROR << "fail to setsockopt IP_MULTICAST_LOOP, " << strerror(errno);
168 return false;
169 }
170
171 struct ip_mreq mreq;
172 mreq.imr_multiaddr.s_addr = inet_addr(mcast_ip.c_str());
173 mreq.imr_interface.s_addr = htonl(INADDR_ANY);
174 if (setsockopt(listen_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
175 sizeof(mreq)) < 0) {
176 AERROR << "fail to setsockopt IP_ADD_MEMBERSHIP, " << strerror(errno);
177 return false;
178 }
179
180 return true;
181}
182
183} // namespace transport
184} // namespace cyber
185} // namespace apollo
bool Notify(const ReadableInfo &info) override
bool Listen(int timeout_ms, ReadableInfo *info) override
bool SerializeTo(std::string *dst) const
bool DeserializeFrom(const std::string &src)
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
class register implement
Definition arena_queue.h:37