#include <google/protobuf/arena.h>
#include <google/protobuf/message.h>
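// ArenaSegment manages one shared-memory segment per channel: the segment is
// split into fixed-size blocks, and google::protobuf::Arena instances are
// laid over those blocks so messages can be built directly in shared memory.
// ProtobufArenaManager tracks segments by channel id and converts between raw
// message pointers and the base-plus-offset form stored in
// ArenaMessageWrapper. The shared-memory key of a channel is the hash of
// "/apollo/__arena__/<channel_id>", computed in the constructors below.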
ArenaSegment::ArenaSegment()
    : channel_id_(0), key_id_(0), base_address_(nullptr) {}

ArenaSegment::ArenaSegment(uint64_t channel_id)
    : channel_id_(channel_id),
      key_id_(std::hash<std::string>{}("/apollo/__arena__/" +
                                       std::to_string(channel_id))) {}

ArenaSegment::ArenaSegment(uint64_t channel_id, void* base_address)
    : channel_id_(channel_id),
      key_id_(std::hash<std::string>{}("/apollo/__arena__/" +
                                       std::to_string(channel_id))),
      base_address_(base_address) {}

ArenaSegment::ArenaSegment(uint64_t channel_id, uint64_t message_size,
                           uint64_t block_num, void* base_address)
    : channel_id_(channel_id),
      key_id_(std::hash<std::string>{}("/apollo/__arena__/" +
                                       std::to_string(channel_id))),
      base_address_(base_address) {
  // ...
  uint64_t key_id = std::hash<std::string>{}("/apollo/__arena__/" +
                                             std::to_string(channel_id));
  // ...
  auto arena_conf =
      cyber::common::GlobalData::Instance()->GetChannelArenaConf(channel_id_);
  auto shared_buffer_size = arena_conf.shared_buffer_size();
  // ...
  auto shmid =
      shmget(static_cast<key_t>(key_id_), size, 0644 | IPC_CREAT | IPC_EXCL);
  // ...
  if (errno == EINVAL) {
    // ...
  } else if (errno == EEXIST) {
    // ...
  }
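  // The segment above is requested with IPC_CREAT | IPC_EXCL, so errno
  // distinguishes EINVAL (the requested size conflicts with an existing
  // segment or exceeds system limits) from EEXIST (another process created
  // the segment first). Below, one Arena slot per block is reserved in
  // arenas_, and a non-zero shared buffer gets its own protobuf Arena:
  // initial_block hands that Arena pre-allocated memory, and the start/max
  // block sizes are pinned to shared_buffer_size so allocations stay inside
  // that memory as long as they fit.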
  arenas_.resize(block_num, nullptr);
  if (shared_buffer_size == 0) {
    // ...
  }
  google::protobuf::ArenaOptions options;
  options.start_block_size = shared_buffer_size;
  options.max_block_size = shared_buffer_size;
  options.initial_block = reinterpret_cast<char*>(
      /* ... */);
  options.initial_block_size = shared_buffer_size;
  // ...
  for (size_t i = 0; i < block_num; i++) {
    // ...
  }
  // ...
  for (uint64_t i = 0; i < block_num; ++i) {
    // ...
  }
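  // Attaching to an existing segment: shmget() is called with size 0 and
  // without IPC_CREAT, which only succeeds if a segment for key_id_ already
  // exists. The Arena setup that follows mirrors the creation path above.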
  auto arena_conf =
      cyber::common::GlobalData::Instance()->GetChannelArenaConf(channel_id_);
  auto shared_buffer_size = arena_conf.shared_buffer_size();
  auto shmid = shmget(static_cast<key_t>(key_id_), 0, 0644);
  // ...
  arenas_.resize(block_num, nullptr);
  if (shared_buffer_size == 0) {
    // ...
  }
  google::protobuf::ArenaOptions options;
  options.start_block_size = shared_buffer_size;
  options.max_block_size = shared_buffer_size;
  options.initial_block = reinterpret_cast<char*>(
      /* ... */);
  options.initial_block_size = shared_buffer_size;
  // ...
  for (size_t i = 0; i < block_num; i++) {
    // ...
  }
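  // Each block carries one atomic counter, lock_num_, used both as a reader
  // count and as a write-exclusive marker. A writer claims a block with a
  // single compare-exchange, which fails if readers or another writer
  // currently hold it.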
  auto& block = blocks_[block_index];
  // ...
  if (!block.lock_num_.compare_exchange_weak(
          /* ... */
          std::memory_order_acq_rel, std::memory_order_relaxed)) {
    // ...
  }
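  // Adding a read lock: while a block is readable, lock_num_ counts the
  // active readers, so a reader bumps it by one in a compare-exchange loop,
  // re-reading the counter on contention and giving up when the block is
  // being written or the retry budget (try_times) runs out.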
  auto& block = blocks_[block_index];
  int32_t lock_num = block.lock_num_.load();
  // ...
    AINFO << "block is being written.";
  // ...
  int32_t try_times = 0;
  while (!block.lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
                                                std::memory_order_acq_rel,
                                                std::memory_order_relaxed)) {
    // ...
      AINFO << "fail to add read lock num, curr num: " << lock_num;
    // ...
    lock_num = block.lock_num_.load();
    // ...
      AINFO << "block is being written.";
    // ...
// ProtobufArenaManager keeps the per-channel segments. Its constructor only
// sets up the ArenaAddressAllocator that hands out the base addresses at
// which segments are attached.
ProtobufArenaManager::ProtobufArenaManager() {
  address_allocator_ = std::make_shared<ArenaAddressAllocator>();
}
std::shared_ptr<ArenaSegment> ProtobufArenaManager::GetSegment(
    uint64_t channel_id) {
  std::lock_guard<std::mutex> lock(segments_mutex_);
  if (segments_.find(channel_id) == segments_.end()) {
    // ...
  }
  return segments_[channel_id];
}
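// SetMessage places a protobuf message into the channel's segment. A message
// that is not arena-allocated is copied into a freshly acquired block, using
// an Arena constructed directly over that block's memory; a message that
// already lives on one of the segment's arenas only has its block looked up
// and reused. In both cases the wrapper records the message's offset from the
// segment base so other processes can locate it.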
void* ProtobufArenaManager::SetMessage(message::ArenaMessageWrapper* wrapper,
                                       const void* message) {
  auto input_msg = reinterpret_cast<const google::protobuf::Message*>(message);
  // ...
  auto arena_ptr = input_msg->GetArena();
  google::protobuf::ArenaOptions options;
  // ...
  void* msg_output = nullptr;
  if (arena_ptr == nullptr) {
    auto arena_conf =
        cyber::common::GlobalData::Instance()->GetChannelArenaConf(channel_id);
    google::protobuf::ArenaOptions options;
    options.start_block_size = arena_conf.max_msg_size();
    options.max_block_size = arena_conf.max_msg_size();
    // ...
    if (!segment->AcquireBlockToWrite(size, &wb)) {
      // ...
    }
    options.initial_block = reinterpret_cast<char*>(
        segment->arena_block_address_[wb.block_index_]);
    options.initial_block_size = segment->message_capacity_;
    // ...
    segment->arenas_[wb.block_index_] =
        std::make_shared<google::protobuf::Arena>(options);
    auto msg = input_msg->New(segment->arenas_[wb.block_index_].get());
    msg->CopyFrom(*input_msg);
    SetMessageAddressOffset(
        wrapper, reinterpret_cast<uint64_t>(msg) -
                     reinterpret_cast<uint64_t>(segment->GetShmAddress()));
    msg_output = reinterpret_cast<void*>(msg);
    segment->ReleaseWrittenBlock(wb);
  } else {
    int block_index = -1;
    for (size_t i = 0; i < segment->message_capacity_; i++) {
      if (segment->arenas_[i].get() == arena_ptr) {
        // ...
      }
    }
    if (block_index == -1) {
      // ...
    }
    // ...
    SetMessageAddressOffset(
        wrapper, reinterpret_cast<uint64_t>(input_msg) -
                     reinterpret_cast<uint64_t>(segment->GetShmAddress()));
    msg_output = reinterpret_cast<void*>(
        const_cast<google::protobuf::Message*>(input_msg));
    segment->ReleaseWrittenBlock(wb);
  }
  // ...
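// GetMessage is the inverse mapping: the offset stored in the wrapper is
// added to this process's base address of the segment to recover a usable
// pointer.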
void* ProtobufArenaManager::GetMessage(message::ArenaMessageWrapper* wrapper) {
  // ...
  auto address = reinterpret_cast<uint64_t>(segment->GetShmAddress()) +
                 GetMessageAddressOffset(wrapper);
  return reinterpret_cast<void*>(address);
}
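// EnableSegment creates or reuses the arena segment of a channel. It bails
// out unless the transport configuration carries an arena_shm_conf and the
// channel has arena shared memory enabled; it then reserves a base address,
// sizes the segment from the channel's arena configuration (max_msg_size per
// block, max_pool_size blocks), and fires any registered arena buffer
// callback.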
bool ProtobufArenaManager::EnableSegment(uint64_t channel_id) {
  if (segments_.find(channel_id) != segments_.end()) {
    if (arena_buffer_callbacks_.find(channel_id) !=
        arena_buffer_callbacks_.end()) {
      arena_buffer_callbacks_[channel_id]();
    }
    // ...
  }
  auto cyber_config = apollo::cyber::common::GlobalData::Instance()->Config();
  if (!cyber_config.has_transport_conf()) {
    // ...
  }
  if (!cyber_config.transport_conf().has_shm_conf()) {
    // ...
  }
  if (!cyber_config.transport_conf().shm_conf().has_arena_shm_conf()) {
    // ...
  }
  if (!cyber::common::GlobalData::Instance()->IsChannelEnableArenaShm(
          channel_id)) {
    // ...
  }
  auto arena_conf =
      cyber::common::GlobalData::Instance()->GetChannelArenaConf(channel_id);
  auto segment_shm_address = address_allocator_->Allocate(channel_id);
  auto segment = std::make_shared<ArenaSegment>(
      channel_id, arena_conf.max_msg_size(), arena_conf.max_pool_size(),
      reinterpret_cast<void*>(segment_shm_address));
  segments_[channel_id] = segment;
  if (arena_buffer_callbacks_.find(channel_id) !=
      arena_buffer_callbacks_.end()) {
    arena_buffer_callbacks_[channel_id]();
  }
  // ...
}
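// Teardown: the address range reserved for each channel's segment is handed
// back to the allocator, and the heap buffers tracked in non_arena_buffers_
// are freed.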
  for (auto& segment : segments_) {
    address_allocator_->Deallocate(segment.first);
  }
  for (auto& buffer : non_arena_buffers_) {
    delete buffer.second;
  }
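// A wrapper's metadata keeps the indices of the blocks that back its message
// in a small fixed-size array (related_blocks_); the accessors below copy
// that list out and refuse to grow it past its capacity.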
std::vector<uint64_t> ProtobufArenaManager::GetMessageRelatedBlocks(
    message::ArenaMessageWrapper* wrapper) {
  std::vector<uint64_t> related_blocks;
  // ...
  for (uint64_t i = 0; i < extended->meta_.related_blocks_size_; ++i) {
    related_blocks.push_back(extended->meta_.related_blocks_[i]);
  }
  return related_blocks;
}
void ProtobufArenaManager::AddMessageRelatedBlock(
    message::ArenaMessageWrapper* wrapper, uint64_t block_index) {
  // ...
  if (extended->meta_.related_blocks_size_ >=
      sizeof(extended->meta_.related_blocks_) / sizeof(uint64_t)) {
    // ...
  }
  // ...
}
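// ArenaAlloc defers to an optional allocation callback and returns nullptr
// when none is registered; ArenaDealloc is a no-op here.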
void* ProtobufArenaManager::ArenaAlloc(uint64_t size) {
  return arena_alloc_cb_ ? arena_alloc_cb_(size) : nullptr;
}

void ProtobufArenaManager::ArenaDealloc(void* addr, uint64_t size) {}
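// A rough usage sketch (not part of this file), assuming arena shared memory
// is enabled for the channel and that an ArenaMessageWrapper can be built
// directly; in the real transport the wrapper comes from the messaging layer,
// and my_proto stands for any generated protobuf message:
//
//   ProtobufArenaManager manager;
//   manager.EnableSegment(channel_id);            // create or attach segment
//
//   apollo::cyber::message::ArenaMessageWrapper wrapper;
//   manager.SetMessageChannelId(&wrapper, channel_id);
//   auto* shared_msg = static_cast<google::protobuf::Message*>(
//       manager.SetMessage(&wrapper, &my_proto)); // copy into shared memory
//   void* same_msg = manager.GetMessage(&wrapper);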