#ifndef CYBER_TRANSPORT_SHM_PROTOBUF_ARENA_MANAGER_H_
#define CYBER_TRANSPORT_SHM_PROTOBUF_ARENA_MANAGER_H_
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>

#include <google/protobuf/arena.h>

#include "cyber/base/arena_queue.h"
#include "cyber/base/pthread_rw_lock.h"
#include "cyber/common/global_data.h"
#include "cyber/common/log.h"
#include "cyber/common/macros.h"
#include "cyber/message/arena_manager_base.h"
#include "cyber/message/arena_message_wrapper.h"
#include "cyber/transport/shm/arena_address_allocator.h"

namespace apollo {
namespace cyber {
namespace transport {

// NOTE: The shared-memory layouts below are reassembled from this header's
// member index; the grouping of fields is inferred and their order is not
// authoritative.

// Segment-wide bookkeeping, kept in shared memory and updated atomically by
// every process attached to the segment.
struct ArenaSegmentState {
  struct {
    std::atomic<uint64_t> ref_count_;
    std::atomic<bool> auto_extended_;
    std::atomic<uint64_t> message_size_;
    std::atomic<uint64_t> block_num_;
    std::atomic<uint64_t> message_seq_;
  } struct_;
};

// Per-block lock word. Readers raise lock_num_ above kRWLockFree; a writer
// swaps it below kRWLockFree (kWriteExclusive).
struct ArenaSegmentBlock {
  static const int32_t kRWLockFree;
  static const int32_t kWriteExclusive;
  static const int32_t kMaxTryLockTimes;
  std::atomic<int32_t> lock_num_;
};

// Read/write accounting for one block.
struct ArenaSegmentBlockDescriptor {
  struct {
    std::atomic<uint64_t> writing_ref_count_;
    std::atomic<uint64_t> reading_ref_count_;
    base::PthreadRWLock read_write_mutex_;
  } struct_;
};

// Handle to a block acquired for reading or writing.
struct ArenaSegmentBlockInfo {
  uint64_t block_index_;
  ArenaSegmentBlock* block_;
  void* block_buffer_address_;
};
// A per-channel shared-memory segment: a state header plus an array of
// fixed-capacity blocks, each of which can back a protobuf arena.
class ArenaSegment {
 public:
  bool Init(uint64_t message_size, uint64_t block_num);
  bool OpenOrCreate(uint64_t message_size, uint64_t block_num);
  bool Open(uint64_t message_size, uint64_t block_num);

  // Block lifecycle: acquire a block, fill or read it, then release it.
  bool AcquireBlockToWrite(uint64_t size, ArenaSegmentBlockInfo* block_info);
  void ReleaseWrittenBlock(const ArenaSegmentBlockInfo& block_info);
  bool AcquireBlockToRead(ArenaSegmentBlockInfo* block_info);
  void ReleaseReadBlock(const ArenaSegmentBlockInfo& block_info);

  bool AddBlockWriteLock(uint64_t block_index);
  void RemoveBlockWriteLock(uint64_t block_index);
  bool AddBlockReadLock(uint64_t block_index);
  void RemoveBlockReadLock(uint64_t block_index);

  uint64_t GetNextWritableBlockIndex();

  ArenaSegmentState* state_;
  ArenaSegmentBlock* blocks_;
  // Arena and address of the shared buffer created by RegisterQueue() below.
  std::shared_ptr<google::protobuf::Arena> shared_buffer_arena_;
  void* arena_buffer_address_;
  // Payload address of each block, and the per-message capacity of a block.
  std::vector<uint64_t> arena_block_address_;
  uint64_t message_capacity_;
  // One protobuf arena per block; arena i allocates from block i's memory.
  std::vector<std::shared_ptr<google::protobuf::Arena>> arenas_;
};
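// Write-side sketch of the block lifecycle above (hedged; `segment` and
// `payload_size` are hypothetical):
//
//   ArenaSegmentBlockInfo wb;
//   if (segment.AcquireBlockToWrite(payload_size, &wb)) {
//     // ... construct the message inside block wb.block_index_ ...
//     segment.ReleaseWrittenBlock(wb);  // drops the write lock
//   }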
class ProtobufArenaManager : public message::ArenaManagerBase {
 public:
  using ArenaAllocCallback = std::function<void*(uint64_t)>;

  uint64_t GetBaseAddress(const message::ArenaMessageWrapper* wrapper) override;

  // Creates or opens the shared-memory segment backing a channel.
  bool EnableSegment(uint64_t channel_id);

  // Wrapper bookkeeping: each ArenaMessageWrapper records the channel it
  // belongs to and the offset of its payload inside the segment.
  void SetMessageChannelId(message::ArenaMessageWrapper* wrapper,
                           uint64_t channel_id);
  uint64_t GetMessageChannelId(message::ArenaMessageWrapper* wrapper);
  void SetMessageAddressOffset(message::ArenaMessageWrapper* wrapper,
                               uint64_t offset);
  uint64_t GetMessageAddressOffset(message::ArenaMessageWrapper* wrapper);
  // A wrapper also tracks the blocks its message spans, stored in its
  // extended metadata (meta_.related_blocks_[4] / meta_.related_blocks_size_).
  std::vector<uint64_t> GetMessageRelatedBlocks(
      message::ArenaMessageWrapper* wrapper);
  void AddMessageRelatedBlock(message::ArenaMessageWrapper* wrapper,
                              uint64_t block_index);
  void ResetMessageRelatedBlocks(message::ArenaMessageWrapper* wrapper);
  std::shared_ptr<ArenaSegment> GetSegment(uint64_t channel_id);
  void* SetMessage(message::ArenaMessageWrapper* wrapper,
                   const void* message) override;
  void* GetMessage(message::ArenaMessageWrapper* wrapper) override;

  // Returns the channel's arena-backed buffer when it is available, and
  // falls back to the channel's normal (heap) queue otherwise.
  void* GetAvailableBuffer(uint64_t channel_id) {
    auto segment = GetSegment(channel_id);
    if (segment == nullptr) {
      if (non_arena_buffers_.find(channel_id) == non_arena_buffers_.end()) {
        return nullptr;
      }
      return non_arena_buffers_[channel_id];
    }
    if (segment->arena_buffer_address_ != nullptr) {
      return segment->arena_buffer_address_;
    }
    return non_arena_buffers_[channel_id];
  }
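  // Usage sketch (hedged; `channel_id` and message type M are hypothetical,
  // and RegisterQueue<M>() is assumed to have been called for this channel):
  //
  //   void* buf =
  //       ProtobufArenaManager::Instance()->GetAvailableBuffer(channel_id);
  //   if (buf != nullptr) {
  //     auto* queue = reinterpret_cast<base::ArenaQueue<M>*>(buf);
  //     // Either the shared-memory queue or the heap fallback; both expose
  //     // the same ArenaQueue<M> interface.
  //   }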
  // Registers the per-channel queue. A normal (heap) queue is created
  // eagerly; the callback stored here creates the arena-backed queue lazily,
  // once the channel's shared-memory segment has been enabled.
  template <typename T>
  bool RegisterQueue(uint64_t channel_id, uint64_t size) {
    if (non_arena_buffers_.find(channel_id) == non_arena_buffers_.end() ||
        arena_buffer_callbacks_.find(channel_id) ==
            arena_buffer_callbacks_.end()) {
      auto non_arena_buffer_ptr = new base::ArenaQueue<T>();
      non_arena_buffer_ptr->Init(size);
      non_arena_buffers_[channel_id] = non_arena_buffer_ptr;
      arena_buffer_callbacks_[channel_id] = [this, channel_id, size]() {
        auto segment = this->GetSegment(channel_id);
        if (segment == nullptr) {
          ADEBUG << "channel id '" << channel_id << "' not enabled";
          ADEBUG << "fall back to normal queue";
          return;
        }
        if (segment->shared_buffer_arena_ == nullptr) {
          ADEBUG << "arena shared buffer not enabled in channel id '"
                 << channel_id << "'";
          ADEBUG << "fall back to normal queue";
          return;
        }
        if (segment->arena_buffer_address_ == nullptr) {
          auto ptr = google::protobuf::Arena::Create<base::ArenaQueue<T>>(
              segment->shared_buffer_arena_.get());
          ptr->Init(size, segment->shared_buffer_arena_.get());
          segment->arena_buffer_address_ = reinterpret_cast<void*>(ptr);
        }
      };
    }

    // Run the callback once immediately in case the segment is already
    // enabled.
    arena_buffer_callbacks_[channel_id]();
    return true;
  }
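  // Call-order sketch (hedged; names are hypothetical):
  //
  //   auto* manager = ProtobufArenaManager::Instance();
  //   manager->RegisterQueue<proto::ChatterMsg>(channel_id, 16 /* depth */);
  //   // Until the segment exists, GetAvailableBuffer() returns the heap
  //   // queue; once the segment is enabled, re-running the stored callback
  //   // switches the channel to the shared-memory ArenaQueue.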
  // Acquires a writable block from the channel's segment and constructs the
  // message in a protobuf arena placed on that block, so the payload lands
  // directly in shared memory.
  template <typename M,
            typename std::enable_if<
                google::protobuf::Arena::is_arena_constructable<M>::value,
                bool>::type = true>
  void AcquireArenaMessage(uint64_t channel_id, std::shared_ptr<M>& ret_msg) {
    auto arena_conf =
        cyber::common::GlobalData::Instance()->GetChannelArenaConf(channel_id);
    google::protobuf::ArenaOptions options;
    options.start_block_size = arena_conf.max_msg_size();
    options.max_block_size = arena_conf.max_msg_size();

    auto segment = GetSegment(channel_id);
    if (segment == nullptr) {
      return;
    }

    ArenaSegmentBlockInfo wb;
    uint64_t size = 0;  // requested payload size; its computation is elided
                        // in this excerpt
    if (!segment->AcquireBlockToWrite(size, &wb)) {
      return;
    }
    // Let the arena allocate from the acquired block instead of the heap.
    options.initial_block = reinterpret_cast<char*>(
        segment->arena_block_address_[wb.block_index_]);
    options.initial_block_size = segment->message_capacity_;
    segment->arenas_[wb.block_index_] =
        std::make_shared<google::protobuf::Arena>(options);

    // The deleter releases the block's write lock unless it has already
    // been released elsewhere.
    ret_msg = std::shared_ptr<M>(
        google::protobuf::Arena::CreateMessage<M>(
            segment->arenas_[wb.block_index_].get()),
        [segment, wb](M* ptr) {
          int32_t lock_num = segment->blocks_[wb.block_index_].lock_num_.load();
          if (lock_num < ArenaSegmentBlock::kRWLockFree) {
            segment->ReleaseWrittenBlock(wb);
          }
        });
  }
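  // Writer-side sketch (hedged; message type and channel id hypothetical,
  // and the channel is assumed to have an arena conf with max_msg_size set):
  //
  //   std::shared_ptr<proto::ChatterMsg> msg;
  //   ProtobufArenaManager::Instance()->AcquireArenaMessage(channel_id, msg);
  //   if (msg) {
  //     msg->set_content("written straight into shared memory");
  //   }  // the last shared_ptr release unlocks the block for readers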
  // Messages that protobuf cannot construct on an arena stay on the heap;
  // this overload is intentionally a no-op.
  template <typename M,
            typename std::enable_if<
                !google::protobuf::Arena::is_arena_constructable<M>::value,
                bool>::type = true>
  void AcquireArenaMessage(uint64_t channel_id, std::shared_ptr<M>& ret_msg) {}
 private:
  std::unordered_map<uint64_t, std::shared_ptr<ArenaSegment>> segments_;
  // Per-channel heap queues, used until the arena buffer becomes available.
  std::unordered_map<uint64_t, void*> non_arena_buffers_;
  std::unordered_map<uint64_t, std::function<void()>> arena_buffer_callbacks_;
  std::mutex segments_mutex_;

  std::shared_ptr<ArenaAddressAllocator> address_allocator_;

  // Hooks that let protobuf arenas draw their blocks from shared memory.
  static ArenaAllocCallback arena_alloc_cb_;
  static void* ArenaAlloc(uint64_t size);
  static void ArenaDealloc(void* addr, uint64_t size);
  std::mutex arena_alloc_cb_mutex_;

  std::unordered_map<uint64_t, uint64_t> managed_wrappers_;

  DECLARE_SINGLETON(ProtobufArenaManager)
};
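// Reader-side sketch of the segment API (hedged; channel id hypothetical):
//
//   auto segment = ProtobufArenaManager::Instance()->GetSegment(channel_id);
//   ArenaSegmentBlockInfo rb;
//   if (segment && segment->AcquireBlockToRead(&rb)) {
//     // ... parse the message living in block rb.block_index_ ...
//     segment->ReleaseReadBlock(rb);
//   }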
}  // namespace transport
}  // namespace cyber
}  // namespace apollo

#endif  // CYBER_TRANSPORT_SHM_PROTOBUF_ARENA_MANAGER_H_