Apollo 10.0
自动驾驶开放平台
segment.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include "cyber/common/log.h"
20#include "cyber/common/util.h"
22
23namespace apollo {
24namespace cyber {
25namespace transport {
26
27Segment::Segment(uint64_t channel_id)
28 : init_(false),
29 conf_(),
30 channel_id_(channel_id),
31 state_(nullptr),
32 blocks_(nullptr),
33 arena_blocks_(nullptr),
34 managed_shm_(nullptr),
35 block_buf_lock_(),
36 arena_block_buf_lock_(),
37 block_buf_addrs_(),
38 arena_block_buf_addrs_() {}
39
40bool Segment::AcquireBlockToWrite(std::size_t msg_size,
41 WritableBlock* writable_block) {
42 RETURN_VAL_IF_NULL(writable_block, false);
43 if (!init_ && !OpenOrCreate()) {
44 AERROR << "create shm failed, can't write now.";
45 return false;
46 }
47
48 bool result = true;
49 if (state_->need_remap()) {
50 result = Remap();
51 }
52
53 if (msg_size > conf_.ceiling_msg_size()) {
54 AINFO << "msg_size: " << msg_size
55 << " larger than current shm_buffer_size: "
56 << conf_.ceiling_msg_size() << " , need recreate.";
57 result = Recreate(msg_size);
58 }
59
60 if (!result) {
61 AERROR << "segment update failed.";
62 return false;
63 }
64
65 uint32_t index = GetNextWritableBlockIndex();
66 writable_block->index = index;
67 writable_block->block = &blocks_[index];
68 writable_block->buf = block_buf_addrs_[index];
69 return true;
70}
71
72bool Segment::AcquireArenaBlockToWrite(std::size_t msg_size,
73 WritableBlock* writable_block) {
74 RETURN_VAL_IF_NULL(writable_block, false);
75 if (!init_ && !OpenOrCreate()) {
76 AERROR << "create shm failed, can't write now.";
77 return false;
78 }
79
80 if (state_->need_remap()) {
81 Remap();
82 }
83
84 uint32_t index = GetNextArenaWritableBlockIndex();
85 writable_block->index = index;
86 writable_block->block = &arena_blocks_[index];
87 writable_block->buf = arena_block_buf_addrs_[index];
88 return true;
89}
90
91void Segment::ReleaseWrittenBlock(const WritableBlock& writable_block) {
92 auto index = writable_block.index;
93 if (index >= conf_.block_num()) {
94 return;
95 }
96 blocks_[index].ReleaseWriteLock();
97}
98
100 auto index = writable_block.index;
101 if (index >= ShmConf::ARENA_BLOCK_NUM) {
102 return;
103 }
104 arena_blocks_[index].ReleaseWriteLock();
105}
106
108 RETURN_VAL_IF_NULL(readable_block, false);
109 if (!init_ && !OpenOnly()) {
110 AERROR << "failed to open shared memory, can't read now.";
111 return false;
112 }
113
114 auto index = readable_block->index;
115 if (index >= conf_.block_num()) {
116 AERROR << "invalid block_index[" << index << "].";
117 return false;
118 }
119
120 bool result = true;
121 if (state_->need_remap()) {
122 result = Remap();
123 }
124
125 if (!result) {
126 AERROR << "segment update failed.";
127 return false;
128 }
129
130 if (!blocks_[index].TryLockForRead()) {
131 return false;
132 }
133 readable_block->block = blocks_ + index;
134 readable_block->buf = block_buf_addrs_[index];
135 return true;
136}
137
139 RETURN_VAL_IF_NULL(readable_block, false);
140 if (!init_ && !OpenOnly()) {
141 AERROR << "failed to open shared memory, can't read now.";
142 return false;
143 }
144
145 auto index = readable_block->index;
146 if (index >= ShmConf::ARENA_BLOCK_NUM) {
147 AERROR << "invalid arena block_index[" << index << "].";
148 return false;
149 }
150
151 bool result = true;
152 if (state_->need_remap()) {
153 result = Remap();
154 }
155
156 if (!result) {
157 AERROR << "segment update failed.";
158 return false;
159 }
160
161 if (!arena_blocks_[index].TryLockForRead()) {
162 return false;
163 }
164 readable_block->block = arena_blocks_ + index;
165 readable_block->buf = arena_block_buf_addrs_[index];
166 return true;
167}
168
170 auto index = readable_block.index;
171 if (index >= ShmConf::ARENA_BLOCK_NUM) {
172 return;
173 }
174 arena_blocks_[index].ReleaseReadLock();
175}
176
177void Segment::ReleaseReadBlock(const ReadableBlock& readable_block) {
178 auto index = readable_block.index;
179 if (index >= conf_.block_num()) {
180 return;
181 }
182 blocks_[index].ReleaseReadLock();
183}
184
186 if (init_) {
187 return true;
188 }
190 if (!OpenOrCreate()) {
191 return false;
192 }
193 return true;
194}
195
197
198bool Segment::LockBlockForWriteByIndex(uint64_t block_index) {
199 if (block_index >= conf_.block_num()) {
200 return false;
201 }
202 return blocks_[block_index].TryLockForWrite();
203}
204
205bool Segment::ReleaseBlockForWriteByIndex(uint64_t block_index) {
206 if (block_index >= conf_.block_num()) {
207 return false;
208 }
209 blocks_[block_index].ReleaseWriteLock();
210 return true;
211}
212
213bool Segment::LockBlockForReadByIndex(uint64_t block_index) {
214 if (block_index >= conf_.block_num()) {
215 return false;
216 }
217 return blocks_[block_index].TryLockForRead();
218}
219
220bool Segment::ReleaseBlockForReadByIndex(uint64_t block_index) {
221 if (block_index >= conf_.block_num()) {
222 return false;
223 }
224 blocks_[block_index].ReleaseReadLock();
225 return true;
226}
227
228bool Segment::LockArenaBlockForWriteByIndex(uint64_t block_index) {
229 if (block_index >= ShmConf::ARENA_BLOCK_NUM) {
230 return false;
231 }
232 return arena_blocks_[block_index].TryLockForWrite();
233}
234
236 if (block_index >= ShmConf::ARENA_BLOCK_NUM) {
237 return false;
238 }
239 arena_blocks_[block_index].ReleaseWriteLock();
240 return true;
241}
242
243bool Segment::LockArenaBlockForReadByIndex(uint64_t block_index) {
244 if (block_index >= ShmConf::ARENA_BLOCK_NUM) {
245 return false;
246 }
247 return arena_blocks_[block_index].TryLockForRead();
248}
249
251 if (block_index >= ShmConf::ARENA_BLOCK_NUM) {
252 return false;
253 }
254 arena_blocks_[block_index].ReleaseReadLock();
255 return true;
256}
257
259 if (!init_) {
260 return true;
261 }
262 init_ = false;
263
264 try {
266 uint32_t reference_counts = state_->reference_counts();
267 if (reference_counts == 0) {
268 return Remove();
269 }
270 } catch (...) {
271 AERROR << "exception.";
272 return false;
273 }
274 ADEBUG << "destroy.";
275 return true;
276}
277
278bool Segment::Remap() {
279 init_ = false;
280 ADEBUG << "before reset.";
281 Reset();
282 ADEBUG << "after reset.";
283 return OpenOnly();
284}
285
286bool Segment::Recreate(const uint64_t& msg_size) {
287 init_ = false;
288 state_->set_need_remap(true);
289 Reset();
290 Remove();
291 conf_.Update(msg_size);
292 return OpenOrCreate();
293}
294
295uint32_t Segment::GetNextWritableBlockIndex() {
296 const auto block_num = conf_.block_num();
297 while (1) {
298 uint32_t try_idx = state_->FetchAddSeq(1) % block_num;
299 if (blocks_[try_idx].TryLockForWrite()) {
300 return try_idx;
301 }
302 }
303 return 0;
304}
305
306uint32_t Segment::GetNextArenaWritableBlockIndex() {
307 const auto block_num = ShmConf::ARENA_BLOCK_NUM;
308 while (1) {
309 uint32_t try_idx = state_->FetchAddArenaSeq(1) % block_num;
310 if (arena_blocks_[try_idx].TryLockForWrite()) {
311 return try_idx;
312 }
313 }
314 return 0;
315}
316
317} // namespace transport
318} // namespace cyber
319} // namespace apollo
bool AcquireBlockToRead(ReadableBlock *readable_block)
Definition segment.cc:107
bool ReleaseArenaBlockForReadByIndex(uint64_t block_index)
Definition segment.cc:250
bool AcquireArenaBlockToWrite(std::size_t msg_size, WritableBlock *writable_block)
Definition segment.cc:72
Segment(uint64_t channel_id)
Definition segment.cc:27
bool LockArenaBlockForReadByIndex(uint64_t block_index)
Definition segment.cc:243
void ReleaseReadBlock(const ReadableBlock &readable_block)
Definition segment.cc:177
bool ReleaseBlockForReadByIndex(uint64_t block_index)
Definition segment.cc:220
bool InitOnly(uint64_t message_size)
Definition segment.cc:185
bool AcquireBlockToWrite(std::size_t msg_size, WritableBlock *writable_block)
Definition segment.cc:40
bool LockArenaBlockForWriteByIndex(uint64_t block_index)
Definition segment.cc:228
std::unordered_map< uint32_t, uint8_t * > block_buf_addrs_
Definition segment.h:90
bool LockBlockForWriteByIndex(uint64_t block_index)
Definition segment.cc:198
bool ReleaseArenaBlockForWriteByIndex(uint64_t block_index)
Definition segment.cc:235
bool AcquireArenaBlockToRead(ReadableBlock *readable_block)
Definition segment.cc:138
bool ReleaseBlockForWriteByIndex(uint64_t block_index)
Definition segment.cc:205
void ReleaseWrittenBlock(const WritableBlock &writable_block)
Definition segment.cc:91
bool LockBlockForReadByIndex(uint64_t block_index)
Definition segment.cc:213
std::unordered_map< uint32_t, uint8_t * > arena_block_buf_addrs_
Definition segment.h:91
void ReleaseArenaWrittenBlock(const WritableBlock &writable_block)
Definition segment.cc:99
void ReleaseArenaReadBlock(const ReadableBlock &readable_block)
Definition segment.cc:169
static const uint32_t ARENA_BLOCK_NUM
Definition shm_conf.h:41
const uint64_t & ceiling_msg_size()
Definition shm_conf.h:35
const uint32_t & block_num()
Definition shm_conf.h:37
void Update(const uint64_t &real_msg_size)
Definition shm_conf.cc:30
uint32_t FetchAddArenaSeq(uint32_t diff)
Definition state.h:49
void set_need_remap(bool need)
Definition state.h:54
uint32_t FetchAddSeq(uint32_t diff)
Definition state.h:46
int message_size
#define RETURN_VAL_IF_NULL(ptr, val)
Definition log.h:98
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
class register implement
Definition arena_queue.h:37