349 {
350 auto input_msg = reinterpret_cast<const google::protobuf::Message*>(message);
353 auto arena_ptr = input_msg->GetArena();
354 google::protobuf::ArenaOptions options;
355
356 if (!segment) {
357 return nullptr;
358 }
359
360 void* msg_output = nullptr;
361 if (arena_ptr == nullptr) {
362 auto arena_conf =
363 cyber::common::GlobalData::Instance()->GetChannelArenaConf(channel_id);
364 google::protobuf::ArenaOptions options;
365 options.start_block_size = arena_conf.max_msg_size();
366 options.max_block_size = arena_conf.max_msg_size();
367
368 if (!segment) {
369 return nullptr;
370 }
371
373
374 ArenaSegmentBlockInfo wb;
375
376
377 uint64_t size = 0;
378 if (!segment->AcquireBlockToWrite(size, &wb)) {
379 return nullptr;
380 }
382 options.initial_block =
383 reinterpret_cast<char*>(segment->arena_block_address_[wb.block_index_]);
384 options.initial_block_size = segment->message_capacity_;
385 if (segment->arenas_[wb.block_index_] != nullptr) {
386 segment->arenas_[wb.block_index_] = nullptr;
387 }
388 segment->arenas_[wb.block_index_] =
389 std::make_shared<google::protobuf::Arena>(options);
390 auto msg = input_msg->New(segment->arenas_[wb.block_index_].get());
391 msg->CopyFrom(*input_msg);
393 wrapper, reinterpret_cast<uint64_t>(msg) -
394 reinterpret_cast<uint64_t>(segment->GetShmAddress()));
395 msg_output = reinterpret_cast<void*>(msg);
396 segment->ReleaseWrittenBlock(wb);
397 } else {
398 ArenaSegmentBlockInfo wb;
399 int block_index = -1;
400 for (size_t i = 0; i < segment->message_capacity_; i++) {
401 if (segment->arenas_[i].get() == arena_ptr) {
402 block_index = i;
403 break;
404 }
405 }
406 if (block_index == -1) {
407 return nullptr;
408 }
409 wb.block_index_ = block_index;
413 wrapper, reinterpret_cast<uint64_t>(input_msg) -
414 reinterpret_cast<uint64_t>(segment->GetShmAddress()));
415 msg_output = reinterpret_cast<void*>(
416 const_cast<google::protobuf::Message*>(input_msg));
417 segment->ReleaseWrittenBlock(wb);
418 }
419
420 return msg_output;
421}
void ResetMessageRelatedBlocks(message::ArenaMessageWrapper *wrapper)
void SetMessageAddressOffset(message::ArenaMessageWrapper *wrapper, uint64_t offset)
void AddMessageRelatedBlock(message::ArenaMessageWrapper *wrapper, uint64_t block_index)