// Constructor (fragment). Derives the shared-memory key and segment size, and
// caches the shared sequence counter as this instance's read cursor.
// NOTE(review): the fused listing numbers jump 35 -> 38 and 39 -> 42, so the
// condition guarding the error branch and the end of that branch are not
// visible in this excerpt.
32ConditionNotifier::ConditionNotifier() {
// The key is the hash of a fixed path string, narrowed to key_t; it is the
// value handed to shmget() by the open/create/remove routines in this file.
33 key_ =
static_cast<key_t
>(
Hash(
"/apollo/cyber/transport/shm/notifier"));
34 ADEBUG <<
"condition notifier key: " << key_;
// The segment is sized to hold exactly one Indicator object.
35 shm_size_ =
sizeof(Indicator);
// Error path: logs and marks the notifier as shut down. Presumably taken when
// initialization fails (the test itself is on a hidden line) - TODO confirm.
38 AERROR <<
"fail to init condition notifier.";
39 is_shutdown_.store(
true);
// Start reading from the sequence number the shared counter currently holds.
// NOTE(review): this dereferences indicator_; confirm the hidden error branch
// returns before reaching this line when initialization failed.
42 next_seq_ = indicator_->next_seq.load();
43 ADEBUG <<
"next_seq: " << next_seq_;
// Fragment of the shutdown path; the enclosing function header is not visible
// in this excerpt (presumably the notifier's shutdown routine - TODO confirm).
// exchange(true) sets the flag and yields its previous value, so this branch
// is entered only when the flag was already set; the branch body is hidden.
49 if (is_shutdown_.exchange(
true)) {
// Pause for 100 ms after setting the flag. The reason is not stated here;
// presumably it lets pollers of is_shutdown_ notice the flag before the
// hidden lines that follow run - TODO confirm.
53 std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Fragment of the publish path; the function header is not visible here
// (presumably Notify(const ReadableInfo &info) - TODO confirm).
// When the notifier is already shut down, only a debug message is logged in
// the visible lines; the rest of this branch is hidden.
58 if (is_shutdown_.load()) {
59 ADEBUG <<
"notifier is shutdown.";
// Claim a sequence number: fetch_add(1) advances the shared counter and
// yields the value it held before the increment.
63 uint64_t seq = indicator_->next_seq.fetch_add(1);
// idx is computed on a hidden line (listing line 64).
// The payload is stored first, then the slot is tagged with its sequence.
// NOTE(review): the shared counter is advanced before the slot is written, so
// a reader may see the new counter value while the slot still holds older
// data; the read path compares seqs[idx] with its cursor before copying.
65 indicator_->infos[idx] = info;
66 indicator_->seqs[idx] = seq;
// Fragment of the polling read path; the function header is not visible here
// (presumably Listen(int timeout_ms, ReadableInfo *info) - TODO confirm).
// Several listing lines are missing, including the returns, the idx
// computation and whatever consumes timeout_us.
// Null output pointer check; the branch body is hidden.
72 if (info ==
nullptr) {
// When the notifier is already shut down, only a debug message is logged in
// the visible lines.
77 if (is_shutdown_.load()) {
78 ADEBUG <<
"notifier is shutdown.";
// Convert the timeout from milliseconds to microseconds.
// NOTE(review): the product is computed and stored as int, so a timeout_ms
// above INT_MAX / 1000 overflows (undefined behavior for signed int).
82 int timeout_us = timeout_ms * 1000;
// Poll until shutdown is requested.
83 while (!is_shutdown_.load()) {
// A shared counter that differs from the local cursor means something has
// been published since the last read.
84 uint64_t seq = indicator_->next_seq.load();
85 if (seq != next_seq_) {
// idx is computed on a hidden line (listing line 86).
87 auto actual_seq = indicator_->seqs[idx];
// The slot is consumed only if its tag is at or beyond the cursor; the cursor
// is then moved to that tag and the payload copied to the caller.
88 if (actual_seq >= next_seq_) {
89 next_seq_ = actual_seq;
90 *info = indicator_->infos[idx];
// Otherwise the slot tag is still behind the cursor; only a debug message is
// logged in the visible lines.
94 ADEBUG <<
"seq[" << next_seq_ <<
"] is writing, can not read now.";
// Back off for 50 microseconds between polls.
99 std::this_thread::sleep_for(std::chrono::microseconds(50));
108bool ConditionNotifier::Init() {
  // Initialization consists solely of opening or creating the shared segment;
  // the outcome of that attempt is handed back unchanged.
  const bool ready = OpenOrCreate();
  return ready;
}
// Creates the shared-memory segment identified by key_ and attaches to it,
// then constructs the Indicator inside it (fragment).
// NOTE(review): many listing lines are missing here, including the
// declaration of shmid, the retry and fallback actions, and every return.
110bool ConditionNotifier::OpenOrCreate() {
// IPC_CREAT | IPC_EXCL asks for a brand-new segment with mode 0644; the call
// fails with EEXIST if a segment with this key already exists.
115 shmid = shmget(key_, shm_size_, 0644 | IPC_CREAT | IPC_EXCL);
// EINVAL is logged as a size problem; the recreate step the message refers to
// is on hidden lines.
120 if (EINVAL == errno) {
121 AINFO <<
"need larger space, recreate.";
125 }
else if (EEXIST == errno) {
// The segment already exists; the open-only fallback the message refers to
// is on hidden lines.
126 ADEBUG <<
"shm already exist, open only.";
// Any other failure is reported with the errno text.
134 AERROR <<
"create shm failed, error code: " << strerror(errno);
// Attach at an address chosen by the system; shmat reports failure by
// returning (void*)-1.
139 managed_shm_ = shmat(shmid,
nullptr, 0);
140 if (managed_shm_ ==
reinterpret_cast<void*
>(-1)) {
141 AERROR <<
"attach shm failed.";
// Mark the segment that was just obtained for removal.
142 shmctl(shmid, IPC_RMID, 0);
// Construct the Indicator in place at the start of the attached memory.
147 indicator_ =
new (managed_shm_) Indicator();
// NOTE(review): the standard non-allocating placement new returns the address
// it was given, so after a successful shmat this check is not expected to
// fire unless Indicator supplies its own operator new - TODO confirm.
148 if (indicator_ ==
nullptr) {
149 AERROR <<
"create indicator failed.";
// Listing line 150 is hidden (presumably a detach - TODO confirm).
151 managed_shm_ =
nullptr;
152 shmctl(shmid, IPC_RMID, 0);
156 ADEBUG <<
"open or create true.";
// Attaches to a segment that already exists for key_ without creating one and
// without constructing a new Indicator (fragment; the failure conditions and
// returns are on hidden lines).
160bool ConditionNotifier::OpenOnly() {
// No IPC_CREAT flag and size 0: this only looks up an existing segment.
162 int shmid = shmget(key_, 0, 0644);
164 AERROR <<
"get shm failed, error: " << strerror(errno);
// Attach at an address chosen by the system; shmat reports failure by
// returning (void*)-1.
169 managed_shm_ = shmat(shmid,
nullptr, 0);
170 if (managed_shm_ ==
reinterpret_cast<void*
>(-1)) {
171 AERROR <<
"attach shm failed, error: " << strerror(errno);
// Reinterpret the attached memory as an Indicator; nothing is constructed, so
// whatever the segment already contains is used as-is.
176 indicator_ =
reinterpret_cast<Indicator*
>(managed_shm_);
// NOTE(review): this is the same pointer that just passed the shmat failure
// check, so a null value here looks unlikely - TODO confirm intent.
177 if (indicator_ ==
nullptr) {
178 AERROR <<
"get indicator failed.";
// Listing line 179 is hidden (presumably a detach - TODO confirm).
180 managed_shm_ =
nullptr;
// Looks up the segment for key_ and marks it for removal (fragment; the
// returns and closing brace are on hidden lines).
188bool ConditionNotifier::Remove() {
// Lookup only: no IPC_CREAT flag, size 0.
189 int shmid = shmget(key_, 0, 0644);
// Either a failed lookup or a failed IPC_RMID takes the error path; the
// short-circuit || skips shmctl when the lookup already failed.
190 if (shmid == -1 || shmctl(shmid, IPC_RMID, 0) == -1) {
191 AERROR <<
"remove shm failed, error code: " << strerror(errno);
194 ADEBUG <<
"remove success.";
// Clears this instance's pointers into the shared segment (fragment; listing
// line 202 and the closing braces are hidden).
199void ConditionNotifier::Reset() {
// Drop the typed view first.
200 indicator_ =
nullptr;
// Only when a segment pointer is held: listing line 202 is hidden (presumably
// a detach of managed_shm_ - TODO confirm), then the pointer is cleared.
201 if (managed_shm_ !=
nullptr) {
203 managed_shm_ =
nullptr;
virtual ~ConditionNotifier()
bool Listen(int timeout_ms, ReadableInfo *info) override
bool Notify(const ReadableInfo &info) override
std::size_t Hash(const std::string &key)
const uint32_t kBufLength