Apollo 10.0
自动驾驶开放平台
posix_segment.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <fcntl.h>
20#include <sys/mman.h>
21#include <sys/stat.h>
22
23#include "cyber/common/log.h"
24#include "cyber/common/util.h"
28
29namespace apollo {
30namespace cyber {
31namespace transport {
32
33PosixSegment::PosixSegment(uint64_t channel_id) : Segment(channel_id) {
34 shm_name_ = std::to_string(channel_id);
35}
36
38
39bool PosixSegment::OpenOrCreate() {
40 if (init_) {
41 return true;
42 }
43
44 // create managed_shm_
45 int fd = shm_open(shm_name_.c_str(), O_RDWR | O_CREAT | O_EXCL, 0644);
46 if (fd < 0) {
47 if (EEXIST == errno) {
48 ADEBUG << "shm already exist, open only.";
49 return OpenOnly();
50 } else {
51 AERROR << "create shm failed, error: " << strerror(errno);
52 return false;
53 }
54 }
55
56 if (ftruncate(fd, conf_.managed_shm_size()) < 0) {
57 AERROR << "ftruncate failed: " << strerror(errno);
58 close(fd);
59 return false;
60 }
61
62 // attach managed_shm_
63 managed_shm_ = mmap(nullptr, conf_.managed_shm_size(), PROT_READ | PROT_WRITE,
64 MAP_SHARED, fd, 0);
65 if (managed_shm_ == MAP_FAILED) {
66 AERROR << "attach shm failed:" << strerror(errno);
67 close(fd);
68 shm_unlink(shm_name_.c_str());
69 return false;
70 }
71
72 close(fd);
73
74 // create field state_
76 if (state_ == nullptr) {
77 AERROR << "create state failed.";
79 managed_shm_ = nullptr;
80 shm_unlink(shm_name_.c_str());
81 return false;
82 }
83
85
86 // create field blocks_
87 blocks_ = new (static_cast<char*>(managed_shm_) + sizeof(State))
88 Block[conf_.block_num()];
89 if (blocks_ == nullptr) {
90 AERROR << "create blocks failed.";
91 state_->~State();
92 state_ = nullptr;
94 managed_shm_ = nullptr;
95 shm_unlink(shm_name_.c_str());
96 return false;
97 }
98
99 // create field arena_blocks_
100 arena_blocks_ = new (static_cast<char*>(managed_shm_) + sizeof(State) + \
101 conf_.block_num() * sizeof(Block)) Block[
103 if (arena_blocks_ == nullptr) {
104 AERROR << "create blocks failed.";
105 state_->~State();
106 state_ = nullptr;
108 managed_shm_ = nullptr;
109 shm_unlink(shm_name_.c_str());
110 return false;
111 }
112
113 // create block buf
114 uint32_t i = 0;
115 for (; i < conf_.block_num(); ++i) {
116 uint8_t* addr =
117 new (static_cast<char*>(managed_shm_) + sizeof(State) + \
118 conf_.block_num() * sizeof(Block) + \
119 ShmConf::ARENA_BLOCK_NUM * sizeof(Block) + \
120 i * conf_.block_buf_size()) uint8_t[conf_.block_buf_size()];
121
122 if (addr == nullptr) {
123 break;
124 }
125
126 std::lock_guard<std::mutex> lg(block_buf_lock_);
127 block_buf_addrs_[i] = addr;
128 }
129
130 // create arena block buf
131 uint32_t ai = 0;
132 for (; ai < ShmConf::ARENA_BLOCK_NUM; ++ai) {
133 uint8_t* addr = \
134 new(static_cast<char*>(managed_shm_) + sizeof(State) + \
135 conf_.block_num() * sizeof(Block) + \
136 ShmConf::ARENA_BLOCK_NUM * sizeof(Block) + \
138 ai * ShmConf::ARENA_MESSAGE_SIZE) uint8_t[
140 if (addr == nullptr) {
141 break;
142 }
143
144 std::lock_guard<std::mutex> _g(arena_block_buf_lock_);
145 arena_block_buf_addrs_[ai] = addr;
146 }
147
148 if (ai != ShmConf::ARENA_BLOCK_NUM || i != conf_.block_num()) {
149 AERROR << "create arena block buf failed.";
150 state_->~State();
151 state_ = nullptr;
152 blocks_ = nullptr;
153 arena_blocks_ = nullptr;
154 {
155 std::lock_guard<std::mutex> lg(block_buf_lock_);
156 block_buf_addrs_.clear();
157 }
158 {
159 std::lock_guard<std::mutex> lg(arena_block_buf_lock_);
161 }
163 managed_shm_ = nullptr;
164 shm_unlink(shm_name_.c_str());
165 return false;
166 }
167
169 init_ = true;
170 return true;
171}
172
173bool PosixSegment::OpenOnly() {
174 if (init_) {
175 return true;
176 }
177
178 // get managed_shm_
179 int fd = shm_open(shm_name_.c_str(), O_RDWR, 0644);
180 if (fd == -1) {
181 AERROR << "get shm failed: " << strerror(errno);
182 return false;
183 }
184
185 struct stat file_attr;
186 if (fstat(fd, &file_attr) < 0) {
187 AERROR << "fstat failed: " << strerror(errno);
188 close(fd);
189 return false;
190 }
191
192 // attach managed_shm_
193 managed_shm_ = mmap(nullptr, file_attr.st_size, PROT_READ | PROT_WRITE,
194 MAP_SHARED, fd, 0);
195 if (managed_shm_ == MAP_FAILED) {
196 AERROR << "attach shm failed: " << strerror(errno);
197 close(fd);
198 return false;
199 }
200
201 close(fd);
202 // get field state_
203 state_ = reinterpret_cast<State*>(managed_shm_);
204 if (state_ == nullptr) {
205 AERROR << "get state failed.";
206 munmap(managed_shm_, file_attr.st_size);
207 managed_shm_ = nullptr;
208 return false;
209 }
210
212
213 // get field blocks_
214 blocks_ = reinterpret_cast<Block*>(static_cast<char*>(managed_shm_) +
215 sizeof(State));
216 if (blocks_ == nullptr) {
217 AERROR << "get blocks failed.";
218 state_ = nullptr;
220 managed_shm_ = nullptr;
221 return false;
222 }
223
224 // get field arena_blocks_
225 arena_blocks_ = reinterpret_cast<Block*>(
226 static_cast<char*>(managed_shm_) + sizeof(State) +
227 sizeof(Block) * conf_.block_num());
228 if (blocks_ == nullptr) {
229 AERROR << "get arena blocks failed.";
230 state_ = nullptr;
232 managed_shm_ = nullptr;
233 return false;
234 }
235
236 // get block buf
237 uint32_t i = 0;
238 for (; i < conf_.block_num(); ++i) {
239 uint8_t* addr = reinterpret_cast<uint8_t*>(
240 static_cast<char*>(managed_shm_) + sizeof(State) +
241 conf_.block_num() * sizeof(Block) +
242 ShmConf::ARENA_BLOCK_NUM * sizeof(Block) +
243 i * conf_.block_buf_size());
244
245 if (addr == nullptr) {
246 break;
247 }
248 std::lock_guard<std::mutex> lg(block_buf_lock_);
249 block_buf_addrs_[i] = addr;
250 }
251
252 // get arena block buf
253 uint32_t ai = 0;
254 for (; i < ShmConf::ARENA_BLOCK_NUM; ++ai) {
255 uint8_t* addr = reinterpret_cast<uint8_t*>(
256 static_cast<char*>(managed_shm_) + sizeof(State) + \
257 conf_.block_num() * sizeof(Block) + ShmConf::ARENA_BLOCK_NUM * \
258 sizeof(Block) + conf_.block_num() * conf_.block_buf_size() + \
260
261 if (addr == nullptr) {
262 break;
263 }
264 std::lock_guard<std::mutex> _g(arena_block_buf_lock_);
265 arena_block_buf_addrs_[ai] = addr;
266 }
267
268 if (i != conf_.block_num() || ai != ShmConf::ARENA_BLOCK_NUM) {
269 AERROR << "open only failed.";
270 state_->~State();
271 state_ = nullptr;
272 blocks_ = nullptr;
273 arena_blocks_ = nullptr;
274 {
275 std::lock_guard<std::mutex> lg(block_buf_lock_);
276 block_buf_addrs_.clear();
277 }
278 {
279 std::lock_guard<std::mutex> lg(arena_block_buf_lock_);
281 }
283 managed_shm_ = nullptr;
284 shm_unlink(shm_name_.c_str());
285 return false;
286 }
287
289 init_ = true;
290 ADEBUG << "open only true.";
291 return true;
292}
293
294bool PosixSegment::Remove() {
295 if (shm_unlink(shm_name_.c_str()) < 0) {
296 AERROR << "shm_unlink failed: " << strerror(errno);
297 return false;
298 }
299 return true;
300}
301
302void PosixSegment::Reset() {
303 state_ = nullptr;
304 blocks_ = nullptr;
305 arena_blocks_ = nullptr;
306 {
307 std::lock_guard<std::mutex> lg(block_buf_lock_);
308 block_buf_addrs_.clear();
309 }
310 {
311 std::lock_guard<std::mutex> lg(arena_block_buf_lock_);
313 }
314 if (managed_shm_ != nullptr) {
316 managed_shm_ = nullptr;
317 return;
318 }
319}
320
321} // namespace transport
322} // namespace cyber
323} // namespace apollo
std::unordered_map< uint32_t, uint8_t * > block_buf_addrs_
Definition segment.h:90
std::unordered_map< uint32_t, uint8_t * > arena_block_buf_addrs_
Definition segment.h:91
static const uint32_t ARENA_BLOCK_NUM
Definition shm_conf.h:41
const uint64_t & ceiling_msg_size()
Definition shm_conf.h:35
const uint64_t & managed_shm_size()
Definition shm_conf.h:38
const uint32_t & block_num()
Definition shm_conf.h:37
static const uint64_t ARENA_MESSAGE_SIZE
Definition shm_conf.h:42
void Update(const uint64_t &real_msg_size)
Definition shm_conf.cc:30
const uint64_t & block_buf_size()
Definition shm_conf.h:36
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
class register implement
Definition arena_queue.h:37