17#ifndef CYBER_BLOCKER_INTRA_READER_H_
18#define CYBER_BLOCKER_INTRA_READER_H_
33template <
typename MessageT>
37 using Callback = std::function<void(
const std::shared_ptr<MessageT>&)>;
39 typename std::list<std::shared_ptr<MessageT>>::const_iterator;
49 bool Empty()
const override;
52 void Enqueue(
const std::shared_ptr<MessageT>& msg)
override;
66template <
typename MessageT>
69 :
Reader<MessageT>(attr), msg_callback_(callback) {}
71template <
typename MessageT>
76template <
typename MessageT>
78 if (this->init_.exchange(
true)) {
82 this->role_attr_.channel_name(), this->role_attr_.qos_profile().depth(),
83 this->role_attr_.node_name(),
85 std::placeholders::_1));
88template <
typename MessageT>
90 if (!this->init_.exchange(
false)) {
94 this->role_attr_.channel_name(), this->role_attr_.node_name());
97template <
typename MessageT>
100 this->role_attr_.channel_name());
101 if (blocker !=
nullptr) {
102 blocker->ClearObserved();
103 blocker->ClearPublished();
107template <
typename MessageT>
110 this->role_attr_.channel_name());
111 if (blocker !=
nullptr) {
116template <
typename MessageT>
119 this->role_attr_.channel_name());
120 if (blocker !=
nullptr) {
121 return blocker->IsObservedEmpty();
126template <
typename MessageT>
129 this->role_attr_.channel_name());
130 if (blocker !=
nullptr) {
131 return !blocker->IsPublishedEmpty();
136template <
typename MessageT>
142template <
typename MessageT>
145 this->role_attr_.channel_name());
146 if (blocker !=
nullptr) {
147 blocker->set_capacity(depth);
151template <
typename MessageT>
154 this->role_attr_.channel_name());
155 if (blocker !=
nullptr) {
156 return static_cast<uint32_t
>(blocker->capacity());
161template <
typename MessageT>
164 this->role_attr_.channel_name());
165 if (blocker !=
nullptr) {
166 return blocker->GetLatestObservedPtr();
171template <
typename MessageT>
174 this->role_attr_.channel_name());
175 if (blocker !=
nullptr) {
176 return blocker->GetOldestObservedPtr();
181template <
typename MessageT>
184 this->role_attr_.channel_name());
185 ACHECK(blocker !=
nullptr);
186 return blocker->ObservedBegin();
189template <
typename MessageT>
192 this->role_attr_.channel_name());
193 ACHECK(blocker !=
nullptr);
194 return blocker->ObservedEnd();
197template <
typename MessageT>
199 this->second_to_lastest_recv_time_sec_ = this->latest_recv_time_sec_;
201 if (msg_callback_ !=
nullptr) {
202 msg_callback_(msg_ptr);
Reader subscribes to a channel; it has two main functions:
static Time Now()
Get the current time.
double ToSecond() const
Convert the time to seconds.
static const std::shared_ptr< BlockerManager > & Instance()
void ClearData() override
Clear local data
std::function< void(const std::shared_ptr< MessageT > &)> Callback
void Enqueue(const std::shared_ptr< MessageT > &msg) override
Push msg to Blocker's PublishQueue
bool HasReceived() const override
Query whether data has been received since the last clear
typename std::list< std::shared_ptr< MessageT > >::const_iterator Iterator
bool Init() override
Init the Reader object
std::shared_ptr< MessageT > GetOldestObserved() const override
Get the oldest observed message
void Shutdown() override
Shutdown the Reader object
IntraReader(const proto::RoleAttributes &attr, const Callback &callback)
void SetHistoryDepth(const uint32_t &depth) override
Set Blocker's PublishQueue's capacity to depth
bool Empty() const override
Query whether the Reader has data to be handled
std::shared_ptr< MessageT > GetLatestObserved() const override
Get the latest observed message
void Observe() override
Get stored data
Iterator Begin() const override
Get the begin iterator of the ObserveQueue, used for traversal
std::shared_ptr< MessageT > MessagePtr
Iterator End() const override
Get the end iterator of the ObserveQueue, used for traversal
uint32_t GetHistoryDepth() const override
Get Blocker's PublishQueue's capacity