Apollo 10.0
自动驾驶开放平台
apollo::cyber::transport::ProtobufArenaManager类 参考

#include <protobuf_arena_manager.h>

类 apollo::cyber::transport::ProtobufArenaManager 继承关系图:
apollo::cyber::transport::ProtobufArenaManager 的协作图:

Public 类型

using ArenaAllocCallback = std::function< void *(uint64_t)>
 

Public 成员函数

 ~ProtobufArenaManager ()
 
uint64_t GetBaseAddress (const message::ArenaMessageWrapper *wrapper) override
 
bool Enable ()
 
bool EnableSegment (uint64_t channel_id)
 
bool Destroy ()
 
void SetMessageChannelId (message::ArenaMessageWrapper *wrapper, uint64_t channel_id)
 
uint64_t GetMessageChannelId (message::ArenaMessageWrapper *wrapper)
 
void SetMessageAddressOffset (message::ArenaMessageWrapper *wrapper, uint64_t offset)
 
uint64_t GetMessageAddressOffset (message::ArenaMessageWrapper *wrapper)
 
std::vector< uint64_t > GetMessageRelatedBlocks (message::ArenaMessageWrapper *wrapper)
 
void ResetMessageRelatedBlocks (message::ArenaMessageWrapper *wrapper)
 
void AddMessageRelatedBlock (message::ArenaMessageWrapper *wrapper, uint64_t block_index)
 
std::shared_ptr< ArenaSegmentGetSegment (uint64_t channel_id)
 
void * SetMessage (message::ArenaMessageWrapper *wrapper, const void *message) override
 
void * GetMessage (message::ArenaMessageWrapper *wrapper) override
 
void * GetAvailableBuffer (uint64_t channel_id)
 
template<typename T >
bool RegisterQueue (uint64_t channel_id, uint64_t size)
 
template<typename M , typename std::enable_if< google::protobuf::Arena::is_arena_constructable< M >::value, M >::type * = nullptr>
void AcquireArenaMessage (uint64_t channel_id, std::shared_ptr< M > &ret_msg)
 
template<typename M , typename std::enable_if< !google::protobuf::Arena::is_arena_constructable< M >::value, M >::type * = nullptr>
void AcquireArenaMessage (uint64_t channel_id, std::shared_ptr< M > &ret_msg)
 
- Public 成员函数 继承自 apollo::cyber::message::ArenaManagerBase
 ArenaManagerBase ()
 
virtual ~ArenaManagerBase ()
 
std::shared_ptr< message::ArenaMessageWrapperCreateMessageWrapper ()
 
template<typename MessageT >
MessageT * SetMessage (ArenaMessageWrapper *wrapper, const MessageT &message)
 
template<typename MessageT >
MessageT * GetMessage (ArenaMessageWrapper *wrapper)
 

详细描述

在文件 protobuf_arena_manager.h150 行定义.

成员类型定义说明

◆ ArenaAllocCallback

在文件 protobuf_arena_manager.h152 行定义.

构造及析构函数说明

◆ ~ProtobufArenaManager()

apollo::cyber::transport::ProtobufArenaManager::~ProtobufArenaManager ( )

成员函数说明

◆ AcquireArenaMessage() [1/2]

template<typename M , typename std::enable_if< google::protobuf::Arena::is_arena_constructable< M >::value, M >::type * = nullptr>
void apollo::cyber::transport::ProtobufArenaManager::AcquireArenaMessage ( uint64_t  channel_id,
std::shared_ptr< M > &  ret_msg 
)
inline

在文件 protobuf_arena_manager.h236 行定义.

