Apollo 10.0
自动驾驶开放平台
udp_listener.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#pragma once
18
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cerrno>
#include <cstdint>
29
30namespace apollo {
31namespace bridge {
32
// Shared sizing constant: used as the RLIMIT_NOFILE value set in Initialize(),
// as the size hint passed to epoll_create(), and as the length of the event
// array filled by epoll_wait() in Listen().
constexpr int MAXEPOLLSIZE = 100;
34
/**
 * @brief UDP listener that waits on a bound socket with epoll and forwards
 *        each readable event to a member function of a receiver object.
 *
 * The listener owns two descriptors: the UDP socket (listener_sock_) and the
 * epoll instance (kdpfd_). Both use -1 as the "not open" sentinel and both are
 * released by the destructor.
 *
 * @tparam T Receiver type; must provide a member `bool (T::*)(int fd)`.
 */
template <typename T>
class UDPListener {
 public:
  /// Receiver callback, invoked with the descriptor that became readable.
  typedef bool (T::*func)(int fd);

  /// Creates an unconfigured listener; Initialize() supplies the receiver,
  /// callback and port later.
  UDPListener() = default;

  /**
   * @brief Stores receiver, port and callback. No socket is opened here.
   * @param receiver   Object on which msg_handle is invoked (not owned).
   * @param port       UDP port, host byte order.
   * @param msg_handle Member function of T called for each readable event.
   */
  UDPListener(T *receiver, uint16_t port, func msg_handle)
      : receiver_(receiver), listened_port_(port), msg_handle_(msg_handle) {}

  /// Closes the epoll instance and the socket if they are still open.
  ~UDPListener() {
    if (kdpfd_ != -1) {
      close(kdpfd_);
    }
    if (listener_sock_ != -1) {
      close(listener_sock_);
    }
  }

  /// Replaces the callback used for subsequent events.
  void SetMsgHandle(func msg_handle) { msg_handle_ = msg_handle; }

  /// Opens and binds the UDP socket and registers it with epoll.
  /// @return true on success, false if any step fails.
  bool Initialize(T *receiver, func msg_handle, uint16_t port);

  /// Runs the epoll loop, spawning one detached thread per readable event.
  bool Listen();

  /// Thread entry point; `param` is a heap-allocated Param it takes over.
  static void *pthread_handle_message(void *param);

 public:
  /// Argument bundle handed to pthread_handle_message().
  struct Param {
    int fd_ = 0;                          // descriptor that became readable
    UDPListener<T> *listener_ = nullptr;  // listener that spawned the thread
  };

 private:
  bool setnonblocking(int sockfd);
  void MessageHandle(int fd);

 private:
  T *receiver_ = nullptr;
  uint16_t listened_port_ = 0;
  int listener_sock_ = -1;
  func msg_handle_ = nullptr;
  // -1 (not 0) so that an uninitialised listener never refers to stdin.
  int kdpfd_ = -1;
};
74
75template <typename T>
76bool UDPListener<T>::Initialize(T *receiver, func msg_handle, uint16_t port) {
77 msg_handle_ = msg_handle;
78 if (!msg_handle_) {
79 return false;
80 }
81
82 receiver_ = receiver;
83 if (!receiver_) {
84 return false;
85 }
86 listened_port_ = port;
87 struct rlimit rt;
88 rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE;
89 if (setrlimit(RLIMIT_NOFILE, &rt) == -1) {
90 return false;
91 }
92
93 listener_sock_ = socket(AF_INET, SOCK_DGRAM, 0);
94 if (listener_sock_ == -1) {
95 return false;
96 }
97 int opt = SO_REUSEADDR;
98 setsockopt(listener_sock_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
99 setnonblocking(listener_sock_);
100
101 struct sockaddr_in serv_addr;
102 serv_addr.sin_family = PF_INET;
103 serv_addr.sin_port = htons((uint16_t)listened_port_);
104 serv_addr.sin_addr.s_addr = INADDR_ANY;
105 if (bind(listener_sock_, (struct sockaddr *)&serv_addr,
106 sizeof(struct sockaddr)) == -1) {
107 close(listener_sock_);
108 return false;
109 }
110 kdpfd_ = epoll_create(MAXEPOLLSIZE);
111 struct epoll_event ev;
112 ev.events = EPOLLIN | EPOLLET;
113 ev.data.fd = listener_sock_;
114 if (epoll_ctl(kdpfd_, EPOLL_CTL_ADD, listener_sock_, &ev) < 0) {
115 close(listener_sock_);
116 return false;
117 }
118 return true;
119}
120
121template <typename T>
123 int nfds = -1;
124 bool res = true;
125 struct epoll_event events[MAXEPOLLSIZE];
126 while (true) {
127 nfds = epoll_wait(kdpfd_, events, 10000, -1);
128 if (nfds == -1) {
129 res = false;
130 break;
131 }
132
133 for (int i = 0; i < nfds; ++i) {
134 if (events[i].data.fd == listener_sock_) {
135 pthread_t thread;
136 pthread_attr_t attr;
137 pthread_attr_init(&attr);
138 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
139 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
140 Param *par = new Param;
141 par->fd_ = events[i].data.fd;
142 par->listener_ = this;
143 if (pthread_create(&thread, &attr,
145 reinterpret_cast<void *>(par))) {
146 res = false;
147 return res;
148 }
149 }
150 }
151 }
152 close(listener_sock_);
153 return res;
154}
155
156template <typename T>
157bool UDPListener<T>::setnonblocking(int sockfd) {
158 if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0) | O_NONBLOCK) == -1) {
159 return false;
160 }
161 return true;
162}
163
164template <typename T>
166 Param *par = static_cast<Param *>(param);
167 int fd = par->fd_;
168 UDPListener<T> *listener = par->listener_;
169 if (par) {
170 delete par;
171 }
172 par = nullptr;
173 if (!listener) {
174 pthread_exit(nullptr);
175 }
176 listener->MessageHandle(fd);
177 pthread_exit(nullptr);
178}
179
180template <typename T>
182 if (!receiver_ || !msg_handle_) {
183 return;
184 }
185 (receiver_->*msg_handle_)(fd);
186}
187
188} // namespace bridge
189} // namespace apollo
static void * pthread_handle_message(void *param)
bool Initialize(T *receiver, func msg_handle, uint16_t port)
void SetMsgHandle(func msg_handle)
bool(T::* func)(int fd)
UDPListener(T *receiver, uint16_t port, func msg_handle)
constexpr int MAXEPOLLSIZE
class register implement
Definition arena_queue.h:37