Apollo 10.0
自动驾驶开放平台
udp_bridge_receiver_component.cc 文件参考
udp_bridge_receiver_component.cc 的引用(Include)关系图:

浏览源代码.

命名空间

namespace  apollo
 class register implement
 
namespace  apollo::bridge
 

宏定义

#define BRIDGE_RECV_IMPL(pb_msg)    template class UDPBridgeReceiverComponent<pb_msg>
 

函数

 apollo::bridge::BRIDGE_RECV_IMPL (canbus::Chassis)
 

宏定义说明

◆ BRIDGE_RECV_IMPL

#define BRIDGE_RECV_IMPL (   pb_msg)     template class UDPBridgeReceiverComponent<pb_msg>

在文件 udp_bridge_receiver_component.cc26 行定义.

30 : monitor_logger_buffer_(common::monitor::MonitorMessageItem::CONTROL) {}
31
32template <typename T>
33UDPBridgeReceiverComponent<T>::~UDPBridgeReceiverComponent() {
34 for (auto proto : proto_list_) {
35 FREE_POINTER(proto);
36 }
37}
38
39template <typename T>
40bool UDPBridgeReceiverComponent<T>::Init() {
41 AINFO << "UDP bridge receiver init, startin...";
43 if (!this->GetProtoConfig(&udp_bridge_remote)) {
44 AINFO << "load udp bridge component proto param failed";
45 return false;
46 }
47 bind_port_ = udp_bridge_remote.bind_port();
48 proto_name_ = udp_bridge_remote.proto_name();
49 topic_name_ = udp_bridge_remote.topic_name();
50 enable_timeout_ = udp_bridge_remote.enable_timeout();
51 ADEBUG << "UDP Bridge remote port is: " << bind_port_;
52 ADEBUG << "UDP Bridge for Proto is: " << proto_name_;
53 writer_ = node_->CreateWriter<T>(topic_name_.c_str());
54
55 if (!InitSession((uint16_t)bind_port_)) {
56 return false;
57 }
58 ADEBUG << "initialize session successful.";
59 MsgDispatcher();
60 return true;
61}
62
63template <typename T>
64bool UDPBridgeReceiverComponent<T>::InitSession(uint16_t port) {
65 return listener_->Initialize(this, &UDPBridgeReceiverComponent<T>::MsgHandle,
66 port);
67}
68
69template <typename T>
70void UDPBridgeReceiverComponent<T>::MsgDispatcher() {
71 ADEBUG << "msg dispatcher start successful.";
72 listener_->Listen();
73}
74
75template <typename T>
76BridgeProtoDiserializedBuf<T>
77 *UDPBridgeReceiverComponent<T>::CreateBridgeProtoBuf(
78 const BridgeHeader &header) {
79 if (IsTimeout(header.GetTimeStamp())) {
80 typename std::vector<BridgeProtoDiserializedBuf<T> *>::iterator itor =
81 proto_list_.begin();
82 for (; itor != proto_list_.end();) {
83 if ((*itor)->IsTheProto(header)) {
84 BridgeProtoDiserializedBuf<T> *tmp = *itor;
85 FREE_POINTER(tmp);
86 itor = proto_list_.erase(itor);
87 break;
88 }
89 ++itor;
90 }
91 return nullptr;
92 }
93
94 for (auto proto : proto_list_) {
95 if (proto->IsTheProto(header)) {
96 return proto;
97 }
98 }
99 BridgeProtoDiserializedBuf<T> *proto_buf = new BridgeProtoDiserializedBuf<T>;
100 if (!proto_buf) {
101 return nullptr;
102 }
103 proto_buf->Initialize(header);
104 proto_list_.push_back(proto_buf);
105 return proto_buf;
106}
107
108template <typename T>
109bool UDPBridgeReceiverComponent<T>::IsProtoExist(const BridgeHeader &header) {
110 for (auto proto : proto_list_) {
111 if (proto->IsTheProto(header)) {
112 return true;
113 }
114 }
115 return false;
116}
117
118template <typename T>
119bool UDPBridgeReceiverComponent<T>::IsTimeout(double time_stamp) {
120 if (enable_timeout_ == false) {
121 return false;
122 }
123 double cur_time = apollo::cyber::Clock::NowInSeconds();
124 if (cur_time < time_stamp) {
125 return true;
126 }
127 if (FLAGS_timeout < cur_time - time_stamp) {
128 return true;
129 }
130 return false;
131}
132
133template <typename T>
134bool UDPBridgeReceiverComponent<T>::MsgHandle(int fd) {
135 struct sockaddr_in client_addr;
136 socklen_t sock_len = static_cast<socklen_t>(sizeof(client_addr));
137 int bytes = 0;
138 int total_recv = 2 * FRAME_SIZE;
139 char total_buf[2 * FRAME_SIZE] = {0};
140 bytes =
141 static_cast<int>(recvfrom(fd, total_buf, total_recv, 0,
142 (struct sockaddr *)&client_addr, &sock_len));
143 ADEBUG << "total recv " << bytes;
144 if (bytes <= 0 || bytes > total_recv) {
145 return false;
146 }
147 char header_flag[sizeof(BRIDGE_HEADER_FLAG) + 1] = {0};
148 size_t offset = 0;
149 memcpy(header_flag, total_buf, HEADER_FLAG_SIZE);
150 if (strcmp(header_flag, BRIDGE_HEADER_FLAG) != 0) {
151 AINFO << "header flag not match!";
152 return false;
153 }
154 offset += sizeof(BRIDGE_HEADER_FLAG) + 1;
155
156 char header_size_buf[sizeof(hsize) + 1] = {0};
157 const char *cursor = total_buf + offset;
158 memcpy(header_size_buf, cursor, sizeof(hsize));
159 hsize header_size = *(reinterpret_cast<hsize *>(header_size_buf));
160 offset += sizeof(hsize) + 1;
161 if (header_size > FRAME_SIZE || header_size < offset) {
162 AINFO << "header size is more than FRAME_SIZE or less than offset!";
163 return false;
164 }
165
166 BridgeHeader header;
167 size_t buf_size = header_size - offset;
168 cursor = total_buf + offset;
169 if (!header.Diserialize(cursor, buf_size)) {
170 AINFO << "header diserialize failed!";
171 return false;
172 }
173
174 ADEBUG << "proto name : " << header.GetMsgName().c_str();
175 ADEBUG << "proto sequence num: " << header.GetMsgID();
176 ADEBUG << "proto total frames: " << header.GetTotalFrames();
177 ADEBUG << "proto frame index: " << header.GetIndex();
178
179 std::lock_guard<std::mutex> lock(mutex_);
180 BridgeProtoDiserializedBuf<T> *proto_buf = CreateBridgeProtoBuf(header);
181 if (!proto_buf) {
182 return false;
183 }
184
185 cursor = total_buf + header_size;
186 if (header.GetFramePos() > header.GetMsgSize()) {
187 return false;
188 }
189 char *buf = proto_buf->GetBuf(header.GetFramePos());
190 // check cursor size
191 if (header.GetFrameSize() < 0 ||
192 header.GetFrameSize() > (total_recv - header_size)) {
193 return false;
194 }
195 // check buf size
196 if (header.GetFrameSize() > (header.GetMsgSize() - header.GetFramePos())) {
197 return false;
198 }
199 memcpy(buf, cursor, header.GetFrameSize());
200 proto_buf->UpdateStatus(header.GetIndex());
201 if (proto_buf->IsReadyDiserialize()) {
202 auto pb_msg = std::make_shared<T>();
203 proto_buf->Diserialized(pb_msg);
204 writer_->Write(pb_msg);
205 RemoveInvalidBuf(proto_buf->GetMsgID());
206 RemoveItem(&proto_list_, proto_buf);
207 }
208 return true;
209}
210
211template <typename T>
212bool UDPBridgeReceiverComponent<T>::RemoveInvalidBuf(uint32_t msg_id) {
213 if (msg_id == 0) {
214 return false;
215 }
216 typename std::vector<BridgeProtoDiserializedBuf<T> *>::iterator itor =
217 proto_list_.begin();
218 for (; itor != proto_list_.end();) {
219 if ((*itor)->GetMsgID() < msg_id) {
220 BridgeProtoDiserializedBuf<T> *tmp = *itor;
221 FREE_POINTER(tmp);
222 itor = proto_list_.erase(itor);
223 continue;
224 }
225 ++itor;
226 }
227 return true;
228}
229
230BRIDGE_RECV_IMPL(canbus::Chassis);
231} // namespace bridge
232} // namespace apollo
static double NowInSeconds()
gets the current time in second.
Definition clock.cc:56
#define ADEBUG
Definition log.h:41
#define AINFO
Definition log.h:42
#define FREE_POINTER(p)
Definition macro.h:30
constexpr uint32_t FRAME_SIZE
Definition macro.h:36
constexpr char BRIDGE_HEADER_FLAG[]
bool RemoveItem(std::vector< T * > *list, const T *t)
Definition util.h:47
#define BRIDGE_RECV_IMPL(pb_msg)