Apollo 10.0
自动驾驶开放平台
udp_bridge_receiver_component.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include "cyber/time/clock.h"
22
23namespace apollo {
24namespace bridge {
25
// Explicitly instantiates UDPBridgeReceiverComponent for the message type
// `pb_msg`, so the member templates defined in this file are emitted for it.
26#define BRIDGE_RECV_IMPL(pb_msg) \
27 template class UDPBridgeReceiverComponent<pb_msg>
28
// Constructor: initializes monitor_logger_buffer_ with the CONTROL monitor
// message item; the body is empty.
// NOTE(review): the declarator line between the template header and the
// member-initializer list is missing from this listing.
29template <typename T>
31 : monitor_logger_buffer_(common::monitor::MonitorMessageItem::CONTROL) {}
32
// Applies FREE_POINTER to every buffer pointer stored in proto_list_.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is the destructor - confirm against the header.
// `proto` is a by-value copy of each element, so whatever FREE_POINTER assigns
// to it does not modify the entries stored in proto_list_ itself.
33template <typename T>
35 for (auto proto : proto_list_) {
36 FREE_POINTER(proto);
37 }
38}
39
// Component initialization: loads the proto configuration, caches its fields,
// creates the writer for the configured topic, initializes the session on the
// configured port and finally starts the dispatcher.
// Returns false when the configuration cannot be loaded or the session cannot
// be initialized; returns true otherwise.
// NOTE(review): the declarator line and the declaration of `udp_bridge_remote`
// are missing from this listing.
40template <typename T>
42 AINFO << "UDP bridge receiver init, startin...";
44 if (!this->GetProtoConfig(&udp_bridge_remote)) {
// NOTE(review): this failure path logs at AINFO level - confirm whether an
// error-level log was intended.
45 AINFO << "load udp bridge component proto param failed";
46 return false;
47 }
// Cache the configuration values used by the rest of the component.
48 bind_port_ = udp_bridge_remote.bind_port();
49 proto_name_ = udp_bridge_remote.proto_name();
50 topic_name_ = udp_bridge_remote.topic_name();
51 enable_timeout_ = udp_bridge_remote.enable_timeout();
52 ADEBUG << "UDP Bridge remote port is: " << bind_port_;
53 ADEBUG << "UDP Bridge for Proto is: " << proto_name_;
// Reassembled messages of type T are written to topic_name_ through writer_.
54 writer_ = node_->CreateWriter<T>(topic_name_.c_str());
55
56 if (!InitSession((uint16_t)bind_port_)) {
57 return false;
58 }
59 ADEBUG << "initialize session successful.";
// MsgDispatcher() calls listener_->Listen().
// NOTE(review): if Listen() blocks, this function does not reach `return true`
// until Listen() returns - confirm against the listener implementation.
60 MsgDispatcher();
61 return true;
62}
63
64template <typename T>
67 port);
68}
69
70template <typename T>
71void UDPBridgeReceiverComponent<T>::MsgDispatcher() {
72 ADEBUG << "msg dispatcher start successful.";
73 listener_->Listen();
74}
75
76template <typename T>
77BridgeProtoDiserializedBuf<T>
78 *UDPBridgeReceiverComponent<T>::CreateBridgeProtoBuf(
79 const BridgeHeader &header) {
80 if (IsTimeout(header.GetTimeStamp())) {
81 typename std::vector<BridgeProtoDiserializedBuf<T> *>::iterator itor =
82 proto_list_.begin();
83 for (; itor != proto_list_.end();) {
84 if ((*itor)->IsTheProto(header)) {
85 BridgeProtoDiserializedBuf<T> *tmp = *itor;
86 FREE_POINTER(tmp);
87 itor = proto_list_.erase(itor);
88 break;
89 }
90 ++itor;
91 }
92 return nullptr;
93 }
94
95 for (auto proto : proto_list_) {
96 if (proto->IsTheProto(header)) {
97 return proto;
98 }
99 }
100 BridgeProtoDiserializedBuf<T> *proto_buf = new BridgeProtoDiserializedBuf<T>;
101 if (!proto_buf) {
102 return nullptr;
103 }
104 proto_buf->Initialize(header);
105 proto_list_.push_back(proto_buf);
106 return proto_buf;
107}
108
109template <typename T>
110bool UDPBridgeReceiverComponent<T>::IsProtoExist(const BridgeHeader &header) {
111 for (auto proto : proto_list_) {
112 if (proto->IsTheProto(header)) {
113 return true;
114 }
115 }
116 return false;
117}
118
119template <typename T>
120bool UDPBridgeReceiverComponent<T>::IsTimeout(double time_stamp) {
121 if (enable_timeout_ == false) {
122 return false;
123 }
124 double cur_time = apollo::cyber::Clock::NowInSeconds();
125 if (cur_time < time_stamp) {
126 return true;
127 }
128 if (FLAGS_timeout < cur_time - time_stamp) {
129 return true;
130 }
131 return false;
132}
133
// Handles one datagram read from socket `fd`:
//   1. receives up to 2 * FRAME_SIZE bytes into a zero-initialized buffer,
//   2. validates the bridge header flag and the encoded header size,
//   3. deserializes the BridgeHeader,
//   4. copies the frame payload into the reassembly buffer for that message,
//   5. once the buffer reports it is ready, deserializes the message, writes
//      it through writer_ and removes the finished/older buffers.
// Returns false when receiving fails or any validation step rejects the
// datagram; returns true otherwise.
// NOTE(review): the declarator line is missing from this listing.
134template <typename T>
136 struct sockaddr_in client_addr;
137 socklen_t sock_len = static_cast<socklen_t>(sizeof(client_addr));
138 int bytes = 0;
139 int total_recv = 2 * FRAME_SIZE;
140 char total_buf[2 * FRAME_SIZE] = {0};
141 bytes =
142 static_cast<int>(recvfrom(fd, total_buf, total_recv, 0,
143 (struct sockaddr *)&client_addr, &sock_len));
144 ADEBUG << "total recv " << bytes;
145 if (bytes <= 0 || bytes > total_recv) {
146 return false;
147 }
// The datagram must start with BRIDGE_HEADER_FLAG; header_flag is
// zero-initialized and one byte larger than the flag, so strcmp sees a
// terminated string.
148 char header_flag[sizeof(BRIDGE_HEADER_FLAG) + 1] = {0};
149 size_t offset = 0;
150 memcpy(header_flag, total_buf, HEADER_FLAG_SIZE);
151 if (strcmp(header_flag, BRIDGE_HEADER_FLAG) != 0) {
152 AINFO << "header flag not match!";
153 return false;
154 }
// Skip the flag field (sizeof the flag array plus one separator byte).
155 offset += sizeof(BRIDGE_HEADER_FLAG) + 1;
156
// Read the header-size field that follows the flag.
157 char header_size_buf[sizeof(hsize) + 1] = {0};
158 const char *cursor = total_buf + offset;
159 memcpy(header_size_buf, cursor, sizeof(hsize));
160 hsize header_size = *(reinterpret_cast<hsize *>(header_size_buf));
161 offset += sizeof(hsize) + 1;
// NOTE(review): header_size is bounded by FRAME_SIZE and by `offset`, but it
// is not compared with `bytes`, the number of bytes actually received.
162 if (header_size > FRAME_SIZE || header_size < offset) {
163 AINFO << "header size is more than FRAME_SIZE or less than offset!";
164 return false;
165 }
166
// The remaining header bytes, [offset, header_size), hold the serialized
// BridgeHeader.
167 BridgeHeader header;
168 size_t buf_size = header_size - offset;
169 cursor = total_buf + offset;
170 if (!header.Diserialize(cursor, buf_size)) {
171 AINFO << "header diserialize failed!";
172 return false;
173 }
174
175 ADEBUG << "proto name : " << header.GetMsgName().c_str();
176 ADEBUG << "proto sequence num: " << header.GetMsgID();
177 ADEBUG << "proto total frames: " << header.GetTotalFrames();
178 ADEBUG << "proto frame index: " << header.GetIndex();
179
// mutex_ is held until the function returns; every access to proto_list_
// below (directly or through the helper calls) happens under it.
180 std::lock_guard<std::mutex> lock(mutex_);
// nullptr here means the frame was rejected as timed out.
181 BridgeProtoDiserializedBuf<T> *proto_buf = CreateBridgeProtoBuf(header);
182 if (!proto_buf) {
183 return false;
184 }
185
// The payload starts right after the header.
186 cursor = total_buf + header_size;
187 if (header.GetFramePos() > header.GetMsgSize()) {
188 return false;
189 }
190 char *buf = proto_buf->GetBuf(header.GetFramePos());
191 // check cursor size
// NOTE(review): the upper bound is the capacity of total_buf
// (total_recv - header_size), not the bytes actually received
// (bytes - header_size). A datagram shorter than its advertised frame size
// passes this check, and the zero-filled tail of total_buf is then copied as
// payload - confirm whether `bytes` was intended.
192 if (header.GetFrameSize() < 0 ||
193 header.GetFrameSize() > (total_recv - header_size)) {
194 return false;
195 }
196 // check buf size
197 if (header.GetFrameSize() > (header.GetMsgSize() - header.GetFramePos())) {
198 return false;
199 }
200 memcpy(buf, cursor, header.GetFrameSize());
201 proto_buf->UpdateStatus(header.GetIndex());
// Once the buffer reports it is ready, publish the message and drop the
// buffers that are no longer needed.
202 if (proto_buf->IsReadyDiserialize()) {
203 auto pb_msg = std::make_shared<T>();
204 proto_buf->Diserialized(pb_msg);
205 writer_->Write(pb_msg);
// Remove buffers with a lower message ID first, then this buffer itself.
// proto_buf is not used after these calls.
206 RemoveInvalidBuf(proto_buf->GetMsgID());
207 RemoveItem(&proto_list_, proto_buf);
208 }
209 return true;
210}
211
// Frees and erases every buffer in proto_list_ whose message ID is strictly
// lower than `msg_id`.
// Returns false without touching the list when msg_id is 0; returns true
// otherwise.
// NOTE(review): the declarator line is missing from this listing; presumably
// this is RemoveInvalidBuf(msg_id), which the datagram handler calls - confirm
// against the header.
212template <typename T>
214 if (msg_id == 0) {
215 return false;
216 }
217 typename std::vector<BridgeProtoDiserializedBuf<T> *>::iterator itor =
218 proto_list_.begin();
219 for (; itor != proto_list_.end();) {
220 if ((*itor)->GetMsgID() < msg_id) {
221 BridgeProtoDiserializedBuf<T> *tmp = *itor;
222 FREE_POINTER(tmp);
// erase() returns the iterator to the next element, so do not advance again.
223 itor = proto_list_.erase(itor);
224 continue;
225 }
226 ++itor;
227 }
228 return true;
229}
230
232} // namespace bridge
233} // namespace apollo
std::string GetMsgName() const
bool Diserialize(const char *buf, size_t buf_size)
uint32_t GetTotalFrames() const
static double NowInSeconds()
Gets the current time in seconds.
Definition clock.cc:56
bool Initialize(const ComponentConfig &config) override
Initializes the component from a protobuf object.
Definition component.h:152
#define ADEBUG
Definition log.h:41
#define AINFO
Definition log.h:42
#define FREE_POINTER(p)
Definition macro.h:30
constexpr uint32_t FRAME_SIZE
Definition macro.h:36
constexpr char BRIDGE_HEADER_FLAG[]
constexpr size_t HEADER_FLAG_SIZE
bool RemoveItem(std::vector< T * > *list, const T *t)
Definition util.h:47
class register implement
Definition arena_queue.h:37
#define BRIDGE_RECV_IMPL(pb_msg)