236 {
237 auto arena_conf =
238 cyber::common::GlobalData::Instance()->GetChannelArenaConf(channel_id);
239 google::protobuf::ArenaOptions options;
240 options.start_block_size = arena_conf.max_msg_size();
241 options.max_block_size = arena_conf.max_msg_size();
242
243 auto segment = GetSegment(channel_id);
244 if (!segment) {
245 return;
246 }
247
248 ArenaSegmentBlockInfo wb;
249 // TODO(All): size should be send to
250 // AcquireBlockToWrite for dynamic adjust block size
251 // auto size = input_msg->ByteSizeLong();
252 uint64_t size = 0;
253 if (!segment->AcquireBlockToWrite(size, &wb)) {
254 return;
255 }
256 options.initial_block =
257 reinterpret_cast<char*>(segment->arena_block_address_[wb.block_index_]);
258 options.initial_block_size = segment->message_capacity_;
259 if (segment->arenas_[wb.block_index_] != nullptr) {
260 segment->arenas_[wb.block_index_] = nullptr;
261 }
262 segment->arenas_[wb.block_index_] =
263 std::make_shared<google::protobuf::Arena>(options);
264
265 // deconstructor do nothing to avoid proto
266 // instance deconstructed before arena allocator
267 ret_msg = std::shared_ptr<M>(
268 google::protobuf::Arena::CreateMessage<M>(
269 segment->arenas_[wb.block_index_].get()),
270 [segment, wb](M* ptr) {
271 int32_t lock_num = segment->blocks_[wb.block_index_].lock_num_.load();
272 if (lock_num < ArenaSegmentBlock::kRWLockFree) {
273 segment->ReleaseWrittenBlock(wb);
274 }
275 });
276 return;
277 }
std::shared_ptr< ArenaSegment > GetSegment(uint64_t channel_id)

◆ AcquireArenaMessage() [2/2]

template<typename M , typename std::enable_if< !google::protobuf::Arena::is_arena_constructable< M >::value, M >::type * = nullptr>
void apollo::cyber::transport::ProtobufArenaManager::AcquireArenaMessage ( uint64_t  channel_id,
std::shared_ptr< M > &  ret_msg 
)
inline

在文件 protobuf_arena_manager.h283 行定义.

283 {
284 return;
285 }

◆ AddMessageRelatedBlock()

void apollo::cyber::transport::ProtobufArenaManager::AddMessageRelatedBlock ( message::ArenaMessageWrapper wrapper,
uint64_t  block_index 
)

在文件 protobuf_arena_manager.cc542 行定义.

543 {
544 auto extended = wrapper->GetExtended<ExtendedStruct>();
545 if (extended->meta_.related_blocks_size_ >=
546 sizeof(extended->meta_.related_blocks_) / sizeof(uint64_t)) {
547 return;
548 }
549 extended->meta_.related_blocks_[extended->meta_.related_blocks_size_++] =
550 block_index;
551}

◆ Destroy()

bool apollo::cyber::transport::ProtobufArenaManager::Destroy ( )

在文件 protobuf_arena_manager.cc486 行定义.

486 {
487 if (!init_) {
488 return true;
489 }
490
491 for (auto& segment : segments_) {
492 address_allocator_->Deallocate(segment.first);
493 }
494 for (auto& buffer : non_arena_buffers_) {
495 delete buffer.second;
496 }
497 segments_.clear();
498
499 init_ = false;
500 return true;
501}

◆ Enable()

bool apollo::cyber::transport::ProtobufArenaManager::Enable ( )

在文件 protobuf_arena_manager.cc435 行定义.

435 {
436 if (init_) {
437 return true;
438 }
439
440 // do something
441
442 init_ = true;
443 return true;
444}

◆ EnableSegment()

bool apollo::cyber::transport::ProtobufArenaManager::EnableSegment ( uint64_t  channel_id)

在文件 protobuf_arena_manager.cc446 行定义.

