17#ifndef CYBER_NODE_READER_H_
18#define CYBER_NODE_READER_H_
25#include <unordered_map>
29#include "cyber/proto/topology_change.pb.h"
46using CallbackFunc = std::function<void(
const std::shared_ptr<M0>&)>;
68template <
typename MessageT>
71 using BlockerPtr = std::unique_ptr<blocker::Blocker<MessageT>>;
72 using ReceiverPtr = std::shared_ptr<transport::Receiver<MessageT>>;
76 typename std::list<std::shared_ptr<MessageT>>::const_iterator;
129 bool Empty()
const override;
150 virtual void Enqueue(
const std::shared_ptr<MessageT>& msg);
208 void GetWriters(std::vector<proto::RoleAttributes>* writers)
override;
216 void JoinTheTopology();
217 void LeaveTheTopology();
222 std::string croutine_name_;
230template <
typename MessageT>
233 uint32_t pending_queue_size)
235 pending_queue_size_(pending_queue_size),
236 reader_func_(reader_func) {
241template <
typename MessageT>
246template <
typename MessageT>
248 second_to_lastest_recv_time_sec_ = latest_recv_time_sec_;
250 blocker_->Publish(msg);
253template <
typename MessageT>
258template <
typename MessageT>
260 if (init_.exchange(
true)) {
263 auto statistics_center = statistics::Statistics::Instance();
264 if (!statistics_center->RegisterChanVar(role_attr_)) {
265 AWARN <<
"Failed to register reader var!";
267 std::function<void(
const std::shared_ptr<MessageT>&)> func;
268 if (reader_func_ !=
nullptr) {
269 func = [
this](
const std::shared_ptr<MessageT>& msg) {
270 uint64_t process_start_time;
271 uint64_t proc_done_time;
272 uint64_t proc_start_time;
275 this->reader_func_(msg);
279 static_cast<uint64_t
>(latest_recv_time_sec_ * 1000000UL);
281 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
282 this->role_attr_, (proc_done_time - proc_start_time));
283 if (statistics::Statistics::Instance()->GetProcStatus(
284 this->role_attr_, &process_start_time)) {
285 auto cyber_latency = proc_start_time - process_start_time;
286 if (process_start_time > 0 && cyber_latency > 0) {
287 statistics::Statistics::Instance()->SamplingCyberLatency(
288 this->role_attr_, cyber_latency);
293 func = [
this](
const std::shared_ptr<MessageT>& msg) { this->Enqueue(msg); };
296 croutine_name_ = role_attr_.node_name() +
"_" + role_attr_.channel_name();
297 auto dv = std::make_shared<data::DataVisitor<MessageT>>(
298 role_attr_.channel_id(), pending_queue_size_);
301 croutine::CreateRoutineFactory<MessageT>(std::move(func), dv);
302 if (!sched->CreateTask(factory, croutine_name_)) {
303 AERROR <<
"Create Task Failed!";
309 this->role_attr_.set_id(receiver_->id().HashValue());
311 service_discovery::TopologyManager::Instance()->channel_manager();
317template <
typename MessageT>
319 if (!init_.exchange(
false)) {
324 channel_manager_ =
nullptr;
326 if (!croutine_name_.empty()) {
331template <
typename MessageT>
334 change_conn_ = channel_manager_->AddChangeListener(std::bind(
338 const std::string& channel_name = this->role_attr_.channel_name();
339 std::vector<proto::RoleAttributes> writers;
340 channel_manager_->GetWritersOfChannel(channel_name, &writers);
341 for (
auto& writer : writers) {
342 receiver_->Enable(writer);
345 message::HasSerializer<MessageT>::value);
348template <
typename MessageT>
349void Reader<MessageT>::LeaveTheTopology() {
350 channel_manager_->RemoveChangeListener(change_conn_);
354template <
typename MessageT>
355void Reader<MessageT>::OnChannelChange(
const proto::ChangeMsg& change_msg) {
360 auto& writer_attr = change_msg.role_attr();
361 if (writer_attr.channel_name() != this->role_attr_.channel_name()) {
365 auto operate_type = change_msg.operate_type();
367 receiver_->Enable(writer_attr);
369 receiver_->Disable(writer_attr);
373template <
typename MessageT>
375 return !blocker_->IsPublishedEmpty();
378template <
typename MessageT>
380 return blocker_->IsObservedEmpty();
383template <
typename MessageT>
385 if (latest_recv_time_sec_ < 0) {
388 if (second_to_lastest_recv_time_sec_ < 0) {
391 return std::max((
Time::Now().ToSecond() - latest_recv_time_sec_),
392 (latest_recv_time_sec_ - second_to_lastest_recv_time_sec_));
395template <
typename MessageT>
397 return pending_queue_size_;
400template <
typename MessageT>
402 return blocker_->GetLatestObservedPtr();
405template <
typename MessageT>
407 return blocker_->GetOldestObservedPtr();
410template <
typename MessageT>
412 blocker_->ClearPublished();
413 blocker_->ClearObserved();
416template <
typename MessageT>
418 blocker_->set_capacity(depth);
421template <
typename MessageT>
423 return static_cast<uint32_t
>(blocker_->capacity());
426template <
typename MessageT>
432 return channel_manager_->HasWriter(role_attr_.channel_name());
435template <
typename MessageT>
437 if (writers ==
nullptr) {
445 channel_manager_->GetWritersOfChannel(role_attr_.channel_name(), writers);
Base Class for Reader Reader is identified by one apollo::cyber::proto::RoleAttribute,...
Reader subscribes a channel, it has two main functions:
void Observe() override
Get All data that Blocker stores
virtual std::shared_ptr< MessageT > GetOldestObserved() const
Get the oldest message we Observe
typename service_discovery::Manager::ChangeConnection ChangeConnection
virtual void SetHistoryDepth(const uint32_t &depth)
Set Blocker's PublishQueue's capacity to depth
bool Init() override
Init Reader
virtual uint32_t GetHistoryDepth() const
Get Blocker's PublishQueue's capacity
std::shared_ptr< transport::Receiver< MessageT > > ReceiverPtr
double GetDelaySec() const override
Get time interval of since last receive message
void GetWriters(std::vector< proto::RoleAttributes > *writers) override
Get all writers pushlish the channel we subscribes
virtual Iterator Begin() const
Get the begin iterator of ObserveQueue, used to traverse
Reader(const proto::RoleAttributes &role_attr, const CallbackFunc< MessageT > &reader_func=nullptr, uint32_t pending_queue_size=DEFAULT_PENDING_QUEUE_SIZE)
Constructor a Reader object.
typename std::list< std::shared_ptr< MessageT > >::const_iterator Iterator
uint32_t pending_queue_size_
double latest_recv_time_sec_
virtual Iterator End() const
Get the end iterator of ObserveQueue, used to traverse
virtual void Enqueue(const std::shared_ptr< MessageT > &msg)
Push msg to Blocker's PublishQueue
std::unique_ptr< blocker::Blocker< MessageT > > BlockerPtr
uint32_t PendingQueueSize() const override
Get pending_queue_size configuration
bool HasWriter() override
Is there is at least one writer publish the channel that we subscribes?
void ClearData() override
Clear Blocker's data
bool Empty() const override
Query whether the Reader has data to be handled
double second_to_lastest_recv_time_sec_
bool HasReceived() const override
Query whether we have received data since last clear
virtual std::shared_ptr< MessageT > GetLatestObserved() const
Get the latest message we Observe
void Shutdown() override
Shutdown Reader
One Channel is related to one Receiver.
auto GetReceiver(const proto::RoleAttributes &role_attr) -> typename std::shared_ptr< transport::Receiver< MessageT > >
Get the Receiver object
uint64_t ToMicrosecond() const
convert time to microsecond (us).
static Time Now()
get the current time.
double ToSecond() const
convert time to second.
virtual bool RemoveTask(const std::string &name)=0
base::Connection< const ChangeMsg & > ChangeConnection
std::shared_ptr< ChannelManager > ChannelManagerPtr
const uint32_t DEFAULT_PENDING_QUEUE_SIZE
std::function< void(const std::shared_ptr< M0 > &)> CallbackFunc
optional QosProfile qos_profile
optional string channel_name