Apollo 10.0
自动驾驶开放平台
condition_notifier.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <sys/ipc.h>
20#include <sys/shm.h>
21#include <thread>
22
23#include "cyber/common/log.h"
24#include "cyber/common/util.h"
25
26namespace apollo {
27namespace cyber {
28namespace transport {
29
30using common::Hash;
31
32ConditionNotifier::ConditionNotifier() {
33 key_ = static_cast<key_t>(Hash("/apollo/cyber/transport/shm/notifier"));
34 ADEBUG << "condition notifier key: " << key_;
35 shm_size_ = sizeof(Indicator);
36
37 if (!Init()) {
38 AERROR << "fail to init condition notifier.";
39 is_shutdown_.store(true);
40 return;
41 }
42 next_seq_ = indicator_->next_seq.load();
43 ADEBUG << "next_seq: " << next_seq_;
44}
45
47
49 if (is_shutdown_.exchange(true)) {
50 return;
51 }
52
53 std::this_thread::sleep_for(std::chrono::milliseconds(100));
54 Reset();
55}
56
58 if (is_shutdown_.load()) {
59 ADEBUG << "notifier is shutdown.";
60 return false;
61 }
62
63 uint64_t seq = indicator_->next_seq.fetch_add(1);
64 uint64_t idx = seq % kBufLength;
65 indicator_->infos[idx] = info;
66 indicator_->seqs[idx] = seq;
67
68 return true;
69}
70
71bool ConditionNotifier::Listen(int timeout_ms, ReadableInfo* info) {
72 if (info == nullptr) {
73 AERROR << "info nullptr.";
74 return false;
75 }
76
77 if (is_shutdown_.load()) {
78 ADEBUG << "notifier is shutdown.";
79 return false;
80 }
81
82 int timeout_us = timeout_ms * 1000;
83 while (!is_shutdown_.load()) {
84 uint64_t seq = indicator_->next_seq.load();
85 if (seq != next_seq_) {
86 auto idx = next_seq_ % kBufLength;
87 auto actual_seq = indicator_->seqs[idx];
88 if (actual_seq >= next_seq_) {
89 next_seq_ = actual_seq;
90 *info = indicator_->infos[idx];
91 ++next_seq_;
92 return true;
93 } else {
94 ADEBUG << "seq[" << next_seq_ << "] is writing, can not read now.";
95 }
96 }
97
98 if (timeout_us > 0) {
99 std::this_thread::sleep_for(std::chrono::microseconds(50));
100 timeout_us -= 50;
101 } else {
102 return false;
103 }
104 }
105 return false;
106}
107
108bool ConditionNotifier::Init() { return OpenOrCreate(); }
109
110bool ConditionNotifier::OpenOrCreate() {
111 // create managed_shm_
112 int retry = 0;
113 int shmid = 0;
114 while (retry < 2) {
115 shmid = shmget(key_, shm_size_, 0644 | IPC_CREAT | IPC_EXCL);
116 if (shmid != -1) {
117 break;
118 }
119
120 if (EINVAL == errno) {
121 AINFO << "need larger space, recreate.";
122 Reset();
123 Remove();
124 ++retry;
125 } else if (EEXIST == errno) {
126 ADEBUG << "shm already exist, open only.";
127 return OpenOnly();
128 } else {
129 break;
130 }
131 }
132
133 if (shmid == -1) {
134 AERROR << "create shm failed, error code: " << strerror(errno);
135 return false;
136 }
137
138 // attach managed_shm_
139 managed_shm_ = shmat(shmid, nullptr, 0);
140 if (managed_shm_ == reinterpret_cast<void*>(-1)) {
141 AERROR << "attach shm failed.";
142 shmctl(shmid, IPC_RMID, 0);
143 return false;
144 }
145
146 // create indicator_
147 indicator_ = new (managed_shm_) Indicator();
148 if (indicator_ == nullptr) {
149 AERROR << "create indicator failed.";
150 shmdt(managed_shm_);
151 managed_shm_ = nullptr;
152 shmctl(shmid, IPC_RMID, 0);
153 return false;
154 }
155
156 ADEBUG << "open or create true.";
157 return true;
158}
159
160bool ConditionNotifier::OpenOnly() {
161 // get managed_shm_
162 int shmid = shmget(key_, 0, 0644);
163 if (shmid == -1) {
164 AERROR << "get shm failed, error: " << strerror(errno);
165 return false;
166 }
167
168 // attach managed_shm_
169 managed_shm_ = shmat(shmid, nullptr, 0);
170 if (managed_shm_ == reinterpret_cast<void*>(-1)) {
171 AERROR << "attach shm failed, error: " << strerror(errno);
172 return false;
173 }
174
175 // get indicator_
176 indicator_ = reinterpret_cast<Indicator*>(managed_shm_);
177 if (indicator_ == nullptr) {
178 AERROR << "get indicator failed.";
179 shmdt(managed_shm_);
180 managed_shm_ = nullptr;
181 return false;
182 }
183
184 ADEBUG << "open true.";
185 return true;
186}
187
188bool ConditionNotifier::Remove() {
189 int shmid = shmget(key_, 0, 0644);
190 if (shmid == -1 || shmctl(shmid, IPC_RMID, 0) == -1) {
191 AERROR << "remove shm failed, error code: " << strerror(errno);
192 return false;
193 }
194 ADEBUG << "remove success.";
195
196 return true;
197}
198
199void ConditionNotifier::Reset() {
200 indicator_ = nullptr;
201 if (managed_shm_ != nullptr) {
202 shmdt(managed_shm_);
203 managed_shm_ = nullptr;
204 }
205}
206
207} // namespace transport
208} // namespace cyber
209} // namespace apollo
bool Listen(int timeout_ms, ReadableInfo *info) override
bool Notify(const ReadableInfo &info) override
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
std::size_t Hash(const std::string &key)
Definition util.h:27
class register implement
Definition arena_queue.h:37