446 {
447 if (segments_.find(channel_id) != segments_.end()) {
448 if (arena_buffer_callbacks_.find(channel_id) !=
449 arena_buffer_callbacks_.end()) {
450 arena_buffer_callbacks_[channel_id]();
451 }
452 return true;
453 }
454
455 // uint64_t asociated_channel_id = channel_id + 1;
456 // auto segment = SegmentFactory::CreateSegment(asociated_channel_id);
457 // segment->InitOnly(10 * 1024);
458 auto cyber_config = apollo::cyber::common::GlobalData::Instance()->Config();
459 if (!cyber_config.has_transport_conf()) {
460 return false;
461 }
462 if (!cyber_config.transport_conf().has_shm_conf()) {
463 return false;
464 }
465 if (!cyber_config.transport_conf().shm_conf().has_arena_shm_conf()) {
466 return false;
467 }
468 if (!cyber::common::GlobalData::Instance()->IsChannelEnableArenaShm(
469 channel_id)) {
470 return false;
471 }
472 auto arena_conf =
473 cyber::common::GlobalData::Instance()->GetChannelArenaConf(channel_id);
474 auto segment_shm_address = address_allocator_->Allocate(channel_id);
475 auto segment = std::make_shared<ArenaSegment>(
476 channel_id, arena_conf.max_msg_size(), arena_conf.max_pool_size(),
477 reinterpret_cast<void*>(segment_shm_address));
478 segments_[channel_id] = segment;
479 if (arena_buffer_callbacks_.find(channel_id) !=
480 arena_buffer_callbacks_.end()) {
481 arena_buffer_callbacks_[channel_id]();
482 }
483 return true;
484}

◆ GetAvailableBuffer()

void * apollo::cyber::transport::ProtobufArenaManager::GetAvailableBuffer ( uint64_t  channel_id)
inline

在文件 protobuf_arena_manager.h181 行定义.

181 {
182 auto segment = this->GetSegment(channel_id);
183 if (!segment) {
184 if (non_arena_buffers_.find(channel_id) == non_arena_buffers_.end()) {
185 return nullptr;
186 }
187 return non_arena_buffers_[channel_id];
188 }
189 if (segment->arena_buffer_address_ != nullptr) {
190 return segment->arena_buffer_address_;
191 }
192 return non_arena_buffers_[channel_id];
193 }

◆ GetBaseAddress()

uint64_t apollo::cyber::transport::ProtobufArenaManager::GetBaseAddress ( const message::ArenaMessageWrapper wrapper)
overridevirtual

重载 apollo::cyber::message::ArenaManagerBase .

在文件 protobuf_arena_manager.cc334 行定义.

335 {
336 return 0;
337}

◆ GetMessage()

void * apollo::cyber::transport::ProtobufArenaManager::GetMessage ( message::ArenaMessageWrapper wrapper)
overridevirtual

实现了 apollo::cyber::message::ArenaManagerBase.

在文件 protobuf_arena_manager.cc423 行定义.

423 {
424 auto segment = GetSegment(GetMessageChannelId(wrapper));
425 if (!segment) {
426 return nullptr;
427 }
428
429 auto address = reinterpret_cast<uint64_t>(segment->GetShmAddress()) +
431
432 return reinterpret_cast<void*>(address);
433}
uint64_t GetMessageAddressOffset(message::ArenaMessageWrapper *wrapper)
uint64_t GetMessageChannelId(message::ArenaMessageWrapper *wrapper)

◆ GetMessageAddressOffset()

uint64_t apollo::cyber::transport::ProtobufArenaManager::GetMessageAddressOffset ( message::ArenaMessageWrapper wrapper)

在文件 protobuf_arena_manager.cc519 行定义.

520 {
521 return wrapper->GetExtended<ExtendedStruct>()->meta_.address_offset_;
522}

◆ GetMessageChannelId()

uint64_t apollo::cyber::transport::ProtobufArenaManager::GetMessageChannelId ( message::ArenaMessageWrapper wrapper)

在文件 protobuf_arena_manager.cc508 行定义.

509 {
510 return wrapper->GetExtended<ExtendedStruct>()->meta_.channel_id_;
511}

