Apollo 10.0
自动驾驶开放平台
udp_stream.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2017 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#include <arpa/inet.h>
18#include <fcntl.h>
19#include <netinet/in.h>
20#include <sys/socket.h>
21#include <sys/types.h>
22#include <unistd.h>
23
24#include <cerrno>
25
26#include "cyber/cyber.h"
28
29namespace apollo {
30namespace drivers {
31namespace gnss {
32
33class UdpStream : public Stream {
34 typedef uint16_t be16_t;
35 typedef uint32_t be32_t;
36
37 public:
38 UdpStream(const char* address, uint16_t port, uint32_t timeout_usec);
39 ~UdpStream();
40
41 virtual bool Connect();
42 virtual bool Disconnect();
43 virtual size_t read(uint8_t* buffer, size_t max_length);
44 virtual size_t write(const uint8_t* data, size_t length);
45
46 private:
47 UdpStream() {}
48 void open();
49 void close();
50 be16_t peer_port_ = 0;
51 be32_t peer_addr_ = 0;
52 uint32_t timeout_usec_ = 0;
53 int sockfd_ = -1;
54 int errno_ = 0;
55};
56
57Stream* Stream::create_udp(const char* address, uint16_t port,
58 uint32_t timeout_usec) {
59 return new UdpStream(address, port, timeout_usec);
60}
61
62UdpStream::UdpStream(const char* address, uint16_t port, uint32_t timeout_usec)
63 : sockfd_(-1), errno_(0) {
64 peer_addr_ = inet_addr(address);
65 peer_port_ = htons(port);
66 timeout_usec_ = timeout_usec;
67 // call open or call open in connect later
68}
69
70UdpStream::~UdpStream() { this->close(); }
71
72void UdpStream::open() {
73 int fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
74 if (fd < 0) {
75 // error
76 AERROR << "Create socket failed, errno: " << errno << ", "
77 << strerror(errno);
78 return;
79 }
80
81 struct sockaddr_in peer_sockaddr;
82 socklen_t socklenth = sizeof(peer_sockaddr);
83 bzero(&peer_sockaddr, sizeof(peer_sockaddr));
84 peer_sockaddr.sin_family = AF_INET;
85 peer_sockaddr.sin_port = peer_port_;
86 peer_sockaddr.sin_addr.s_addr = INADDR_ANY;
87 if (bind(fd, (struct sockaddr*)&peer_sockaddr, sizeof(peer_sockaddr)) == -1) {
88 AERROR << " Socket bind failed! Port " << peer_port_;
89 ::close(fd);
90 return;
91 }
92
93 // block or not block
94 if (timeout_usec_ != 0) {
95 int flags = fcntl(fd, F_GETFL, 0);
96 if (flags == -1) {
97 ::close(fd);
98 AERROR << "fcntl get flag failed, errno: " << errno << ", "
99 << strerror(errno);
100 return;
101 }
102
103 if (fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == -1) {
104 ::close(fd);
105 AERROR << "fcntl set block failed, errno: " << errno << ", "
106 << strerror(errno);
107 return;
108 }
109
110 struct timeval block_to = {timeout_usec_ / 1000000,
111 timeout_usec_ % 1000000};
112 if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO,
113 reinterpret_cast<char*>(&block_to), sizeof(block_to)) < 0) {
114 ::close(fd);
115 AERROR << "setsockopt set rcv timeout failed, errno: " << errno << ", "
116 << strerror(errno);
117 return;
118 }
119
120 if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO,
121 reinterpret_cast<char*>(&block_to), sizeof(block_to)) < 0) {
122 ::close(fd);
123 AERROR << "setsockopt set snd timeout failed, errno: " << errno << ", "
124 << strerror(errno);
125 return;
126 }
127 } else {
128 int flags = fcntl(fd, F_GETFL, 0);
129 if (flags == -1) {
130 ::close(fd);
131 AERROR << "fcntl get flag failed, errno: " << errno << ", "
132 << strerror(errno);
133 return;
134 }
135
136 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
137 ::close(fd);
138 AERROR << "fcntl set non block failed, errno: " << errno << ", "
139 << strerror(errno);
140 return;
141 }
142 }
143
144 sockfd_ = fd;
145}
146
147void UdpStream::close() {
148 if (sockfd_ > 0) {
149 ::close(sockfd_);
150 sockfd_ = -1;
152 }
153}
154
156 if (sockfd_ < 0) {
157 this->open();
158 if (sockfd_ < 0) {
159 return false;
160 }
161 }
162
164 return true;
165 }
166
167 // upper layer support ping method ??
168 // Login();
170 return true;
171}
172
174 if (sockfd_ < 0) {
175 // not open
176 return false;
177 }
178
179 this->close();
180 return true;
181}
182
183size_t UdpStream::read(uint8_t* buffer, size_t max_length) {
184 ssize_t ret = 0;
185 struct sockaddr_in peer_sockaddr;
186 socklen_t socklenth = sizeof(peer_sockaddr);
187 bzero(&peer_sockaddr, sizeof(peer_sockaddr));
188 peer_sockaddr.sin_family = AF_INET;
189 peer_sockaddr.sin_port = peer_port_;
190 peer_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
191
192 while ((ret = ::recvfrom(sockfd_, buffer, max_length, 0,
193 (struct sockaddr*)&peer_sockaddr,
194 reinterpret_cast<socklen_t*>(&socklenth))) < 0) {
195 if (errno == EINTR) {
196 continue;
197 } else {
198 // error
199 if (errno != EAGAIN) {
201 errno_ = errno;
202 }
203 }
204
205 return 0;
206 }
207
208 return ret;
209}
210
211size_t UdpStream::write(const uint8_t* data, size_t length) {
212 size_t total_nsent = 0;
213 struct sockaddr_in peer_sockaddr;
214 bzero(&peer_sockaddr, sizeof(peer_sockaddr));
215 peer_sockaddr.sin_family = AF_INET;
216 peer_sockaddr.sin_port = peer_port_;
217 peer_sockaddr.sin_addr.s_addr = peer_addr_;
218
219 while (length > 0) {
220 ssize_t nsent =
221 ::sendto(sockfd_, data, length, 0, (struct sockaddr*)&peer_sockaddr,
222 (socklen_t)sizeof(peer_sockaddr));
223 if (nsent < 0) { // error
224 if (errno == EINTR) {
225 continue;
226 } else {
227 // error
228 if (errno == EPIPE || errno == ECONNRESET) {
230 errno_ = errno;
231 } else if (errno != EAGAIN) {
233 errno_ = errno;
234 }
235 return total_nsent;
236 }
237 }
238
239 total_nsent += nsent;
240 length -= nsent;
241 data += nsent;
242 }
243
244 return total_nsent;
245}
246
247} // namespace gnss
248} // namespace drivers
249} // namespace apollo
static Stream * create_udp(const char *address, uint16_t port, uint32_t timeout_usec=1000000)
Definition udp_stream.cc:57
virtual size_t read(uint8_t *buffer, size_t max_length)
virtual size_t write(const uint8_t *data, size_t length)
#define AERROR
Definition log.h:44
class register implement
Definition arena_queue.h:37