Apollo 10.0
自动驾驶开放平台
intra_reader.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_BLOCKER_INTRA_READER_H_
18#define CYBER_BLOCKER_INTRA_READER_H_
19
20#include <functional>
21#include <list>
22#include <memory>
23
25#include "cyber/common/log.h"
26#include "cyber/node/reader.h"
27#include "cyber/time/time.h"
28
29namespace apollo {
30namespace cyber {
31namespace blocker {
32
33template <typename MessageT>
34class IntraReader : public apollo::cyber::Reader<MessageT> {
35 public:
36 using MessagePtr = std::shared_ptr<MessageT>;
37 using Callback = std::function<void(const std::shared_ptr<MessageT>&)>;
38 using Iterator =
39 typename std::list<std::shared_ptr<MessageT>>::const_iterator;
40
41 IntraReader(const proto::RoleAttributes& attr, const Callback& callback);
42 virtual ~IntraReader();
43
44 bool Init() override;
45 void Shutdown() override;
46
47 void ClearData() override;
48 void Observe() override;
49 bool Empty() const override;
50 bool HasReceived() const override;
51
52 void Enqueue(const std::shared_ptr<MessageT>& msg) override;
53 void SetHistoryDepth(const uint32_t& depth) override;
54 uint32_t GetHistoryDepth() const override;
55 std::shared_ptr<MessageT> GetLatestObserved() const override;
56 std::shared_ptr<MessageT> GetOldestObserved() const override;
57 Iterator Begin() const override;
58 Iterator End() const override;
59
60 private:
61 void OnMessage(const MessagePtr& msg_ptr);
62
63 Callback msg_callback_;
64};
65
66template <typename MessageT>
68 const Callback& callback)
69 : Reader<MessageT>(attr), msg_callback_(callback) {}
70
71template <typename MessageT>
75
76template <typename MessageT>
78 if (this->init_.exchange(true)) {
79 return true;
80 }
81 return BlockerManager::Instance()->Subscribe<MessageT>(
82 this->role_attr_.channel_name(), this->role_attr_.qos_profile().depth(),
83 this->role_attr_.node_name(),
84 std::bind(&IntraReader<MessageT>::OnMessage, this,
85 std::placeholders::_1));
86}
87
88template <typename MessageT>
90 if (!this->init_.exchange(false)) {
91 return;
92 }
93 BlockerManager::Instance()->Unsubscribe<MessageT>(
94 this->role_attr_.channel_name(), this->role_attr_.node_name());
95}
96
97template <typename MessageT>
99 auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
100 this->role_attr_.channel_name());
101 if (blocker != nullptr) {
102 blocker->ClearObserved();
103 blocker->ClearPublished();
104 }
105}
106
107template <typename MessageT>
109 auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
110 this->role_attr_.channel_name());
111 if (blocker != nullptr) {
112 blocker->Observe();
113 }
114}
115
116template <typename MessageT>
118 auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
119 this->role_attr_.channel_name());
120 if (blocker != nullptr) {
121 return blocker->IsObservedEmpty();
122 }
123 return true;
124}
125
126template <typename MessageT>
128 auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
129 this->role_attr_.channel_name());
130 if (blocker != nullptr) {
131 return !blocker->IsPublishedEmpty();
132 }
133 return false;
134}
135
136template <typename MessageT>
137void IntraReader<MessageT>::Enqueue(const std::shared_ptr<MessageT>& msg) {
138 BlockerManager::Instance()->Publish<MessageT>(this->role_attr_.channel_name(),
139 msg);
140}
141
142template <typename MessageT>
143void IntraReader<MessageT>::SetHistoryDepth(const uint32_t& depth) {
144 auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
145 this->role_attr_.channel_name());
146 if (blocker != nullptr) {
147 blocker->set_capacity(depth);
148 }
149}
150
151template <typename MessageT>
153 auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
154 this->role_attr_.channel_name());
155 if (blocker != nullptr) {
156 return static_cast<uint32_t>(blocker->capacity());
157 }
158 return 0;
159}
160
161template <typename MessageT>
162std::shared_ptr<MessageT> IntraReader<MessageT>::GetLatestObserved() const {
163 auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
164 this->role_attr_.channel_name());
165 if (blocker != nullptr) {
166 return blocker->GetLatestObservedPtr();
167 }
168 return nullptr;
169}
170
171template <typename MessageT>
172std::shared_ptr<MessageT> IntraReader<MessageT>::GetOldestObserved() const {
173 auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
174 this->role_attr_.channel_name());
175 if (blocker != nullptr) {
176 return blocker->GetOldestObservedPtr();
177 }
178 return nullptr;
179}
180
181template <typename MessageT>
183 auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
184 this->role_attr_.channel_name());
185 ACHECK(blocker != nullptr);
186 return blocker->ObservedBegin();
187}
188
189template <typename MessageT>
191 auto blocker = BlockerManager::Instance()->GetBlocker<MessageT>(
192 this->role_attr_.channel_name());
193 ACHECK(blocker != nullptr);
194 return blocker->ObservedEnd();
195}
196
197template <typename MessageT>
198void IntraReader<MessageT>::OnMessage(const MessagePtr& msg_ptr) {
199 this->second_to_lastest_recv_time_sec_ = this->latest_recv_time_sec_;
200 this->latest_recv_time_sec_ = apollo::cyber::Time::Now().ToSecond();
201 if (msg_callback_ != nullptr) {
202 msg_callback_(msg_ptr);
203 }
204}
205
206} // namespace blocker
207} // namespace cyber
208} // namespace apollo
209
210#endif // CYBER_BLOCKER_INTRA_READER_H_
Reader subscribes to a channel; it has two main functions:
Definition reader.h:69
static Time Now()
get the current time.
Definition time.cc:57
double ToSecond() const
convert time to second.
Definition time.cc:77
static const std::shared_ptr< BlockerManager > & Instance()
void ClearData() override
Clear local data
std::function< void(const std::shared_ptr< MessageT > &)> Callback
void Enqueue(const std::shared_ptr< MessageT > &msg) override
Push msg to Blocker's PublishQueue
bool HasReceived() const override
Query whether we have received data since last clear
typename std::list< std::shared_ptr< MessageT > >::const_iterator Iterator
bool Init() override
Init the Reader object
std::shared_ptr< MessageT > GetOldestObserved() const override
Get the oldest message we Observe
void Shutdown() override
Shutdown the Reader object
IntraReader(const proto::RoleAttributes &attr, const Callback &callback)
void SetHistoryDepth(const uint32_t &depth) override
Set Blocker's PublishQueue's capacity to depth
bool Empty() const override
Query whether the Reader has data to be handled
std::shared_ptr< MessageT > GetLatestObserved() const override
Get the latest message we Observe
void Observe() override
Get stored data
Iterator Begin() const override
Get the begin iterator of ObserveQueue, used to traverse
std::shared_ptr< MessageT > MessagePtr
Iterator End() const override
Get the end iterator of ObserveQueue, used to traverse
uint32_t GetHistoryDepth() const override
Get Blocker's PublishQueue's capacity
#define ACHECK(cond)
Definition log.h:80
class register implement
Definition arena_queue.h:37