Apollo 10.0
自动驾驶开放平台
apollo::cyber::transport::ArenaSegment类 参考

#include <protobuf_arena_manager.h>

apollo::cyber::transport::ArenaSegment 的协作图:

Public 成员函数

 ArenaSegment ()
 
 ArenaSegment (uint64_t channel_id)
 
 ArenaSegment (uint64_t channel_id, void *base_address)
 
 ArenaSegment (uint64_t channel_id, uint64_t message_size, uint64_t block_num, void *base_address)
 
 ~ArenaSegment ()
 
bool Init (uint64_t message_size, uint64_t block_num)
 
bool Open (uint64_t message_size, uint64_t block_num)
 
bool OpenOrCreate (uint64_t message_size, uint64_t block_num)
 
void * GetShmAddress ()
 
uint64_t GetNextWritableBlockIndex ()
 
bool AddBlockWriteLock (uint64_t block_index)
 
void RemoveBlockWriteLock (uint64_t block_index)
 
bool AddBlockReadLock (uint64_t block_index)
 
void RemoveBlockReadLock (uint64_t block_index)
 
bool AcquireBlockToWrite (uint64_t size, ArenaSegmentBlockInfo *block_info)
 
void ReleaseWrittenBlock (const ArenaSegmentBlockInfo &block_info)
 
bool AcquireBlockToRead (ArenaSegmentBlockInfo *block_info)
 
void ReleaseReadBlock (const ArenaSegmentBlockInfo &block_info)
 

Public 属性

ArenaSegmentStatestate_
 
ArenaSegmentBlockblocks_
 
std::vector< std::shared_ptr< google::protobuf::Arena > > arenas_
 
std::vector< uint64_t > arena_block_address_
 
uint64_t channel_id_
 
uint64_t key_id_
 
void * base_address_
 
void * shm_address_
 
std::shared_ptr< google::protobuf::Arena > shared_buffer_arena_
 
void * arena_buffer_address_ = nullptr
 
uint64_t message_capacity_
 

详细描述

在文件 protobuf_arena_manager.h78 行定义.

构造及析构函数说明

◆ ArenaSegment() [1/4]

apollo::cyber::transport::ArenaSegment::ArenaSegment ( )

◆ ArenaSegment() [2/4]

apollo::cyber::transport::ArenaSegment::ArenaSegment ( uint64_t  channel_id)
explicit

在文件 protobuf_arena_manager.cc40 行定义.

41 : channel_id_(channel_id),
42 key_id_(std::hash<std::string>{}("/apollo/__arena__/" +
43 std::to_string(channel_id))) {}

◆ ArenaSegment() [3/4]

apollo::cyber::transport::ArenaSegment::ArenaSegment ( uint64_t  channel_id,
void *  base_address 
)

在文件 protobuf_arena_manager.cc45 行定义.

46 : channel_id_(channel_id),
47 key_id_(std::hash<std::string>{}("/apollo/__arena__/" +
48 std::to_string(channel_id))),
49 base_address_(base_address) {}

◆ ArenaSegment() [4/4]

apollo::cyber::transport::ArenaSegment::ArenaSegment ( uint64_t  channel_id,
uint64_t  message_size,
uint64_t  block_num,
void *  base_address 
)

在文件 protobuf_arena_manager.cc51 行定义.

53 : channel_id_(channel_id),
54 key_id_(std::hash<std::string>{}("/apollo/__arena__/" +
55 std::to_string(channel_id))),
56 base_address_(base_address) {
57 Init(message_size, block_num);
58}
bool Init(uint64_t message_size, uint64_t block_num)
int message_size

◆ ~ArenaSegment()

apollo::cyber::transport::ArenaSegment::~ArenaSegment ( )

在文件 protobuf_arena_manager.cc60 行定义.

60{}

成员函数说明

◆ AcquireBlockToRead()

bool apollo::cyber::transport::ArenaSegment::AcquireBlockToRead ( ArenaSegmentBlockInfo block_info)

在文件 protobuf_arena_manager.cc290 行定义.

290 {
291 if (!block_info) {
292 return false;
293 }
294 if (!state_ || !blocks_) {
295 return false;
296 }
297
298 if (block_info->block_index_ >= state_->struct_.block_num_.load()) {
299 return false;
300 }
301
302 // TODO(all): support dynamic block size
303
304 if (!AddBlockReadLock(block_info->block_index_)) {
305 return false;
306 }
307 uint64_t block_num = state_->struct_.block_num_.load();
308 uint64_t block_size = state_->struct_.message_size_.load();
309
310 block_info->block_ = &blocks_[block_info->block_index_];
311 block_info->block_buffer_address_ = reinterpret_cast<void*>(
312 reinterpret_cast<uint64_t>(shm_address_) + sizeof(ArenaSegmentState) +
313 block_num * sizeof(ArenaSegmentBlock) +
314 block_info->block_index_ * block_size);
315 return true;
316}
struct apollo::cyber::transport::ArenaSegmentState::@3 struct_

