Apollo 10.0
自动驾驶开放平台
xsi_segment.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <sys/ipc.h>
20#include <sys/shm.h>
21#include <sys/types.h>
22
23#include "cyber/common/log.h"
24#include "cyber/common/util.h"
27
28namespace apollo {
29namespace cyber {
30namespace transport {
31
32XsiSegment::XsiSegment(uint64_t channel_id) : Segment(channel_id) {
33 key_ = static_cast<key_t>(channel_id);
34}
35
37
38bool XsiSegment::OpenOrCreate() {
39 if (init_) {
40 return true;
41 }
42
43 // create managed_shm_
44 int retry = 0;
45 int shmid = 0;
46 while (retry < 2) {
47 shmid = shmget(key_, conf_.managed_shm_size(), 0644 | IPC_CREAT | IPC_EXCL);
48 if (shmid != -1) {
49 break;
50 }
51
52 if (EINVAL == errno) {
53 AINFO << "need larger space, recreate.";
54 Reset();
55 Remove();
56 ++retry;
57 } else if (EEXIST == errno) {
58 ADEBUG << "shm already exist, open only.";
59 return OpenOnly();
60 } else {
61 break;
62 }
63 }
64
65 if (shmid == -1) {
66 AERROR << "create shm failed, error code: " << strerror(errno);
67 return false;
68 }
69
70 // attach managed_shm_
71 managed_shm_ = shmat(shmid, nullptr, 0);
72 if (managed_shm_ == reinterpret_cast<void*>(-1)) {
73 AERROR << "attach shm failed, error: " << strerror(errno);
74 shmctl(shmid, IPC_RMID, 0);
75 return false;
76 }
77
78 // create field state_
80 if (state_ == nullptr) {
81 AERROR << "create state failed.";
82 shmdt(managed_shm_);
83 managed_shm_ = nullptr;
84 shmctl(shmid, IPC_RMID, 0);
85 return false;
86 }
87
89
90 // create field blocks_
91 blocks_ = new (static_cast<char*>(managed_shm_) + sizeof(State))
92 Block[conf_.block_num()];
93 if (blocks_ == nullptr) {
94 AERROR << "create blocks failed.";
95 state_->~State();
96 state_ = nullptr;
97 shmdt(managed_shm_);
98 managed_shm_ = nullptr;
99 shmctl(shmid, IPC_RMID, 0);
100 return false;
101 }
102
103 // create field arena_blocks_
104 arena_blocks_ = new (static_cast<char*>(managed_shm_) + sizeof(State) + \
105 conf_.block_num() * sizeof(Block)) Block[
107 if (arena_blocks_ == nullptr) {
108 AERROR << "create arena blocks failed.";
109 state_->~State();
110 state_ = nullptr;
111 shmdt(managed_shm_);
112 managed_shm_ = nullptr;
113 shmctl(shmid, IPC_RMID, 0);
114 return false;
115 }
116
117 // create block buf
118 uint32_t i = 0;
119 for (; i < conf_.block_num(); ++i) {
120 uint8_t* addr = \
121 new (static_cast<char*>(managed_shm_) + sizeof(State) + \
122 conf_.block_num() * sizeof(Block) + \
123 ShmConf::ARENA_BLOCK_NUM * sizeof(Block) + \
124 i * conf_.block_buf_size()) uint8_t[conf_.block_buf_size()];
125 std::lock_guard<std::mutex> _g(block_buf_lock_);
126 block_buf_addrs_[i] = addr;
127 }
128
129 // create arena block buf
130 uint32_t ai = 0;
131 for (; ai < ShmConf::ARENA_BLOCK_NUM; ++ai) {
132 uint8_t* addr = \
133 new(static_cast<char*>(managed_shm_) + sizeof(State) + \
134 conf_.block_num() * sizeof(Block) + \
135 ShmConf::ARENA_BLOCK_NUM * sizeof(Block) + \
137 ai * ShmConf::ARENA_MESSAGE_SIZE) uint8_t[
139 std::lock_guard<std::mutex> _g(arena_block_buf_lock_);
140 arena_block_buf_addrs_[ai] = addr;
141 }
142
143 if (ai != ShmConf::ARENA_BLOCK_NUM || i != conf_.block_num()) {
144 AERROR << "create arena block or block buf failed.";
145 state_->~State();
146 state_ = nullptr;
147 blocks_ = nullptr;
148 arena_blocks_ = nullptr;
149 {
150 std::lock_guard<std::mutex> _g(block_buf_lock_);
151 block_buf_addrs_.clear();
152 }
153 {
154 std::lock_guard<std::mutex> _g(arena_block_buf_lock_);
156 }
157 shmdt(managed_shm_);
158 managed_shm_ = nullptr;
159 shmctl(shmid, IPC_RMID, 0);
160 return false;
161 }
162
164 init_ = true;
165 ADEBUG << "open or create true.";
166 return true;
167}
168
169bool XsiSegment::OpenOnly() {
170 if (init_) {
171 return true;
172 }
173
174 // get managed_shm_
175 int shmid = shmget(key_, 0, 0644);
176 if (shmid == -1) {
177 AERROR << "get shm failed. error: " << strerror(errno);
178 return false;
179 }
180
181 // attach managed_shm_
182 managed_shm_ = shmat(shmid, nullptr, 0);
183 if (managed_shm_ == reinterpret_cast<void*>(-1)) {
184 AERROR << "attach shm failed, error: " << strerror(errno);
185 return false;
186 }
187
188 // get field state_
189 state_ = reinterpret_cast<State*>(managed_shm_);
190 if (state_ == nullptr) {
191 AERROR << "get state failed.";
192 shmdt(managed_shm_);
193 managed_shm_ = nullptr;
194 return false;
195 }
196
198
199 // get field blocks_
200 blocks_ = reinterpret_cast<Block*>(static_cast<char*>(managed_shm_) +
201 sizeof(State));
202 if (blocks_ == nullptr) {
203 AERROR << "get blocks failed.";
204 state_ = nullptr;
205 shmdt(managed_shm_);
206 managed_shm_ = nullptr;
207 return false;
208 }
209
210 // get field arena_blocks_
211 arena_blocks_ = reinterpret_cast<Block*>(
212 static_cast<char*>(managed_shm_) + sizeof(State) + \
213 sizeof(Block) * conf_.block_num());
214 if (arena_blocks_ == nullptr) {
215 AERROR << "get blocks failed.";
216 state_ = nullptr;
217 shmdt(managed_shm_);
218 managed_shm_ = nullptr;
219 return false;
220 }
221
222 // get block buf
223 uint32_t i = 0;
224 for (; i < conf_.block_num(); ++i) {
225 uint8_t* addr = reinterpret_cast<uint8_t*>(
226 static_cast<char*>(managed_shm_) + sizeof(State) + \
227 conf_.block_num() * sizeof(Block) + \
228 ShmConf::ARENA_BLOCK_NUM * sizeof(Block) + \
229 i * conf_.block_buf_size());
230
231 if (addr == nullptr) {
232 break;
233 }
234 std::lock_guard<std::mutex> _g(block_buf_lock_);
235 block_buf_addrs_[i] = addr;
236 }
237
238 // get arena block buf
239 uint32_t ai = 0;
240 for (; ai < ShmConf::ARENA_BLOCK_NUM; ++ai) {
241 uint8_t* addr = reinterpret_cast<uint8_t*>(
242 static_cast<char*>(managed_shm_) + sizeof(State) + \
243 conf_.block_num() * sizeof(Block) + ShmConf::ARENA_BLOCK_NUM * \
244 sizeof(Block) + conf_.block_num() * conf_.block_buf_size() + \
246
247 if (addr == nullptr) {
248 break;
249 }
250 std::lock_guard<std::mutex> _g(arena_block_buf_lock_);
251 arena_block_buf_addrs_[ai] = addr;
252 }
253
254 if (i != conf_.block_num() || ai != ShmConf::ARENA_BLOCK_NUM) {
255 AERROR << "open only failed.";
256 state_->~State();
257 state_ = nullptr;
258 blocks_ = nullptr;
259 arena_blocks_ = nullptr;
260 {
261 std::lock_guard<std::mutex> _g(block_buf_lock_);
262 block_buf_addrs_.clear();
263 }
264 {
265 std::lock_guard<std::mutex> _g(arena_block_buf_lock_);
267 }
268 shmdt(managed_shm_);
269 managed_shm_ = nullptr;
270 shmctl(shmid, IPC_RMID, 0);
271 return false;
272 }
273
275 init_ = true;
276 ADEBUG << "open only true.";
277 return true;
278}
279
280bool XsiSegment::Remove() {
281 int shmid = shmget(key_, 0, 0644);
282 if (shmid == -1 || shmctl(shmid, IPC_RMID, 0) == -1) {
283 AERROR << "remove shm failed, error code: " << strerror(errno);
284 return false;
285 }
286 ADEBUG << "remove success.";
287 return true;
288}
289
290void XsiSegment::Reset() {
291 state_ = nullptr;
292 blocks_ = nullptr;
293 arena_blocks_ = nullptr;
294 {
295 std::lock_guard<std::mutex> _g(block_buf_lock_);
296 block_buf_addrs_.clear();
297 }
298 {
299 std::lock_guard<std::mutex> _g(arena_block_buf_lock_);
301 }
302 if (managed_shm_ != nullptr) {
303 shmdt(managed_shm_);
304 managed_shm_ = nullptr;
305 return;
306 }
307}
308
309} // namespace transport
310} // namespace cyber
311} // namespace apollo
std::unordered_map< uint32_t, uint8_t * > block_buf_addrs_
Definition segment.h:90
std::unordered_map< uint32_t, uint8_t * > arena_block_buf_addrs_
Definition segment.h:91
static const uint32_t ARENA_BLOCK_NUM
Definition shm_conf.h:41
const uint64_t & ceiling_msg_size()
Definition shm_conf.h:35
const uint64_t & managed_shm_size()
Definition shm_conf.h:38
const uint32_t & block_num()
Definition shm_conf.h:37
static const uint64_t ARENA_MESSAGE_SIZE
Definition shm_conf.h:42
void Update(const uint64_t &real_msg_size)
Definition shm_conf.cc:30
const uint64_t & block_buf_size()
Definition shm_conf.h:36
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
class register implement
Definition arena_queue.h:37
Definition future.h:29