Apollo 10.0
自动驾驶开放平台
tcp_stream.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2017 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#include <arpa/inet.h>
18#include <fcntl.h>
19#include <netinet/in.h>
20#include <netinet/tcp.h>
21#include <sys/socket.h>
22#include <sys/types.h>
23#include <unistd.h>
24
25#include <cerrno>
26#include <cinttypes>
27#include <iostream>
28
29#include "cyber/cyber.h"
32
33namespace apollo {
34namespace drivers {
35namespace gnss {
36
37TcpStream::TcpStream(const char* address, uint16_t port, uint32_t timeout_usec,
38 bool auto_reconnect)
39 : sockfd_(-1), errno_(0), auto_reconnect_(auto_reconnect) {
40 peer_addr_ = inet_addr(address);
41 peer_port_ = htons(port);
42 timeout_usec_ = timeout_usec;
43}
44
45TcpStream::~TcpStream() { this->close(); }
46
47void TcpStream::open() {
48 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
49 if (fd < 0) {
50 // error
51 AERROR << "create socket failed, errno: " << errno << ", "
52 << strerror(errno);
53 return;
54 }
55
56 sockfd_ = fd;
57}
58
59bool TcpStream::InitSocket() {
60 if (sockfd_ < 0) {
61 return false;
62 }
63
64 // block or not block
65 if (timeout_usec_ != 0) {
66 int flags = fcntl(sockfd_, F_GETFL, 0);
67 if (flags == -1) {
68 ::close(sockfd_);
69 AERROR << "fcntl get flag failed, error: " << strerror(errno);
70 return false;
71 }
72
73 if (fcntl(sockfd_, F_SETFL, flags & ~O_NONBLOCK) == -1) {
74 ::close(sockfd_);
75 AERROR << "fcntl set block failed, error: " << strerror(errno);
76 return false;
77 }
78
79 timeval block_to = {timeout_usec_ / 1000000, timeout_usec_ % 1000000};
80 if (setsockopt(sockfd_, SOL_SOCKET, SO_RCVTIMEO, &block_to,
81 sizeof(block_to)) < 0) {
82 ::close(sockfd_);
83 AERROR << "setsockopt set rcv timeout failed, error: " << strerror(errno);
84 return false;
85 }
86
87 if (setsockopt(sockfd_, SOL_SOCKET, SO_SNDTIMEO, &block_to,
88 sizeof(block_to)) < 0) {
89 ::close(sockfd_);
90 AERROR << "setsockopt set snd timeout failed, error: " << strerror(errno);
91 return false;
92 }
93 } else {
94 int flags = fcntl(sockfd_, F_GETFL, 0);
95 if (flags == -1) {
96 ::close(sockfd_);
97 AERROR << "fcntl get flag failed, error: " << strerror(errno);
98 return false;
99 }
100
101 if (fcntl(sockfd_, F_SETFL, flags | O_NONBLOCK) == -1) {
102 ::close(sockfd_);
103 AERROR << "fcntl set non block failed, error: " << strerror(errno);
104 return false;
105 }
106 }
107
108 // disable Nagle
109 int ret = 0;
110 int enable = 1;
111 ret = setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY,
112 reinterpret_cast<void*>(&enable), sizeof(enable));
113 if (ret == -1) {
114 ::close(sockfd_);
115 AERROR << "setsockopt disable Nagle failed, errno: " << errno << ", "
116 << strerror(errno);
117 return false;
118 }
119
120 return true;
121}
122
123void TcpStream::close() {
124 if (sockfd_ > 0) {
125 ::close(sockfd_);
126 sockfd_ = -1;
128 }
129}
130
131bool TcpStream::Reconnect() {
132 if (auto_reconnect_) {
133 Disconnect();
134 if (Connect()) {
135 return true;
136 }
137 }
138 return false;
139}
140
142 if (sockfd_ < 0) {
143 this->open();
144 if (sockfd_ < 0) {
145 // error
146 return false;
147 }
148 }
149
151 return true;
152 }
153
154 fd_set fds;
155 timeval timeo = {10, 0};
156 int ret = 0;
157 sockaddr_in peer_addr;
158
159 bzero(&peer_addr, sizeof(peer_addr));
160 peer_addr.sin_family = AF_INET;
161 peer_addr.sin_port = peer_port_;
162 peer_addr.sin_addr.s_addr = peer_addr_;
163
164 int fd_flags = fcntl(sockfd_, F_GETFL);
165 if (fd_flags < 0 || fcntl(sockfd_, F_SETFL, fd_flags | O_NONBLOCK) < 0) {
166 AERROR << "Failed to set noblock, error: " << strerror(errno);
167 return false;
168 }
169
170 while ((ret = ::connect(sockfd_, reinterpret_cast<sockaddr*>(&peer_addr),
171 sizeof(peer_addr))) < 0) {
172 if (errno == EINTR) {
173 AINFO << "Tcp connect return EINTR, continue.";
174 continue;
175 } else {
176 if ((errno != EISCONN) && (errno != EINPROGRESS) && (errno != EALREADY)) {
178 errno_ = errno;
179 AERROR << "Connect failed, error: " << strerror(errno);
180 return false;
181 }
182
183 FD_ZERO(&fds);
184 FD_SET(sockfd_, &fds);
185 ret = select(sockfd_ + 1, NULL, &fds, NULL, &timeo);
186 if (ret < 0) {
188 errno_ = errno;
189 AERROR << "Wait connect failed, error: " << strerror(errno);
190 return false;
191 } else if (ret == 0) {
192 AINFO << "Tcp connect timeout.";
193 return false;
194 } else if (FD_ISSET(sockfd_, &fds)) {
195 int error = 0;
196 socklen_t len = sizeof(int);
197
198 if (getsockopt(sockfd_, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
200 errno_ = errno;
201 AERROR << "Getsockopt failed, error: " << strerror(errno);
202 return false;
203 }
204 if (error != 0) {
206 errno_ = errno;
207 AERROR << "Socket error: " << strerror(errno);
208 return false;
209 }
210
211 // connect successfully
212 break;
213 } else {
215 errno_ = errno;
216 AERROR << "Should not be here.";
217 return false;
218 }
219 }
220 }
221
222 if (!InitSocket()) {
223 close();
225 errno_ = errno;
226 AERROR << "Failed to init socket.";
227 return false;
228 }
229 AINFO << "Tcp connect success.";
231 Login();
232 return true;
233}
234
236 if (sockfd_ < 0) {
237 // not open
238 return false;
239 }
240
241 this->close();
242 return true;
243}
244
245size_t TcpStream::read(uint8_t* buffer, size_t max_length) {
246 ssize_t ret = 0;
247
249 Reconnect();
251 return 0;
252 }
253 }
254
255 if (!Readable(10000)) {
256 return 0;
257 }
258
259 while ((ret = ::recv(sockfd_, buffer, max_length, 0)) < 0) {
260 if (errno == EINTR) {
261 continue;
262 } else {
263 // error
264 if (errno != EAGAIN) {
266 errno_ = errno;
267 AERROR << "Read errno " << errno << ", error " << strerror(errno);
268 }
269 }
270
271 return 0;
272 }
273
274 if (ret == 0) {
276 errno_ = errno;
277 AERROR << "Remote closed.";
278 if (Reconnect()) {
279 AINFO << "Reconnect tcp success.";
280 }
281 }
282
283 return ret;
284}
285
286size_t TcpStream::write(const uint8_t* buffer, size_t length) {
287 size_t total_nsent = 0;
288
290 Reconnect();
292 return 0;
293 }
294 }
295
296 while (length > 0) {
297 ssize_t nsent = ::send(sockfd_, buffer, length, 0);
298 if (nsent < 0) {
299 if (errno == EINTR) {
300 continue;
301 } else {
302 // error
303 if (errno == EPIPE || errno == ECONNRESET) {
305 errno_ = errno;
306 } else if (errno != EAGAIN) {
308 errno_ = errno;
309 }
310 return total_nsent;
311 }
312 }
313
314 total_nsent += nsent;
315 length -= nsent;
316 buffer += nsent;
317 }
318
319 return total_nsent;
320}
321
322bool TcpStream::Readable(uint32_t timeout_us) {
323 // Setup a select call to block for serial data or a timeout
324 timespec timeout_ts;
325 fd_set readfds;
326 FD_ZERO(&readfds);
327 FD_SET(sockfd_, &readfds);
328
329 timeout_ts.tv_sec = timeout_us / 1000000;
330 timeout_ts.tv_nsec = (timeout_us % 1000000) * 1000;
331 int r = pselect(sockfd_ + 1, &readfds, NULL, NULL, &timeout_ts, NULL);
332 if (r < 0) {
334 errno_ = errno;
335 AERROR << "Failed to wait tcp data: " << errno << ", " << strerror(errno);
336 return false;
337 } else if (r == 0 || !FD_ISSET(sockfd_, &readfds)) {
338 return false;
339 }
340 // Data available to read.
341 return true;
342}
343
344Stream* Stream::create_tcp(const char* address, uint16_t port,
345 uint32_t timeout_usec) {
346 return new TcpStream(address, port, timeout_usec);
347}
348
349} // namespace gnss
350} // namespace drivers
351} // namespace apollo
static Stream * create_tcp(const char *address, uint16_t port, uint32_t timeout_usec=1000000)
virtual size_t read(uint8_t *buffer, size_t max_length)
virtual size_t write(const uint8_t *data, size_t length)
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
class register implement
Definition arena_queue.h:37