Apollo 10.0
自动驾驶开放平台
udp_bridge_multi_receiver_component.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
17
18#include "cyber/time/clock.h"
22
23namespace apollo {
24namespace bridge {
25
27 : monitor_logger_buffer_(common::monitor::MonitorMessageItem::CONTROL) {}
28
30 AINFO << "UDP bridge multi :receiver init, startin...";
32 if (!this->GetProtoConfig(&udp_bridge_remote)) {
33 AINFO << "load udp bridge component proto param failed";
34 return false;
35 }
36 bind_port_ = udp_bridge_remote.bind_port();
37 enable_timeout_ = udp_bridge_remote.enable_timeout();
38 ADEBUG << "UDP Bridge remote port is: " << bind_port_;
39
40 if (!InitSession((uint16_t)bind_port_)) {
41 return false;
42 }
43 ADEBUG << "initialize session successful.";
45 return true;
46}
47
49 return listener_->Initialize(
51}
52
54 ADEBUG << "msg dispatcher start successful.";
55 listener_->Listen();
56}
57
58std::shared_ptr<ProtoDiserializedBufBase>
60 const BridgeHeader &header) {
61 std::shared_ptr<ProtoDiserializedBufBase> proto_buf;
62 if (IsTimeout(header.GetTimeStamp())) {
63 std::vector<std::shared_ptr<ProtoDiserializedBufBase>>::iterator itor =
64 proto_list_.begin();
65 for (; itor != proto_list_.end();) {
66 if ((*itor)->IsTheProto(header)) {
67 itor = proto_list_.erase(itor);
68 break;
69 }
70 ++itor;
71 }
72 return proto_buf;
73 }
74
75 for (auto proto : proto_list_) {
76 if (proto->IsTheProto(header)) {
77 return proto;
78 }
79 }
80
82 if (!proto_buf) {
83 return proto_buf;
84 }
85 proto_buf->Initialize(header, node_);
86 proto_list_.push_back(proto_buf);
87 return proto_buf;
88}
89
91 for (auto proto : proto_list_) {
92 if (proto->IsTheProto(header)) {
93 return true;
94 }
95 }
96 return false;
97}
98
100 if (enable_timeout_ == false) {
101 return false;
102 }
103 double cur_time = apollo::cyber::Clock::NowInSeconds();
104 if (cur_time < time_stamp) {
105 return true;
106 }
107 if (FLAGS_timeout < cur_time - time_stamp) {
108 return true;
109 }
110 return false;
111}
112
114 struct sockaddr_in client_addr;
115 socklen_t sock_len = static_cast<socklen_t>(sizeof(client_addr));
116 int total_recv = 2 * FRAME_SIZE;
117 char total_buf[2 * FRAME_SIZE] = {0};
118 int bytes =
119 static_cast<int>(recvfrom(fd, total_buf, total_recv, 0,
120 (struct sockaddr *)&client_addr, &sock_len));
121 if (bytes <= 0 || bytes > total_recv) {
122 return false;
123 }
124
125 if (strncmp(total_buf, BRIDGE_HEADER_FLAG, HEADER_FLAG_SIZE) != 0) {
126 AERROR << "Header flag didn't match!";
127 return false;
128 }
129 size_t offset = HEADER_FLAG_SIZE + 1;
130
131 const char *cursor = total_buf + offset;
132 hsize header_size = *(reinterpret_cast<const hsize *>(cursor));
133 offset += sizeof(hsize) + 1;
134
135 if (header_size < offset || header_size > FRAME_SIZE) {
136 AERROR << "header size is more than FRAME_SIZE!";
137 return false;
138 }
139
140 cursor = total_buf + offset;
141 size_t buf_size = header_size - offset;
142 BridgeHeader header;
143 if (!header.Diserialize(cursor, buf_size)) {
144 AERROR << "header diserialize failed!";
145 return false;
146 }
147
148 ADEBUG << "proto name : " << header.GetMsgName().c_str();
149 ADEBUG << "proto sequence num: " << header.GetMsgID();
150 ADEBUG << "proto total frames: " << header.GetTotalFrames();
151 ADEBUG << "proto frame index: " << header.GetIndex();
152
153 std::lock_guard<std::mutex> lock(mutex_);
154 std::shared_ptr<ProtoDiserializedBufBase> proto_buf =
155 CreateBridgeProtoBuf(header);
156 if (!proto_buf) {
157 return false;
158 }
159
160 cursor = total_buf + header_size;
161 if (header.GetFramePos() > header.GetMsgSize()) {
162 return false;
163 }
164 char *buf = proto_buf->GetBuf(header.GetFramePos());
165 // check cursor size
166 if (header.GetFrameSize() < 0 ||
167 header.GetFrameSize() > (total_recv - header_size)) {
168 return false;
169 }
170 // check buf size
171 if (header.GetFrameSize() > (header.GetMsgSize() - header.GetFramePos())) {
172 return false;
173 }
174 memcpy(buf, cursor, header.GetFrameSize());
175 proto_buf->UpdateStatus(header.GetIndex());
176 if (proto_buf->IsReadyDiserialize()) {
177 proto_buf->DiserializedAndPub();
178 RemoveInvalidBuf(proto_buf->GetMsgID(), proto_buf->GetMsgName());
179 RemoveItem(&proto_list_, proto_buf);
180 }
181 return true;
182}
183
184bool UDPBridgeMultiReceiverComponent::RemoveInvalidBuf(
185 uint32_t msg_id, const std::string &msg_name) {
186 if (msg_id == 0) {
187 return false;
188 }
189 std::vector<std::shared_ptr<ProtoDiserializedBufBase>>::iterator itor =
190 proto_list_.begin();
191 for (; itor != proto_list_.end();) {
192 if ((*itor)->GetMsgID() < msg_id &&
193 strcmp((*itor)->GetMsgName().c_str(), msg_name.c_str()) == 0) {
194 itor = proto_list_.erase(itor);
195 continue;
196 }
197 ++itor;
198 }
199 return true;
200}
201
202} // namespace bridge
203} // namespace apollo
std::string GetMsgName() const
bool Diserialize(const char *buf, size_t buf_size)
uint32_t GetTotalFrames() const
static std::shared_ptr< ProtoDiserializedBufBase > CreateObj(const BridgeHeader &header)
std::shared_ptr< ProtoDiserializedBufBase > CreateBridgeProtoBuf(const BridgeHeader &header)
static double NowInSeconds()
gets the current time in second.
Definition clock.cc:56
bool GetProtoConfig(T *config) const
std::shared_ptr< Node > node_
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
constexpr uint32_t FRAME_SIZE
Definition macro.h:36
constexpr char BRIDGE_HEADER_FLAG[]
constexpr size_t HEADER_FLAG_SIZE
bool RemoveItem(std::vector< T * > *list, const T *t)
Definition util.h:47
class register implement
Definition arena_queue.h:37