Apollo 11.0
自动驾驶开放平台
socket_input.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2019 The Apollo Authors. All Rights Reserved.
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#include <arpa/inet.h>
18#include <fcntl.h>
19#include <poll.h>
20#include <sys/file.h>
21#include <sys/socket.h>
22#include <unistd.h>
23
24#include <cerrno>
25#include <memory>
26
27#include "cyber/cyber.h"
30
31namespace apollo {
32namespace drivers {
33namespace video {
34
36// SocketInput class implementation
38
44SocketInput::SocketInput() : sockfd_(-1), port_(0) {
45 pkg_num_ = 0;
46 bytes_num_ = 0;
47 frame_id_ = 0;
48}
49
52 if (buf_) {
53 delete[] buf_;
54 }
55 if (pdu_) {
56 delete[] pdu_;
57 }
58 (void)close(sockfd_);
59}
60
61void SocketInput::Init(uint32_t port) {
62 if (sockfd_ != -1) {
63 (void)close(sockfd_);
64 }
65
66 sockfd_ = socket(AF_INET, SOCK_DGRAM, 0);
67 if (sockfd_ < 0) {
68 AERROR << "Failed to create socket fd.";
69 return;
70 }
71
72 // Connect to camera UDP port
73 AINFO << "Opening UDP socket on port: " << uint16_t(port);
74 port_ = port;
75 sockaddr_in my_addr;
76 memset(&my_addr, '\0', sizeof(my_addr));
77 my_addr.sin_family = AF_INET;
78 my_addr.sin_port = htons(uint16_t(port));
79 my_addr.sin_addr.s_addr = INADDR_ANY;
80 if (bind(sockfd_, reinterpret_cast<sockaddr *>(&my_addr),
81 sizeof(sockaddr_in)) < 0) {
82 AERROR << "Failed to bind socket on local address.";
83 }
84
85 if (fcntl(sockfd_, F_SETFL, O_NONBLOCK | FASYNC) < 0) {
86 AERROR << "Failed to enable non-blocking I/O.";
87 }
88
89 const int rbuf = 4 * 1024 * 1024;
90 if (setsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &rbuf, sizeof(int)) < 0) {
91 AERROR << "Failed to enable socket receive buffer.";
92 }
93
94 int enable = 1;
95 if (setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) {
96 AERROR << "Failed to enable socket reuseable address.";
97 }
98
99 buf_ = new uint8_t[H265_FRAME_PACKAGE_SIZE];
100 if (!buf_) {
101 AERROR << "Failed to allocate H265 frame package buffer.";
102 }
103 pdu_ = new uint8_t[H265_PDU_SIZE];
104 if (!pdu_) {
105 AERROR << "Failed to allocate H265 PDU buffer.";
106 }
107 // _metric_controller->Init();
108 AINFO << "Camera socket fd: " << sockfd_ << ", port: " << port_;
109}
110
112int SocketInput::GetFramePacket(std::shared_ptr<CompressedImage> h265Pb) {
113 uint8_t *frame_data = &buf_[0];
114 uint8_t *pdu_data = &pdu_[0];
115 int total = 0;
116 int pkg_len = 0;
117 size_t frame_len = 0;
118 uint16_t pre_seq = 0;
119
120 do {
121 if (!InputAvailable(POLL_TIMEOUT)) {
122 return SOCKET_TIMEOUT;
123 }
124 // Receive packets that should now be available from the socket using
125 // a blocking read.
126 ssize_t pdu_len = recvfrom(sockfd_, pdu_data, H265_PDU_SIZE, 0, NULL, NULL);
127 if (pdu_len < 0) {
128 if (errno != EWOULDBLOCK) {
129 AERROR << "Failed to receive package from port: " << port_;
130 return RECEIVE_FAIL;
131 }
132 }
133
134 AINFO << "Received pdu length: " << pdu_len << " from port: " << port_;
135 HwPduPacket *pdu_pkg = reinterpret_cast<HwPduPacket *>(&pdu_[0]);
136 uint16_t local_seq = ntohs(pdu_pkg->rtp_header.seq);
137 AINFO << "Package seq number: " << local_seq;
138 if (local_seq - pre_seq != 1 && pre_seq > 1 && local_seq > 0) {
139 AERROR << "Error! port: " << port_
140 << ", package sequence is wrong. curent/pre " << local_seq << "/"
141 << pre_seq;
142 }
143 pre_seq = local_seq;
144
145 if (ntohl(pdu_pkg->header.magic0) == HW_CAMERA_MAGIC0 &&
146 ntohl(pdu_pkg->header.magic1) == HW_CAMERA_MAGIC1) {
147 // Receive camera frame head
148 if (total) {
149 AERROR << "Error! lost package for last frame, left bytes: " << total;
150 }
151 AINFO << "Received new frame from port: " << port_;
152
153 uint32_t frame_id = ntohl(pdu_pkg->header.frame_id);
154 if (frame_id - frame_id_ != 1 && frame_id_ > 1 && frame_id > 1) {
155 AERROR << "Error! port: " << port_
156 << ", lose Frame. pre_frame_id/frame_id " << frame_id_ << "/"
157 << frame_id;
158 }
159 frame_id_ = frame_id;
160
161 cyber::Time image_time(ntohl(pdu_pkg->header.ts_sec),
162 1000 * ntohl(pdu_pkg->header.ts_usec));
163 // AINFO << "image_time second: " << ntohl(pdu_pkg->header.ts_sec) <<
164 // " usec: " << ntohl(pdu_pkg->header.ts_usec);
165 uint64_t camera_timestamp = image_time.ToNanosecond();
166 h265Pb->mutable_header()->set_camera_timestamp(camera_timestamp);
167 h265Pb->set_measurement_time(image_time.ToSecond());
168 h265Pb->set_format("h265");
169 h265Pb->set_frame_type(static_cast<int>(pdu_pkg->header.frame_type));
170 AINFO << "Port: " << port_
171 << ", received frame size: " << ntohl(pdu_pkg->header.frame_size)
172 << ", frame type: " << static_cast<int>(pdu_pkg->header.frame_type)
173 << ", PhyNo: " << static_cast<int>(pdu_pkg->header.PhyNo)
174 << ", frame id: " << frame_id;
175
176 frame_len = ntohl(pdu_pkg->header.frame_size);
177 total = static_cast<int>(frame_len);
178 frame_data = &buf_[0];
179 continue;
180 }
181 // Receive camera frame data
182 if (total > 0) {
183 pkg_len = static_cast<int>(pdu_len - sizeof(RtpHeader));
184 memcpy(frame_data, pdu_data + sizeof(RtpHeader), pkg_len);
185 total -= pkg_len;
186 frame_data += pkg_len;
187 // AINFO << "receive pkg data " << pkg_len << "/" << total << "/"
188 // << frame_len;
189 }
190 if (total <= 0) {
191 total = 0;
192 // AINFO << "receive frame data " << pkg_len << "/" << total << "/"
193 // << frame_len;
194 if (frame_len > 0) {
195 h265Pb->set_data(buf_, frame_len);
196 break;
197 }
198 AERROR << "Error! frame info is wrong. frame length: " << frame_len;
199 }
200 } while (true);
201
202 return 0;
203}
204
205bool SocketInput::InputAvailable(int timeout) {
206 (void)timeout;
207 struct pollfd fds[1];
208 fds[0].fd = sockfd_;
209 fds[0].events = POLLIN;
210 // Unfortunately, the Linux kernel recvfrom() implementation
211 // uses a non-interruptible sleep() when waiting for data,
212 // which would cause this method to hang if the device is not
213 // providing data. We poll() the device first to make sure
214 // the recvfrom() will not block.
215 // Note that, however, there is a known Linux kernel bug:
216 // Under Linux, select() may report a socket file descriptor
217 // as "ready for reading", while nevertheless a subsequent
218 // read blocks. This could for example happen when data has
219 // arrived but upon examination has wrong checksum and is
220 // discarded. There may be other circumstances in which a
221 // file descriptor is spuriously reported as ready. Thus it
222 // may be safer to use O_NONBLOCK on sockets that should not
223 // block.
224
225 // poll() until input available
226 do {
227 int ret = poll(fds, 1, POLL_TIMEOUT);
228 if (ret < 0) {
229 if (errno != EINTR) {
230 AERROR << "H265 camera port: " << port_
231 << "poll() error: " << strerror(errno);
232 }
233 return false;
234 }
235
236 // Timeout
237 if (ret == 0) {
238 return false;
239 }
240
241 if ((fds[0].revents & POLLERR) || (fds[0].revents & POLLHUP) ||
242 (fds[0].revents & POLLNVAL)) {
243 AERROR << "Error! poll failed on H265 camera port: " << port_;
244 return false;
245 }
246 } while (!(fds[0].revents & POLLIN));
247
248 return true;
249}
250
251} // namespace video
252} // namespace drivers
253} // namespace apollo
Cyber has builtin time type Time.
Definition time.h:31
uint64_t ToNanosecond() const
convert time to nanosecond.
Definition time.cc:83
double ToSecond() const
convert time to second.
Definition time.cc:77
int GetFramePacket(std::shared_ptr< CompressedImage > h265)
Get one camera packet.
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
class register implement
Definition arena_queue.h:37
@ POLL_TIMEOUT
#define HW_CAMERA_MAGIC1
Definition input.h:42
#define HW_CAMERA_MAGIC0
Definition input.h:41