◆ GetMessageRelatedBlocks()

std::vector< uint64_t > apollo::cyber::transport::ProtobufArenaManager::GetMessageRelatedBlocks ( message::ArenaMessageWrapper wrapper)

在文件 protobuf_arena_manager.cc524 行定义.

525 {
526 std::vector<uint64_t> related_blocks;
527 auto extended = wrapper->GetExtended<ExtendedStruct>();
528 for (uint64_t i = 0; i < extended->meta_.related_blocks_size_; ++i) {
529 related_blocks.push_back(extended->meta_.related_blocks_[i]);
530 }
531 return related_blocks;
532}

◆ GetSegment()

std::shared_ptr< ArenaSegment > apollo::cyber::transport::ProtobufArenaManager::GetSegment ( uint64_t  channel_id)

在文件 protobuf_arena_manager.cc339 行定义.

340 {
341 std::lock_guard<std::mutex> lock(segments_mutex_);
342 if (segments_.find(channel_id) == segments_.end()) {
343 return nullptr;
344 }
345 return segments_[channel_id];
346}

◆ RegisterQueue()

template<typename T >
bool apollo::cyber::transport::ProtobufArenaManager::RegisterQueue ( uint64_t  channel_id,
uint64_t  size 
)
inline

在文件 protobuf_arena_manager.h196 行定义.

196 {
197 if (non_arena_buffers_.find(channel_id) == non_arena_buffers_.end() ||
198 arena_buffer_callbacks_.find(channel_id) ==
199 arena_buffer_callbacks_.end()) {
200 auto non_arena_buffer_ptr = new apollo::cyber::base::ArenaQueue<T>();
201 non_arena_buffer_ptr->Init(size);
202 non_arena_buffers_[channel_id] = non_arena_buffer_ptr;
203 arena_buffer_callbacks_[channel_id] = [this, channel_id, size]() {
204 auto segment = this->GetSegment(channel_id);
205 if (!segment) {
206 ADEBUG << "channel id '" << channel_id << "' not enable";
207 ADEBUG << "fallback to use nomarl queue";
208 return;
209 }
210 if (segment->shared_buffer_arena_ == nullptr) {
211 ADEBUG << "Not enable arena shared buffer in channel id '"
212 << channel_id << "'";
213 ADEBUG << "fallback to use nomarl queue";
214 return;
215 }
216 if (segment->arena_buffer_address_ == nullptr) {
217 auto ptr = google::protobuf::Arena::Create<base::ArenaQueue<T>>(
218 segment->shared_buffer_arena_.get());
219 ptr->Init(size, segment->shared_buffer_arena_.get());
220 segment->arena_buffer_address_ = reinterpret_cast<void*>(ptr);
221 }
222 };
223 }
224 // try enable arena buffer
225 auto segment = GetSegment(channel_id);
226 if (segment) {
227 arena_buffer_callbacks_[channel_id]();
228 }
229 return true;
230 }
#define ADEBUG
Definition log.h:41

◆ ResetMessageRelatedBlocks()

void apollo::cyber::transport::ProtobufArenaManager::ResetMessageRelatedBlocks ( message::ArenaMessageWrapper wrapper)

在文件 protobuf_arena_manager.cc534 行定义.

535 {
536 auto extended = wrapper->GetExtended<ExtendedStruct>();
537 extended->meta_.related_blocks_size_ = 0;
538 // memset(extended->meta_.related_blocks_, 0,
539 // sizeof(extended->meta_.related_blocks_));
540}

◆ SetMessage()

void * apollo::cyber::transport::ProtobufArenaManager::SetMessage ( message::ArenaMessageWrapper wrapper,
const void *  message 
)
overridevirtual

实现了 apollo::cyber::message::ArenaManagerBase.

在文件 protobuf_arena_manager.cc348 行定义.