◆ AcquireBlockToWrite()

bool apollo::cyber::transport::ArenaSegment::AcquireBlockToWrite ( uint64_t  size,
ArenaSegmentBlockInfo block_info 
)

在文件 protobuf_arena_manager.cc257 行定义.

258 {
259 if (!block_info) {
260 return false;
261 }
262 if (!state_ || !blocks_) {
263 return false;
264 }
265
266 // TODO(all): support dynamic block size
267
268 uint64_t block_num = state_->struct_.block_num_.load();
269 uint64_t block_size = state_->struct_.message_size_.load();
270 uint64_t block_index = GetNextWritableBlockIndex();
271 block_info->block_index_ = block_index;
272 block_info->block_ = &blocks_[block_index];
273 block_info->block_buffer_address_ = reinterpret_cast<void*>(
274 reinterpret_cast<uint64_t>(shm_address_) + sizeof(ArenaSegmentState) +
275 block_num * sizeof(ArenaSegmentBlock) + block_index * block_size);
276 return true;
277}

◆ AddBlockReadLock()

bool apollo::cyber::transport::ArenaSegment::AddBlockReadLock ( uint64_t  block_index)

在文件 protobuf_arena_manager.cc216 行定义.

216 {
217 // when multiple readers are reading an arena channel
218 // at the same time, using write locks can cause one
219 // or more readers to hang at the lock, whereas
220 // read locks do not have this problem
221 // base::ReadLockGuard<base::PthreadRWLock> lock(
222 // blocks_[block_index].read_write_mutex_);
223 // if (blocks_[block_index].writing_ref_count_.load() > 0) {
224 // return false;
225 // }
226 // blocks_[block_index].reading_ref_count_.fetch_add(1);
227 auto& block = blocks_[block_index];
228 int32_t lock_num = block.lock_num_.load();
229 if (lock_num < ArenaSegmentBlock::kRWLockFree) {
230 AINFO << "block is being written.";
231 return false;
232 }
233
234 int32_t try_times = 0;
235 while (!block.lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
236 std::memory_order_acq_rel,
237 std::memory_order_relaxed)) {
238 ++try_times;
239 if (try_times == ArenaSegmentBlock::kMaxTryLockTimes) {
240 AINFO << "fail to add read lock num, curr num: " << lock_num;
241 return false;
242 }
243
244 lock_num = block.lock_num_.load();
245 if (lock_num < ArenaSegmentBlock::kRWLockFree) {
246 AINFO << "block is being written.";
247 return false;
248 }
249 }
250 return true;
251}
#define AINFO
Definition log.h:42

◆ AddBlockWriteLock()

bool apollo::cyber::transport::ArenaSegment::AddBlockWriteLock ( uint64_t  block_index)

在文件 protobuf_arena_manager.cc191 行定义.

191 {
192 // base::WriteLockGuard<base::PthreadRWLock> lock(
193 // blocks_[block_index].read_write_mutex_);
194 // if (blocks_[block_index].writing_ref_count_.load() > 0) {
195 // return false;
196 // }
197 // if (blocks_[block_index].reading_ref_count_.load() > 0) {
198 // return false;
199 // }
200 // blocks_[block_index].writing_ref_count_.fetch_add(1);
201 auto& block = blocks_[block_index];
202 int32_t rw_lock_free = ArenaSegmentBlock::kRWLockFree;
203 if (!block.lock_num_.compare_exchange_weak(
205 std::memory_order_acq_rel, std::memory_order_relaxed)) {
206 ADEBUG << "lock num: " << block.lock_num_.load();
207 return false;
208 }
209 return true;
210}
#define ADEBUG
Definition log.h:41

◆ GetNextWritableBlockIndex()

uint64_t apollo::cyber::transport::ArenaSegment::GetNextWritableBlockIndex ( )

在文件 protobuf_arena_manager.cc180 行定义.

180 {
181 const auto block_num = state_->struct_.block_num_.load();
182 while (1) {
183 uint64_t next_idx = state_->struct_.message_seq_.fetch_add(1) % block_num;
184 if (AddBlockWriteLock(next_idx)) {
185 return next_idx;
186 }
187 }
188 return 0;
189}

◆ GetShmAddress()

void * apollo::cyber::transport::ArenaSegment::GetShmAddress ( )

在文件 protobuf_arena_manager.cc178 行定义.

178{ return shm_address_; }

◆ Init()

bool apollo::cyber::transport::ArenaSegment::Init ( uint64_t  message_size,
uint64_t  block_num 
)

