22#include <sys/socket.h>
35using common::GlobalData;
37MulticastNotifier::MulticastNotifier() {
46 if (is_shutdown_.exchange(
true)) {
50 if (notify_fd_ != -1) {
54 memset(&notify_addr_, 0,
sizeof(notify_addr_));
56 if (listen_fd_ != -1) {
60 memset(&listen_addr_, 0,
sizeof(listen_addr_));
64 if (is_shutdown_.load()) {
71 sendto(notify_fd_, info_str.c_str(), info_str.size(), 0,
72 (
struct sockaddr*)&notify_addr_,
sizeof(notify_addr_));
77 if (is_shutdown_.load()) {
81 if (info ==
nullptr) {
89 int ready_num = poll(&fds, 1, timeout_ms);
92 ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0,
nullptr,
nullptr);
94 AERROR <<
"fail to recvfrom, " << strerror(errno);
98 }
else if (ready_num == 0) {
99 ADEBUG <<
"timeout, no readableinfo.";
101 if (errno == EINTR) {
102 AINFO <<
"poll was interrupted.";
104 AERROR <<
"fail to poll, " << strerror(errno);
110bool MulticastNotifier::Init() {
111 std::string mcast_ip(
"239.255.0.100");
112 uint16_t mcast_port = 8888;
114 auto& g_conf = GlobalData::Instance()->Config();
115 if (g_conf.has_transport_conf() && g_conf.transport_conf().has_shm_conf() &&
116 g_conf.transport_conf().shm_conf().has_shm_locator()) {
117 auto& locator = g_conf.transport_conf().shm_conf().shm_locator();
118 mcast_ip = locator.ip();
119 mcast_port =
static_cast<uint16_t
>(locator.port());
122 ADEBUG <<
"multicast notifier ip: " << mcast_ip;
123 ADEBUG <<
"multicast notifier port: " << mcast_port;
125 notify_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
126 if (notify_fd_ == -1) {
127 AERROR <<
"fail to create notify fd, " << strerror(errno);
131 memset(&notify_addr_, 0,
sizeof(notify_addr_));
132 notify_addr_.sin_family = AF_INET;
133 notify_addr_.sin_addr.s_addr = inet_addr(mcast_ip.c_str());
134 notify_addr_.sin_port = htons(mcast_port);
136 listen_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
137 if (listen_fd_ == -1) {
138 AERROR <<
"fail to create listen fd, " << strerror(errno);
142 if (fcntl(listen_fd_, F_SETFL, O_NONBLOCK) == -1) {
143 AERROR <<
"fail to set listen fd nonblock, " << strerror(errno);
147 memset(&listen_addr_, 0,
sizeof(listen_addr_));
148 listen_addr_.sin_family = AF_INET;
149 listen_addr_.sin_addr.s_addr = htonl(INADDR_ANY);
150 listen_addr_.sin_port = htons(mcast_port);
153 if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &yes,
sizeof(yes)) < 0) {
154 AERROR <<
"fail to setsockopt SO_REUSEADDR, " << strerror(errno);
158 if (bind(listen_fd_, (
struct sockaddr*)&listen_addr_,
sizeof(listen_addr_)) <
160 AERROR <<
"fail to bind addr, " << strerror(errno);
165 if (setsockopt(listen_fd_, IPPROTO_IP, IP_MULTICAST_LOOP, &loop,
167 AERROR <<
"fail to setsockopt IP_MULTICAST_LOOP, " << strerror(errno);
172 mreq.imr_multiaddr.s_addr = inet_addr(mcast_ip.c_str());
173 mreq.imr_interface.s_addr = htonl(INADDR_ANY);
174 if (setsockopt(listen_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
176 AERROR <<
"fail to setsockopt IP_ADD_MEMBERSHIP, " << strerror(errno);
virtual ~MulticastNotifier()
bool Notify(const ReadableInfo &info) override
bool Listen(int timeout_ms, ReadableInfo *info) override
bool SerializeTo(std::string *dst) const
bool DeserializeFrom(const std::string &src)