349 {
350 auto input_msg = reinterpret_cast<const google::protobuf::Message*>(message);
351 auto channel_id = GetMessageChannelId(wrapper);
352 auto segment = GetSegment(channel_id);
353 auto arena_ptr = input_msg->GetArena();
354 google::protobuf::ArenaOptions options;
355
356 if (!segment) {
357 return nullptr;
358 }
359
360 void* msg_output = nullptr;
361 if (arena_ptr == nullptr) {
362 auto arena_conf =
363 cyber::common::GlobalData::Instance()->GetChannelArenaConf(channel_id);
364 google::protobuf::ArenaOptions options;
365 options.start_block_size = arena_conf.max_msg_size();
366 options.max_block_size = arena_conf.max_msg_size();
367
368 if (!segment) {
369 return nullptr;
370 }
371
373
374 ArenaSegmentBlockInfo wb;
375 // TODO(all): AcquireBlockToWrite for dynamic adjust block
376 // auto size = input_msg->ByteSizeLong();
377 uint64_t size = 0;
378 if (!segment->AcquireBlockToWrite(size, &wb)) {
379 return nullptr;
380 }
381 this->AddMessageRelatedBlock(wrapper, wb.block_index_);
382 options.initial_block =
383 reinterpret_cast<char*>(segment->arena_block_address_[wb.block_index_]);
384 options.initial_block_size = segment->message_capacity_;
385 if (segment->arenas_[wb.block_index_] != nullptr) {
386 segment->arenas_[wb.block_index_] = nullptr;
387 }
388 segment->arenas_[wb.block_index_] =
389 std::make_shared<google::protobuf::Arena>(options);
390 auto msg = input_msg->New(segment->arenas_[wb.block_index_].get());
391 msg->CopyFrom(*input_msg);
393 wrapper, reinterpret_cast<uint64_t>(msg) -
394 reinterpret_cast<uint64_t>(segment->GetShmAddress()));
395 msg_output = reinterpret_cast<void*>(msg);
396 segment->ReleaseWrittenBlock(wb);
397 } else {
398 ArenaSegmentBlockInfo wb;
399 int block_index = -1;
400 for (size_t i = 0; i < segment->message_capacity_; i++) {
401 if (segment->arenas_[i].get() == arena_ptr) {
402 block_index = i;
403 break;
404 }
405 }
406 if (block_index == -1) {
407 return nullptr;
408 }
409 wb.block_index_ = block_index;
411 this->AddMessageRelatedBlock(wrapper, block_index);
413 wrapper, reinterpret_cast<uint64_t>(input_msg) -
414 reinterpret_cast<uint64_t>(segment->GetShmAddress()));
415 msg_output = reinterpret_cast<void*>(
416 const_cast<google::protobuf::Message*>(input_msg));
417 segment->ReleaseWrittenBlock(wb);
418 }
419
420 return msg_output;
421}
void ResetMessageRelatedBlocks(message::ArenaMessageWrapper *wrapper)
void SetMessageAddressOffset(message::ArenaMessageWrapper *wrapper, uint64_t offset)
void AddMessageRelatedBlock(message::ArenaMessageWrapper *wrapper, uint64_t block_index)

◆ SetMessageAddressOffset()

void apollo::cyber::transport::ProtobufArenaManager::SetMessageAddressOffset ( message::ArenaMessageWrapper wrapper,
uint64_t  offset 
)

在文件 protobuf_arena_manager.cc513 行定义.

514 {
515 wrapper->GetExtended<ExtendedStruct>()->meta_.address_offset_ =
516 address_offset;
517}

◆ SetMessageChannelId()

void apollo::cyber::transport::ProtobufArenaManager::SetMessageChannelId ( message::ArenaMessageWrapper wrapper,
uint64_t  channel_id 
)

在文件 protobuf_arena_manager.cc503 行定义.

504 {
505 wrapper->GetExtended<ExtendedStruct>()->meta_.channel_id_ = channel_id;
506}

该类的文档由以下文件生成: