275 {
276
277 if (cyber::common::GlobalData::Instance()->IsChannelEnableArenaShm(
278 self_attr.channel_id()) &&
279 self_attr.message_type() != message::MessageType<message::RawMessage>() &&
280 self_attr.message_type() !=
281 message::MessageType<message::PyMessageWrap>()) {
282 auto listener_adapter = [listener, self_attr](
283 const std::shared_ptr<ReadableBlock>& rb,
284 const MessageInfo& msg_info) {
285 auto msg = std::make_shared<MessageT>();
286 auto arena_manager = ProtobufArenaManager::Instance();
287 auto msg_wrapper = arena_manager->CreateMessageWrapper();
288 memcpy(msg_wrapper->GetData(), rb->buf, 1024);
289 MessageT* msg_p;
291 &msg_p)) {
292 AERROR <<
"ParseFromArenaMessageWrapper failed";
293 }
294
295
296 auto segment = arena_manager->GetSegment(self_attr.channel_id());
297 auto msg_addr = reinterpret_cast<uint64_t>(msg_p);
298 msg.reset(reinterpret_cast<MessageT*>(msg_addr),
299 [arena_manager, segment, msg_wrapper](MessageT* p) {
300
301
302
303
304
305
306
307 });
308 auto related_blocks_for_lock =
309 arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
310 for (int i = 0; i < related_blocks_for_lock.size(); ++i) {
311 auto block_index = related_blocks_for_lock[i];
312 if (!segment->AddBlockReadLock(block_index)) {
313 AWARN <<
"failed to acquire block for read, channel: "
314 << self_attr.channel_id() << " index: " << block_index;
315 for (int j = 0; j < i; ++j) {
316
317 segment->RemoveBlockReadLock(related_blocks_for_lock[j]);
318 }
319 return;
320 }
321 }
322
323 auto send_time = msg_info.send_time();
324
325 statistics::Statistics::Instance()->AddRecvCount(self_attr,
326 msg_info.seq_num());
327 statistics::Statistics::Instance()->SetTotalMsgsStatus(
328 self_attr, msg_info.seq_num());
329
331
332
333 auto tran_diff = (recv_time - send_time) / 1000;
334 if (tran_diff > 0) {
335 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
336 self_attr, tran_diff);
337 }
338 statistics::Statistics::Instance()->SetProcStatus(self_attr,
339 recv_time / 1000);
340
341 listener(msg, msg_info);
342 auto related_blocks =
343 arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
344 for (auto block_index : related_blocks) {
345
346 segment->RemoveBlockReadLock(block_index);
347 }
348 };
349
350 AddArenaListener<ReadableBlock>(self_attr, opposite_attr, listener_adapter);
351 } else {
352 auto listener_adapter = [listener, self_attr](
353 const std::shared_ptr<ReadableBlock>& rb,
354 const MessageInfo& msg_info) {
355 auto msg = std::make_shared<MessageT>();
357 rb->buf, static_cast<int>(rb->block->msg_size()), msg.get()));
358
359 auto send_time = msg_info.send_time();
360 auto msg_seq_num = msg_info.seq_num();
361
362 statistics::Statistics::Instance()->AddRecvCount(self_attr, msg_seq_num);
363 statistics::Statistics::Instance()->SetTotalMsgsStatus(self_attr,
364 msg_seq_num);
365
367
368
369 auto tran_diff = (recv_time - send_time) / 1000;
370 if (tran_diff > 0) {
371 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
372 self_attr, tran_diff);
373 }
374 statistics::Statistics::Instance()->SetProcStatus(self_attr,
375 recv_time / 1000);
376
377 listener(msg, msg_info);
378 };
379
380 Dispatcher::AddListener<ReadableBlock>(self_attr, opposite_attr,
381 listener_adapter);
382 }
383 AddSegment(self_attr);
384}