在文件 protobuf_arena_manager.cc62 行定义.

62 {
63 uint64_t key_id = std::hash<std::string>{}("/apollo/__arena__/" +
64 std::to_string(channel_id_));
65 // fprintf(stderr, "channel_id: %lx, key_id: %lx\n", channel_id_, key_id);
66
67 for (uint32_t retry = 0; retry < 2 && !OpenOrCreate(message_size, block_num);
68 ++retry) {
69 }
70
71 return true;
72}
bool OpenOrCreate(uint64_t message_size, uint64_t block_num)

◆ Open()

bool apollo::cyber::transport::ArenaSegment::Open ( uint64_t  message_size,
uint64_t  block_num 
)

在文件 protobuf_arena_manager.cc137 行定义.

137 {
138 auto arena_conf =
139 cyber::common::GlobalData::Instance()->GetChannelArenaConf(channel_id_);
140 auto shared_buffer_size = arena_conf.shared_buffer_size();
141 auto shmid = shmget(static_cast<key_t>(key_id_), 0, 0644);
142 if (shmid == -1) {
143 // shm not exist
144 return false;
145 }
146 shm_address_ = shmat(shmid, base_address_, 0);
147 if (shm_address_ == reinterpret_cast<void*>(-1)) {
148 // shmat failed
149 return false;
150 }
152 state_ = reinterpret_cast<ArenaSegmentState*>(shm_address_);
153 state_->struct_.ref_count_.fetch_add(1);
154 blocks_ = reinterpret_cast<ArenaSegmentBlock*>(
155 reinterpret_cast<uint64_t>(shm_address_) + sizeof(ArenaSegmentState));
156
157 arenas_.resize(block_num, nullptr);
158 if (shared_buffer_size == 0) {
159 shared_buffer_arena_ = nullptr;
160 } else {
161 google::protobuf::ArenaOptions options;
162 options.start_block_size = shared_buffer_size;
163 options.max_block_size = shared_buffer_size;
164 options.initial_block = reinterpret_cast<char*>(
165 reinterpret_cast<uint64_t>(shm_address_) + sizeof(ArenaSegmentState) +
166 block_num * sizeof(ArenaSegmentBlock) + block_num * message_size);
167 options.initial_block_size = shared_buffer_size;
168 shared_buffer_arena_ = std::make_shared<google::protobuf::Arena>(options);
169 }
170 for (size_t i = 0; i < block_num; i++) {
171 arena_block_address_.push_back(
172 reinterpret_cast<uint64_t>(shm_address_) + sizeof(ArenaSegmentState) +
173 block_num * sizeof(ArenaSegmentBlock) + i * message_size);
174 }
175 return true;
176}
std::vector< std::shared_ptr< google::protobuf::Arena > > arenas_
std::shared_ptr< google::protobuf::Arena > shared_buffer_arena_

◆ OpenOrCreate()

bool apollo::cyber::transport::ArenaSegment::OpenOrCreate ( uint64_t  message_size,
uint64_t  block_num 
)

在文件 protobuf_arena_manager.cc74 行定义.

74 {
75 auto arena_conf =
76 cyber::common::GlobalData::Instance()->GetChannelArenaConf(channel_id_);
77 auto shared_buffer_size = arena_conf.shared_buffer_size();
78 auto size = sizeof(ArenaSegmentState) +
79 sizeof(ArenaSegmentBlockDescriptor) * block_num +
80 message_size * block_num + shared_buffer_size;
81 auto shmid =
82 shmget(static_cast<key_t>(key_id_), size, 0644 | IPC_CREAT | IPC_EXCL);
83 if (shmid == -1) {
84 if (errno == EINVAL) {
85 // TODO(all): need larger space, recreate
86 } else if (errno == EEXIST) {
87 // TODO(all): shm already exist, open only
88 return Open(message_size, block_num);
89 } else {
90 // create or open shm failed
91 return false;
92 }
93 }
95 shm_address_ = shmat(shmid, base_address_, 0);
96 if (shm_address_ == reinterpret_cast<void*>(-1)) {
97 // shmat failed
98 return false;
99 }
100
101 arenas_.resize(block_num, nullptr);
102 if (shared_buffer_size == 0) {
103 shared_buffer_arena_ = nullptr;
104 } else {
105 google::protobuf::ArenaOptions options;
106 options.start_block_size = shared_buffer_size;
107 options.max_block_size = shared_buffer_size;
108 options.initial_block = reinterpret_cast<char*>(
109 reinterpret_cast<uint64_t>(shm_address_) + sizeof(ArenaSegmentState) +
110 block_num * sizeof(ArenaSegmentBlock) + block_num * message_size);
111 options.initial_block_size = shared_buffer_size;
112 shared_buffer_arena_ = std::make_shared<google::protobuf::Arena>(options);
113 }
114 for (size_t i = 0; i < block_num; i++) {
115 arena_block_address_.push_back(
116 reinterpret_cast<uint64_t>(shm_address_) + sizeof(ArenaSegmentState) +
117 block_num * sizeof(ArenaSegmentBlock) + i * message_size);
118 }
119
120 state_ = reinterpret_cast<ArenaSegmentState*>(shm_address_);
121 state_->struct_.ref_count_.store(1);
122 state_->struct_.auto_extended_.store(false);
124 state_->struct_.block_num_.store(block_num);
125 state_->struct_.message_seq_.store(0);
126 blocks_ = reinterpret_cast<ArenaSegmentBlock*>(
127 reinterpret_cast<uint64_t>(shm_address_) + sizeof(ArenaSegmentState));
128 for (uint64_t i = 0; i < block_num; ++i) {
130 // blocks_[i].writing_ref_count_.store(0);
131 // blocks_[i].reading_ref_count_.store(0);
132 blocks_[i].lock_num_.store(0);
133 }
134 return true;
135}
bool Open(uint64_t message_size, uint64_t block_num)

