Apollo 10.0
自动驾驶开放平台
apollo::cyber::Reader< MessageT > 模板类 参考

Reader subscribes to a channel; it has two main functions: 更多...

#include <reader.h>

类 apollo::cyber::Reader< MessageT > 继承关系图:
apollo::cyber::Reader< MessageT > 的协作图:

Public 类型

using BlockerPtr = std::unique_ptr< blocker::Blocker< MessageT > >
 
using ReceiverPtr = std::shared_ptr< transport::Receiver< MessageT > >
 
using ChangeConnection = typename service_discovery::Manager::ChangeConnection
 
using Iterator = typename std::list< std::shared_ptr< MessageT > >::const_iterator
 

Public 成员函数

 Reader (const proto::RoleAttributes &role_attr, const CallbackFunc< MessageT > &reader_func=nullptr, uint32_t pending_queue_size=DEFAULT_PENDING_QUEUE_SIZE)
 Construct a Reader object.
 
virtual ~Reader ()
 
bool Init () override
 Init Reader
 
void Shutdown () override
 Shutdown Reader
 
void Observe () override
 Get All data that Blocker stores
 
void ClearData () override
 Clear Blocker's data
 
bool HasReceived () const override
 Query whether we have received data since last clear
 
bool Empty () const override
 Query whether the Reader has data to be handled
 
double GetDelaySec () const override
 Get the time interval since the last received message
 
uint32_t PendingQueueSize () const override
 Get pending_queue_size configuration
 
virtual void Enqueue (const std::shared_ptr< MessageT > &msg)
 Push msg to Blocker's PublishQueue
 
virtual void SetHistoryDepth (const uint32_t &depth)
 Set Blocker's PublishQueue's capacity to depth
 
virtual uint32_t GetHistoryDepth () const
 Get Blocker's PublishQueue's capacity
 
virtual std::shared_ptr< MessageT > GetLatestObserved () const
 Get the latest message we Observe
 
virtual std::shared_ptr< MessageT > GetOldestObserved () const
 Get the oldest message we Observe
 
virtual Iterator Begin () const
 Get the begin iterator of ObserveQueue, used to traverse
 
virtual Iterator End () const
 Get the end iterator of ObserveQueue, used to traverse
 
bool HasWriter () override
 Is there at least one writer publishing to the channel that we subscribe to?
 
void GetWriters (std::vector< proto::RoleAttributes > *writers) override
 Get all writers publishing to the channel we subscribe to
 
- Public 成员函数 继承自 apollo::cyber::ReaderBase
 ReaderBase (const proto::RoleAttributes &role_attr)
 
virtual ~ReaderBase ()
 
const std::string & GetChannelName () const
 Get Reader's Channel name
 
uint64_t ChannelId () const
 Get Reader's Channel id
 
const proto::QosProfile & QosProfile () const
 Get qos profile.
 
bool IsInit () const
 Query whether the Reader is initialized
 

Protected 属性

double latest_recv_time_sec_ = -1.0
 
double second_to_lastest_recv_time_sec_ = -1.0
 
uint32_t pending_queue_size_
 
- Protected 属性 继承自 apollo::cyber::ReaderBase
proto::RoleAttributes role_attr_
 
std::atomic< bool > init_
 

详细描述

template<typename MessageT>
class apollo::cyber::Reader< MessageT >

Reader subscribes to a channel; it has two main functions:

  1. You can pass a CallbackFunc to handle the message when it arrives
  2. You can Observe messages that Blocker has cached. Reader automatically pushes each message to Blocker's PublishQueue, and we can use Observe to fetch messages from PublishQueue to ObserveQueue. But if you have set a CallbackFunc, you can ignore this. One Reader uses one ChannelBuffer; the message being handled is stored in the ChannelBuffer. Reader will Join the topology on init and Leave the topology on shutdown.
    警告
    To save resources, ChannelBuffer has a limited length, which is passed through the pending_queue_size param. pending_queue_size defaults to 1, so if you handle messages more slowly than the writer sends them, older messages that have not been handled will be lost. You can increase pending_queue_size to resolve this problem.

在文件 reader.h69 行定义.

成员类型定义说明

◆ BlockerPtr

template<typename MessageT >
using apollo::cyber::Reader< MessageT >::BlockerPtr = std::unique_ptr<blocker::Blocker<MessageT> >

在文件 reader.h71 行定义.

◆ ChangeConnection

template<typename MessageT >
using apollo::cyber::Reader< MessageT >::ChangeConnection = typename service_discovery::Manager::ChangeConnection

在文件 reader.h73 行定义.

◆ Iterator

template<typename MessageT >
using apollo::cyber::Reader< MessageT >::Iterator = typename std::list<std::shared_ptr<MessageT> >::const_iterator

在文件 reader.h75 行定义.

◆ ReceiverPtr

template<typename MessageT >
using apollo::cyber::Reader< MessageT >::ReceiverPtr = std::shared_ptr<transport::Receiver<MessageT> >

在文件 reader.h72 行定义.

构造及析构函数说明

◆ Reader()

template<typename MessageT >
apollo::cyber::Reader< MessageT >::Reader ( const proto::RoleAttributes &  role_attr,
const CallbackFunc< MessageT > &  reader_func = nullptr,
uint32_t  pending_queue_size = DEFAULT_PENDING_QUEUE_SIZE 
)
explicit

Construct a Reader object.

参数
role_attr  is a protobuf message RoleAttributes, which includes the channel name and other info.
reader_func  is the callback function called when a message is received.
pending_queue_size  is the max depth of the message cache queue.
警告
The received messages are enqueued in a queue whose depth is pending_queue_size.

在文件 reader.h231 行定义.

234 : ReaderBase(role_attr),
235 pending_queue_size_(pending_queue_size),
236 reader_func_(reader_func) {
237 blocker_.reset(new blocker::Blocker<MessageT>(blocker::BlockerAttr(
238 role_attr.qos_profile().depth(), role_attr.channel_name())));
239}
ReaderBase(const proto::RoleAttributes &role_attr)
Definition reader_base.h:48
uint32_t pending_queue_size_
Definition reader.h:213

◆ ~Reader()

template<typename MessageT >
apollo::cyber::Reader< MessageT >::~Reader ( )
virtual

在文件 reader.h242 行定义.

242 {
243 Shutdown();
244}
void Shutdown() override
Shutdown Reader
Definition reader.h:318

成员函数说明

◆ Begin()

template<typename MessageT >
virtual Iterator apollo::cyber::Reader< MessageT >::Begin ( ) const
inline virtual

Get the begin iterator of ObserveQueue, used to traverse

返回
Iterator begin iterator

apollo::cyber::blocker::IntraReader< MessageT > 重载.

在文件 reader.h185 行定义.

185{ return blocker_->ObservedBegin(); }

◆ ClearData()

template<typename MessageT >
void apollo::cyber::Reader< MessageT >::ClearData ( )
override virtual

Clear Blocker's data

实现了 apollo::cyber::ReaderBase.

在文件 reader.h411 行定义.

411 {
412 blocker_->ClearPublished();
413 blocker_->ClearObserved();
414}

◆ Empty()

template<typename MessageT >
bool apollo::cyber::Reader< MessageT >::Empty ( ) const
override virtual

Query whether the Reader has data to be handled

返回
true if blocker is empty
false if blocker has data

实现了 apollo::cyber::ReaderBase.

在文件 reader.h379 行定义.

379 {
380 return blocker_->IsObservedEmpty();
381}

◆ End()

template<typename MessageT >
virtual Iterator apollo::cyber::Reader< MessageT >::End ( ) const
inline virtual

Get the end iterator of ObserveQueue, used to traverse

返回
Iterator end iterator

apollo::cyber::blocker::IntraReader< MessageT > 重载.

在文件 reader.h192 行定义.

192{ return blocker_->ObservedEnd(); }

◆ Enqueue()

template<typename MessageT >
void apollo::cyber::Reader< MessageT >::Enqueue ( const std::shared_ptr< MessageT > &  msg)
virtual

Push msg to Blocker's PublishQueue

参数
msg  message ptr to be pushed

apollo::cyber::blocker::IntraReader< MessageT > 重载.

在文件 reader.h247 行定义.

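247 {
248 second_to_lastest_recv_time_sec_ = latest_recv_time_sec_;
249 latest_recv_time_sec_ = Time::Now().ToSecond();
250 blocker_->Publish(msg);
251}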
double latest_recv_time_sec_
Definition reader.h:211
double second_to_lastest_recv_time_sec_
Definition reader.h:212
static Time Now()
get the current time.
Definition time.cc:57
double ToSecond() const
convert time to second.
Definition time.cc:77

◆ GetDelaySec()

template<typename MessageT >
double apollo::cyber::Reader< MessageT >::GetDelaySec ( ) const
override virtual

Get the time interval since the last received message

返回
double seconds delay

实现了 apollo::cyber::ReaderBase.

在文件 reader.h384 行定义.

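384 {
385 if (latest_recv_time_sec_ < 0) {
386 return -1.0;
387 }
388 if (second_to_lastest_recv_time_sec_ < 0) {
389 return Time::Now().ToSecond() - latest_recv_time_sec_;
390 }
391 return std::max((Time::Now().ToSecond() - latest_recv_time_sec_),
392 (latest_recv_time_sec_ - second_to_lastest_recv_time_sec_));
393}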

◆ GetHistoryDepth()

template<typename MessageT >
uint32_t apollo::cyber::Reader< MessageT >::GetHistoryDepth ( ) const
virtual

Get Blocker's PublishQueue's capacity

返回
uint32_t depth of the history

apollo::cyber::blocker::IntraReader< MessageT > 重载.

在文件 reader.h422 行定义.

422 {
423 return static_cast<uint32_t>(blocker_->capacity());
424}

◆ GetLatestObserved()

template<typename MessageT >
std::shared_ptr< MessageT > apollo::cyber::Reader< MessageT >::GetLatestObserved ( ) const
virtual

Get the latest message we Observe

返回
std::shared_ptr<MessageT> the latest message

apollo::cyber::blocker::IntraReader< MessageT > 重载.

在文件 reader.h401 行定义.

401 {
402 return blocker_->GetLatestObservedPtr();
403}

◆ GetOldestObserved()

template<typename MessageT >
std::shared_ptr< MessageT > apollo::cyber::Reader< MessageT >::GetOldestObserved ( ) const
virtual

Get the oldest message we Observe

返回
std::shared_ptr<MessageT> the oldest message

apollo::cyber::blocker::IntraReader< MessageT > 重载.

在文件 reader.h406 行定义.

406 {
407 return blocker_->GetOldestObservedPtr();
408}

◆ GetWriters()

template<typename MessageT >
void apollo::cyber::Reader< MessageT >::GetWriters ( std::vector< proto::RoleAttributes > *  writers)
override virtual

Get all writers publishing to the channel we subscribe to

参数
writers  result vector of RoleAttributes

重载 apollo::cyber::ReaderBase .

在文件 reader.h436 行定义.

436 {
437 if (writers == nullptr) {
438 return;
439 }
440
441 if (!init_.load()) {
442 return;
443 }
444
445 channel_manager_->GetWritersOfChannel(role_attr_.channel_name(), writers);
446}
std::atomic< bool > init_
proto::RoleAttributes role_attr_

◆ HasReceived()

template<typename MessageT >
bool apollo::cyber::Reader< MessageT >::HasReceived ( ) const
override virtual

Query whether we have received data since last clear

返回
true if the reader has received data
false if the reader has not received data

实现了 apollo::cyber::ReaderBase.

在文件 reader.h374 行定义.

374 {
375 return !blocker_->IsPublishedEmpty();
376}

◆ HasWriter()

template<typename MessageT >
bool apollo::cyber::Reader< MessageT >::HasWriter ( )
override virtual

Is there at least one writer publishing to the channel that we subscribe to?

返回
true if the channel has writer
false if the channel has no writer

重载 apollo::cyber::ReaderBase .

在文件 reader.h427 行定义.

427 {
428 if (!init_.load()) {
429 return false;
430 }
431
432 return channel_manager_->HasWriter(role_attr_.channel_name());
433}

◆ Init()

template<typename MessageT >
bool apollo::cyber::Reader< MessageT >::Init ( )
override virtual

Init Reader

返回
true if init successfully
false if init failed

实现了 apollo::cyber::ReaderBase.

在文件 reader.h259 行定义.

259 {
260 if (init_.exchange(true)) {
261 return true;
262 }
263 auto statistics_center = statistics::Statistics::Instance();
264 if (!statistics_center->RegisterChanVar(role_attr_)) {
265 AWARN << "Failed to register reader var!";
266 }
267 std::function<void(const std::shared_ptr<MessageT>&)> func;
268 if (reader_func_ != nullptr) {
269 func = [this](const std::shared_ptr<MessageT>& msg) {
270 uint64_t process_start_time;
271 uint64_t proc_done_time;
272 uint64_t proc_start_time;
273
274 this->Enqueue(msg);
275 this->reader_func_(msg);
276 // sampling proc latency in microsecond
277 proc_done_time = Time::Now().ToMicrosecond();
278 proc_start_time =
279 static_cast<uint64_t>(latest_recv_time_sec_ * 1000000UL);
280
281 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
282 this->role_attr_, (proc_done_time - proc_start_time));
283 if (statistics::Statistics::Instance()->GetProcStatus(
284 this->role_attr_, &process_start_time)) {
285 auto cyber_latency = proc_start_time - process_start_time;
286 if (process_start_time > 0 && cyber_latency > 0) {
287 statistics::Statistics::Instance()->SamplingCyberLatency(
288 this->role_attr_, cyber_latency);
289 }
290 }
291 };
292 } else {
293 func = [this](const std::shared_ptr<MessageT>& msg) { this->Enqueue(msg); };
294 }
295 auto sched = scheduler::Instance();
296 croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name();
297 auto dv = std::make_shared<data::DataVisitor<MessageT>>(
298 role_attr_.channel_id(), pending_queue_size_);
299 // Using factory to wrap templates.
300 croutine::RoutineFactory factory =
301 croutine::CreateRoutineFactory<MessageT>(std::move(func), dv);
302 if (!sched->CreateTask(factory, croutine_name_)) {
303 AERROR << "Create Task Failed!";
304 init_.store(false);
305 return false;
306 }
307
308 receiver_ = ReceiverManager<MessageT>::Instance()->GetReceiver(role_attr_);
309 this->role_attr_.set_id(receiver_->id().HashValue());
310 channel_manager_ =
311 service_discovery::TopologyManager::Instance()->channel_manager();
312 JoinTheTopology();
313
314 return true;
315}
virtual void Enqueue(const std::shared_ptr< MessageT > &msg)
Push msg to Blocker's PublishQueue
Definition reader.h:247
uint64_t ToMicrosecond() const
convert time to microsecond (us).
Definition time.cc:85
#define AERROR
Definition log.h:44
#define AWARN
Definition log.h:43

◆ Observe()

template<typename MessageT >
void apollo::cyber::Reader< MessageT >::Observe ( )
override virtual

Get All data that Blocker stores

实现了 apollo::cyber::ReaderBase.

在文件 reader.h254 行定义.

254 {
255 blocker_->Observe();
256}

◆ PendingQueueSize()

template<typename MessageT >
uint32_t apollo::cyber::Reader< MessageT >::PendingQueueSize ( ) const
override virtual

Get pending_queue_size configuration

返回
uint32_t the value of pending queue size

实现了 apollo::cyber::ReaderBase.

在文件 reader.h396 行定义.

396 {
397 return pending_queue_size_;
398}

◆ SetHistoryDepth()

template<typename MessageT >
void apollo::cyber::Reader< MessageT >::SetHistoryDepth ( const uint32_t &  depth)
virtual

Set Blocker's PublishQueue's capacity to depth

参数
depth  the value you want to set

apollo::cyber::blocker::IntraReader< MessageT > 重载.

在文件 reader.h417 行定义.

417 {
418 blocker_->set_capacity(depth);
419}

◆ Shutdown()

template<typename MessageT >
void apollo::cyber::Reader< MessageT >::Shutdown ( )
override virtual

Shutdown Reader

实现了 apollo::cyber::ReaderBase.

在文件 reader.h318 行定义.

318 {
319 if (!init_.exchange(false)) {
320 return;
321 }
322 LeaveTheTopology();
323 receiver_ = nullptr;
324 channel_manager_ = nullptr;
325
326 if (!croutine_name_.empty()) {
327 scheduler::Instance()->RemoveTask(croutine_name_);
328 }
329}
virtual bool RemoveTask(const std::string &name)=0

类成员变量说明

◆ latest_recv_time_sec_

template<typename MessageT >
double apollo::cyber::Reader< MessageT >::latest_recv_time_sec_ = -1.0
protected

在文件 reader.h211 行定义.

◆ pending_queue_size_

template<typename MessageT >
uint32_t apollo::cyber::Reader< MessageT >::pending_queue_size_
protected

在文件 reader.h213 行定义.

◆ second_to_lastest_recv_time_sec_

template<typename MessageT >
double apollo::cyber::Reader< MessageT >::second_to_lastest_recv_time_sec_ = -1.0
protected

在文件 reader.h212 行定义.


该类的文档由以下文件生成: