30 : monitor_logger_buffer_(common::monitor::MonitorMessageItem::CONTROL) {}
31
32template <typename T>
33UDPBridgeReceiverComponent<T>::~UDPBridgeReceiverComponent() {
34 for (auto proto : proto_list_) {
36 }
37}
38
39template <typename T>
40bool UDPBridgeReceiverComponent<T>::Init() {
41 AINFO <<
"UDP bridge receiver init, startin...";
43 if (!this->GetProtoConfig(&udp_bridge_remote)) {
44 AINFO <<
"load udp bridge component proto param failed";
45 return false;
46 }
47 bind_port_ = udp_bridge_remote.
bind_port();
51 ADEBUG <<
"UDP Bridge remote port is: " << bind_port_;
52 ADEBUG <<
"UDP Bridge for Proto is: " << proto_name_;
53 writer_ = node_->CreateWriter<T>(topic_name_.c_str());
54
55 if (!InitSession((uint16_t)bind_port_)) {
56 return false;
57 }
58 ADEBUG <<
"initialize session successful.";
59 MsgDispatcher();
60 return true;
61}
62
63template <typename T>
64bool UDPBridgeReceiverComponent<T>::InitSession(uint16_t port) {
65 return listener_->Initialize(this, &UDPBridgeReceiverComponent<T>::MsgHandle,
66 port);
67}
68
69template <typename T>
70void UDPBridgeReceiverComponent<T>::MsgDispatcher() {
71 ADEBUG <<
"msg dispatcher start successful.";
72 listener_->Listen();
73}
74
75template <typename T>
76BridgeProtoDiserializedBuf<T>
77 *UDPBridgeReceiverComponent<T>::CreateBridgeProtoBuf(
78 const BridgeHeader &header) {
79 if (IsTimeout(header.GetTimeStamp())) {
80 typename std::vector<BridgeProtoDiserializedBuf<T> *>::iterator itor =
81 proto_list_.begin();
82 for (; itor != proto_list_.end();) {
83 if ((*itor)->IsTheProto(header)) {
84 BridgeProtoDiserializedBuf<T> *tmp = *itor;
86 itor = proto_list_.erase(itor);
87 break;
88 }
89 ++itor;
90 }
91 return nullptr;
92 }
93
94 for (auto proto : proto_list_) {
95 if (proto->IsTheProto(header)) {
96 return proto;
97 }
98 }
99 BridgeProtoDiserializedBuf<T> *proto_buf = new BridgeProtoDiserializedBuf<T>;
100 if (!proto_buf) {
101 return nullptr;
102 }
103 proto_buf->Initialize(header);
104 proto_list_.push_back(proto_buf);
105 return proto_buf;
106}
107
108template <typename T>
109bool UDPBridgeReceiverComponent<T>::IsProtoExist(const BridgeHeader &header) {
110 for (auto proto : proto_list_) {
111 if (proto->IsTheProto(header)) {
112 return true;
113 }
114 }
115 return false;
116}
117
118template <typename T>
119bool UDPBridgeReceiverComponent<T>::IsTimeout(double time_stamp) {
120 if (enable_timeout_ == false) {
121 return false;
122 }
124 if (cur_time < time_stamp) {
125 return true;
126 }
127 if (FLAGS_timeout < cur_time - time_stamp) {
128 return true;
129 }
130 return false;
131}
132
133template <typename T>
134bool UDPBridgeReceiverComponent<T>::MsgHandle(int fd) {
135 struct sockaddr_in client_addr;
136 socklen_t sock_len = static_cast<socklen_t>(sizeof(client_addr));
137 int bytes = 0;
140 bytes =
141 static_cast<int>(recvfrom(fd, total_buf, total_recv, 0,
142 (struct sockaddr *)&client_addr, &sock_len));
143 ADEBUG <<
"total recv " << bytes;
144 if (bytes <= 0 || bytes > total_recv) {
145 return false;
146 }
148 size_t offset = 0;
149 memcpy(header_flag, total_buf, HEADER_FLAG_SIZE);
150 if (strcmp(header_flag, BRIDGE_HEADER_FLAG) != 0) {
151 AINFO <<
"header flag not match!";
152 return false;
153 }
155
156 char header_size_buf[
sizeof(
hsize) + 1] = {0};
157 const char *cursor = total_buf + offset;
158 memcpy(header_size_buf, cursor, sizeof(hsize));
159 hsize header_size = *(
reinterpret_cast<hsize *
>(header_size_buf));
160 offset +=
sizeof(
hsize) + 1;
161 if (header_size > FRAME_SIZE || header_size < offset) {
162 AINFO <<
"header size is more than FRAME_SIZE or less than offset!";
163 return false;
164 }
165
166 BridgeHeader header;
167 size_t buf_size = header_size - offset;
168 cursor = total_buf + offset;
169 if (!header.Diserialize(cursor, buf_size)) {
170 AINFO <<
"header diserialize failed!";
171 return false;
172 }
173
174 ADEBUG <<
"proto name : " << header.GetMsgName().c_str();
175 ADEBUG <<
"proto sequence num: " << header.GetMsgID();
176 ADEBUG <<
"proto total frames: " << header.GetTotalFrames();
177 ADEBUG <<
"proto frame index: " << header.GetIndex();
178
179 std::lock_guard<std::mutex> lock(mutex_);
180 BridgeProtoDiserializedBuf<T> *proto_buf = CreateBridgeProtoBuf(header);
181 if (!proto_buf) {
182 return false;
183 }
184
185 cursor = total_buf + header_size;
186 if (header.GetFramePos() > header.GetMsgSize()) {
187 return false;
188 }
189 char *buf = proto_buf->GetBuf(header.GetFramePos());
190
191 if (header.GetFrameSize() < 0 ||
192 header.GetFrameSize() > (total_recv - header_size)) {
193 return false;
194 }
195
196 if (header.GetFrameSize() > (header.GetMsgSize() - header.GetFramePos())) {
197 return false;
198 }
199 memcpy(buf, cursor, header.GetFrameSize());
200 proto_buf->UpdateStatus(header.GetIndex());
201 if (proto_buf->IsReadyDiserialize()) {
202 auto pb_msg = std::make_shared<T>();
203 proto_buf->Diserialized(pb_msg);
204 writer_->Write(pb_msg);
205 RemoveInvalidBuf(proto_buf->GetMsgID());
207 }
208 return true;
209}
210
211template <typename T>
212bool UDPBridgeReceiverComponent<T>::RemoveInvalidBuf(uint32_t msg_id) {
213 if (msg_id == 0) {
214 return false;
215 }
216 typename std::vector<BridgeProtoDiserializedBuf<T> *>::iterator itor =
217 proto_list_.begin();
218 for (; itor != proto_list_.end();) {
219 if ((*itor)->GetMsgID() < msg_id) {
220 BridgeProtoDiserializedBuf<T> *tmp = *itor;
222 itor = proto_list_.erase(itor);
223 continue;
224 }
225 ++itor;
226 }
227 return true;
228}
229
231}
232}
static double NowInSeconds()
gets the current time in second.
constexpr uint32_t FRAME_SIZE
constexpr char BRIDGE_HEADER_FLAG[]
bool RemoveItem(std::vector< T * > *list, const T *t)
optional string topic_name
optional string proto_name
optional bool enable_timeout
#define BRIDGE_RECV_IMPL(pb_msg)