◆ ReleaseReadBlock()

void apollo::cyber::transport::ArenaSegment::ReleaseReadBlock ( const ArenaSegmentBlockInfo block_info)

在文件 protobuf_arena_manager.cc318 行定义.

318 {
319 if (!state_ || !blocks_) {
320 return;
321 }
322 if (block_info.block_index_ >= state_->struct_.block_num_.load()) {
323 return;
324 }
325 RemoveBlockReadLock(block_info.block_index_);
326}

◆ ReleaseWrittenBlock()

void apollo::cyber::transport::ArenaSegment::ReleaseWrittenBlock ( const ArenaSegmentBlockInfo block_info)

在文件 protobuf_arena_manager.cc279 行定义.

280 {
281 if (!state_ || !blocks_) {
282 return;
283 }
284 if (block_info.block_index_ >= state_->struct_.block_num_.load()) {
285 return;
286 }
287 RemoveBlockWriteLock(block_info.block_index_);
288}

◆ RemoveBlockReadLock()

void apollo::cyber::transport::ArenaSegment::RemoveBlockReadLock ( uint64_t  block_index)

在文件 protobuf_arena_manager.cc253 行定义.

253 {
254 blocks_[block_index].lock_num_.fetch_sub(1);
255}

◆ RemoveBlockWriteLock()

void apollo::cyber::transport::ArenaSegment::RemoveBlockWriteLock ( uint64_t  block_index)

在文件 protobuf_arena_manager.cc212 行定义.

212 {
213 blocks_[block_index].lock_num_.fetch_add(1);
214}

类成员变量说明

◆ arena_block_address_

std::vector<uint64_t> apollo::cyber::transport::ArenaSegment::arena_block_address_

在文件 protobuf_arena_manager.h112 行定义.

◆ arena_buffer_address_

void* apollo::cyber::transport::ArenaSegment::arena_buffer_address_ = nullptr

在文件 protobuf_arena_manager.h118 行定义.

◆ arenas_

std::vector<std::shared_ptr<google::protobuf::Arena> > apollo::cyber::transport::ArenaSegment::arenas_

在文件 protobuf_arena_manager.h111 行定义.

◆ base_address_

void* apollo::cyber::transport::ArenaSegment::base_address_

在文件 protobuf_arena_manager.h115 行定义.

◆ blocks_

ArenaSegmentBlock* apollo::cyber::transport::ArenaSegment::blocks_

在文件 protobuf_arena_manager.h110 行定义.

◆ channel_id_

uint64_t apollo::cyber::transport::ArenaSegment::channel_id_

在文件 protobuf_arena_manager.h113 行定义.

◆ key_id_

uint64_t apollo::cyber::transport::ArenaSegment::key_id_

在文件 protobuf_arena_manager.h114 行定义.

◆ message_capacity_

uint64_t apollo::cyber::transport::ArenaSegment::message_capacity_

在文件 protobuf_arena_manager.h120 行定义.

◆ shared_buffer_arena_

std::shared_ptr<google::protobuf::Arena> apollo::cyber::transport::ArenaSegment::shared_buffer_arena_

在文件 protobuf_arena_manager.h117 行定义.

◆ shm_address_

void* apollo::cyber::transport::ArenaSegment::shm_address_

在文件 protobuf_arena_manager.h116 行定义.

◆ state_

ArenaSegmentState* apollo::cyber::transport::ArenaSegment::state_

在文件 protobuf_arena_manager.h109 行定义.


该类的文档由以下文件生成: