61 std::shared_ptr<ProtoDiserializedBufBase> proto_buf;
63 std::vector<std::shared_ptr<ProtoDiserializedBufBase>>::iterator itor =
65 for (; itor != proto_list_.end();) {
66 if ((*itor)->IsTheProto(header)) {
67 itor = proto_list_.erase(itor);
75 for (
auto proto : proto_list_) {
76 if (proto->IsTheProto(header)) {
85 proto_buf->Initialize(header,
node_);
86 proto_list_.push_back(proto_buf);
114 struct sockaddr_in client_addr;
115 socklen_t sock_len =
static_cast<socklen_t
>(
sizeof(client_addr));
119 static_cast<int>(recvfrom(fd, total_buf, total_recv, 0,
120 (
struct sockaddr *)&client_addr, &sock_len));
121 if (bytes <= 0 || bytes > total_recv) {
126 AERROR <<
"Header flag didn't match!";
131 const char *cursor = total_buf + offset;
132 hsize header_size = *(
reinterpret_cast<const hsize *
>(cursor));
133 offset +=
sizeof(
hsize) + 1;
135 if (header_size < offset || header_size >
FRAME_SIZE) {
136 AERROR <<
"header size is more than FRAME_SIZE!";
140 cursor = total_buf + offset;
141 size_t buf_size = header_size - offset;
144 AERROR <<
"header diserialize failed!";
153 std::lock_guard<std::mutex> lock(mutex_);
154 std::shared_ptr<ProtoDiserializedBufBase> proto_buf =
160 cursor = total_buf + header_size;
164 char *buf = proto_buf->GetBuf(header.
GetFramePos());
175 proto_buf->UpdateStatus(header.
GetIndex());
176 if (proto_buf->IsReadyDiserialize()) {
177 proto_buf->DiserializedAndPub();
178 RemoveInvalidBuf(proto_buf->GetMsgID(), proto_buf->GetMsgName());