26#define BRIDGE_RECV_IMPL(pb_msg) \
27 template class UDPBridgeReceiverComponent<pb_msg>
31 : monitor_logger_buffer_(common::monitor::MonitorMessageItem::CONTROL) {}
35 for (
auto proto : proto_list_) {
42 AINFO <<
"UDP bridge receiver init, startin...";
44 if (!this->GetProtoConfig(&udp_bridge_remote)) {
45 AINFO <<
"load udp bridge component proto param failed";
48 bind_port_ = udp_bridge_remote.
bind_port();
52 ADEBUG <<
"UDP Bridge remote port is: " << bind_port_;
53 ADEBUG <<
"UDP Bridge for Proto is: " << proto_name_;
54 writer_ = node_->CreateWriter<T>(topic_name_.c_str());
56 if (!InitSession((uint16_t)bind_port_)) {
59 ADEBUG <<
"initialize session successful.";
71void UDPBridgeReceiverComponent<T>::MsgDispatcher() {
72 ADEBUG <<
"msg dispatcher start successful.";
77BridgeProtoDiserializedBuf<T>
78 *UDPBridgeReceiverComponent<T>::CreateBridgeProtoBuf(
79 const BridgeHeader &header) {
80 if (IsTimeout(header.GetTimeStamp())) {
81 typename std::vector<BridgeProtoDiserializedBuf<T> *>::iterator itor =
83 for (; itor != proto_list_.end();) {
84 if ((*itor)->IsTheProto(header)) {
85 BridgeProtoDiserializedBuf<T> *tmp = *itor;
87 itor = proto_list_.erase(itor);
95 for (
auto proto : proto_list_) {
96 if (proto->IsTheProto(header)) {
100 BridgeProtoDiserializedBuf<T> *proto_buf =
new BridgeProtoDiserializedBuf<T>;
104 proto_buf->Initialize(header);
105 proto_list_.push_back(proto_buf);
110bool UDPBridgeReceiverComponent<T>::IsProtoExist(
const BridgeHeader &header) {
111 for (
auto proto : proto_list_) {
112 if (proto->IsTheProto(header)) {
120bool UDPBridgeReceiverComponent<T>::IsTimeout(
double time_stamp) {
121 if (enable_timeout_ ==
false) {
125 if (cur_time < time_stamp) {
128 if (FLAGS_timeout < cur_time - time_stamp) {
136 struct sockaddr_in client_addr;
137 socklen_t sock_len =
static_cast<socklen_t
>(
sizeof(client_addr));
142 static_cast<int>(recvfrom(fd, total_buf, total_recv, 0,
143 (
struct sockaddr *)&client_addr, &sock_len));
144 ADEBUG <<
"total recv " << bytes;
145 if (bytes <= 0 || bytes > total_recv) {
152 AINFO <<
"header flag not match!";
157 char header_size_buf[
sizeof(
hsize) + 1] = {0};
158 const char *cursor = total_buf + offset;
159 memcpy(header_size_buf, cursor,
sizeof(
hsize));
160 hsize header_size = *(
reinterpret_cast<hsize *
>(header_size_buf));
161 offset +=
sizeof(
hsize) + 1;
162 if (header_size >
FRAME_SIZE || header_size < offset) {
163 AINFO <<
"header size is more than FRAME_SIZE or less than offset!";
168 size_t buf_size = header_size - offset;
169 cursor = total_buf + offset;
171 AINFO <<
"header diserialize failed!";
180 std::lock_guard<std::mutex> lock(mutex_);
186 cursor = total_buf + header_size;
203 auto pb_msg = std::make_shared<T>();
205 writer_->Write(pb_msg);
206 RemoveInvalidBuf(proto_buf->
GetMsgID());
217 typename std::vector<BridgeProtoDiserializedBuf<T> *>::iterator itor =
219 for (; itor != proto_list_.end();) {
220 if ((*itor)->GetMsgID() < msg_id) {
221 BridgeProtoDiserializedBuf<T> *tmp = *itor;
223 itor = proto_list_.erase(itor);
virtual uint32_t GetMsgID() const
bool Diserialized(std::shared_ptr< T > proto)
virtual void UpdateStatus(uint32_t frame_index)
virtual bool IsReadyDiserialize() const
virtual char * GetBuf(size_t offset)
UDPBridgeReceiverComponent()
~UDPBridgeReceiverComponent()
static double NowInSeconds()
gets the current time in second.
bool Initialize(const ComponentConfig &config) override
init the component by protobuf object.
constexpr uint32_t FRAME_SIZE
constexpr char BRIDGE_HEADER_FLAG[]
constexpr size_t HEADER_FLAG_SIZE
bool RemoveItem(std::vector< T * > *list, const T *t)
optional string topic_name
optional string proto_name
optional bool enable_timeout
#define BRIDGE_RECV_IMPL(pb_msg)