Apollo 10.0
自动驾驶开放平台
reader.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_NODE_READER_H_
18#define CYBER_NODE_READER_H_
19
20#include <algorithm>
21#include <list>
22#include <memory>
23#include <mutex>
24#include <string>
25#include <unordered_map>
26#include <utility>
27#include <vector>
28
29#include "cyber/proto/topology_change.pb.h"
30
39#include "cyber/time/time.h"
41
42namespace apollo {
43namespace cyber {
44
45template <typename M0>
46using CallbackFunc = std::function<void(const std::shared_ptr<M0>&)>;
47
48using proto::RoleType;
49
50const uint32_t DEFAULT_PENDING_QUEUE_SIZE = 1;
51
68template <typename MessageT>
69class Reader : public ReaderBase {
70 public:
71 using BlockerPtr = std::unique_ptr<blocker::Blocker<MessageT>>;
72 using ReceiverPtr = std::shared_ptr<transport::Receiver<MessageT>>;
75 using Iterator =
76 typename std::list<std::shared_ptr<MessageT>>::const_iterator;
77
87 explicit Reader(const proto::RoleAttributes& role_attr,
88 const CallbackFunc<MessageT>& reader_func = nullptr,
89 uint32_t pending_queue_size = DEFAULT_PENDING_QUEUE_SIZE);
90 virtual ~Reader();
91
98 bool Init() override;
99
103 void Shutdown() override;
104
108 void Observe() override;
109
113 void ClearData() override;
114
121 bool HasReceived() const override;
122
129 bool Empty() const override;
130
136 double GetDelaySec() const override;
137
143 uint32_t PendingQueueSize() const override;
144
150 virtual void Enqueue(const std::shared_ptr<MessageT>& msg);
151
157 virtual void SetHistoryDepth(const uint32_t& depth);
158
164 virtual uint32_t GetHistoryDepth() const;
165
171 virtual std::shared_ptr<MessageT> GetLatestObserved() const;
172
178 virtual std::shared_ptr<MessageT> GetOldestObserved() const;
179
185 virtual Iterator Begin() const { return blocker_->ObservedBegin(); }
186
192 virtual Iterator End() const { return blocker_->ObservedEnd(); }
193
201 bool HasWriter() override;
202
208 void GetWriters(std::vector<proto::RoleAttributes>* writers) override;
209
210 protected:
214
215 private:
216 void JoinTheTopology();
217 void LeaveTheTopology();
218 void OnChannelChange(const proto::ChangeMsg& change_msg);
219
220 CallbackFunc<MessageT> reader_func_;
221 ReceiverPtr receiver_ = nullptr;
222 std::string croutine_name_;
223
224 BlockerPtr blocker_ = nullptr;
225
226 ChangeConnection change_conn_;
227 service_discovery::ChannelManagerPtr channel_manager_ = nullptr;
228};
229
230template <typename MessageT>
232 const CallbackFunc<MessageT>& reader_func,
233 uint32_t pending_queue_size)
234 : ReaderBase(role_attr),
235 pending_queue_size_(pending_queue_size),
236 reader_func_(reader_func) {
238 role_attr.qos_profile().depth(), role_attr.channel_name())));
239}
240
241template <typename MessageT>
243 Shutdown();
244}
245
246template <typename MessageT>
247void Reader<MessageT>::Enqueue(const std::shared_ptr<MessageT>& msg) {
248 second_to_lastest_recv_time_sec_ = latest_recv_time_sec_;
249 latest_recv_time_sec_ = Time::Now().ToSecond();
250 blocker_->Publish(msg);
251}
252
253template <typename MessageT>
255 blocker_->Observe();
256}
257
258template <typename MessageT>
260 if (init_.exchange(true)) {
261 return true;
262 }
263 auto statistics_center = statistics::Statistics::Instance();
264 if (!statistics_center->RegisterChanVar(role_attr_)) {
265 AWARN << "Failed to register reader var!";
266 }
267 std::function<void(const std::shared_ptr<MessageT>&)> func;
268 if (reader_func_ != nullptr) {
269 func = [this](const std::shared_ptr<MessageT>& msg) {
270 uint64_t process_start_time;
271 uint64_t proc_done_time;
272 uint64_t proc_start_time;
273
274 this->Enqueue(msg);
275 this->reader_func_(msg);
276 // sampling proc latency in microsecond
277 proc_done_time = Time::Now().ToMicrosecond();
278 proc_start_time =
279 static_cast<uint64_t>(latest_recv_time_sec_ * 1000000UL);
280
281 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
282 this->role_attr_, (proc_done_time - proc_start_time));
283 if (statistics::Statistics::Instance()->GetProcStatus(
284 this->role_attr_, &process_start_time)) {
285 auto cyber_latency = proc_start_time - process_start_time;
286 if (process_start_time > 0 && cyber_latency > 0) {
287 statistics::Statistics::Instance()->SamplingCyberLatency(
288 this->role_attr_, cyber_latency);
289 }
290 }
291 };
292 } else {
293 func = [this](const std::shared_ptr<MessageT>& msg) { this->Enqueue(msg); };
294 }
295 auto sched = scheduler::Instance();
296 croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name();
297 auto dv = std::make_shared<data::DataVisitor<MessageT>>(
298 role_attr_.channel_id(), pending_queue_size_);
299 // Using factory to wrap templates.
301 croutine::CreateRoutineFactory<MessageT>(std::move(func), dv);
302 if (!sched->CreateTask(factory, croutine_name_)) {
303 AERROR << "Create Task Failed!";
304 init_.store(false);
305 return false;
306 }
307
308 receiver_ = ReceiverManager<MessageT>::Instance()->GetReceiver(role_attr_);
309 this->role_attr_.set_id(receiver_->id().HashValue());
310 channel_manager_ =
311 service_discovery::TopologyManager::Instance()->channel_manager();
312 JoinTheTopology();
313
314 return true;
315}
316
317template <typename MessageT>
319 if (!init_.exchange(false)) {
320 return;
321 }
322 LeaveTheTopology();
323 receiver_ = nullptr;
324 channel_manager_ = nullptr;
325
326 if (!croutine_name_.empty()) {
327 scheduler::Instance()->RemoveTask(croutine_name_);
328 }
329}
330
331template <typename MessageT>
333 // add listener
334 change_conn_ = channel_manager_->AddChangeListener(std::bind(
335 &Reader<MessageT>::OnChannelChange, this, std::placeholders::_1));
336
337 // get peer writers
338 const std::string& channel_name = this->role_attr_.channel_name();
339 std::vector<proto::RoleAttributes> writers;
340 channel_manager_->GetWritersOfChannel(channel_name, &writers);
341 for (auto& writer : writers) {
342 receiver_->Enable(writer);
343 }
344 channel_manager_->Join(this->role_attr_, proto::RoleType::ROLE_READER,
345 message::HasSerializer<MessageT>::value);
346}
347
348template <typename MessageT>
349void Reader<MessageT>::LeaveTheTopology() {
350 channel_manager_->RemoveChangeListener(change_conn_);
351 channel_manager_->Leave(this->role_attr_, proto::RoleType::ROLE_READER);
352}
353
354template <typename MessageT>
355void Reader<MessageT>::OnChannelChange(const proto::ChangeMsg& change_msg) {
356 if (change_msg.role_type() != proto::RoleType::ROLE_WRITER) {
357 return;
358 }
359
360 auto& writer_attr = change_msg.role_attr();
361 if (writer_attr.channel_name() != this->role_attr_.channel_name()) {
362 return;
363 }
364
365 auto operate_type = change_msg.operate_type();
366 if (operate_type == proto::OperateType::OPT_JOIN) {
367 receiver_->Enable(writer_attr);
368 } else {
369 receiver_->Disable(writer_attr);
370 }
371}
372
373template <typename MessageT>
375 return !blocker_->IsPublishedEmpty();
376}
377
378template <typename MessageT>
380 return blocker_->IsObservedEmpty();
381}
382
383template <typename MessageT>
385 if (latest_recv_time_sec_ < 0) {
386 return -1.0;
387 }
388 if (second_to_lastest_recv_time_sec_ < 0) {
389 return Time::Now().ToSecond() - latest_recv_time_sec_;
390 }
391 return std::max((Time::Now().ToSecond() - latest_recv_time_sec_),
392 (latest_recv_time_sec_ - second_to_lastest_recv_time_sec_));
393}
394
395template <typename MessageT>
397 return pending_queue_size_;
398}
399
400template <typename MessageT>
401std::shared_ptr<MessageT> Reader<MessageT>::GetLatestObserved() const {
402 return blocker_->GetLatestObservedPtr();
403}
404
405template <typename MessageT>
406std::shared_ptr<MessageT> Reader<MessageT>::GetOldestObserved() const {
407 return blocker_->GetOldestObservedPtr();
408}
409
410template <typename MessageT>
412 blocker_->ClearPublished();
413 blocker_->ClearObserved();
414}
415
416template <typename MessageT>
417void Reader<MessageT>::SetHistoryDepth(const uint32_t& depth) {
418 blocker_->set_capacity(depth);
419}
420
421template <typename MessageT>
423 return static_cast<uint32_t>(blocker_->capacity());
424}
425
426template <typename MessageT>
428 if (!init_.load()) {
429 return false;
430 }
431
432 return channel_manager_->HasWriter(role_attr_.channel_name());
433}
434
435template <typename MessageT>
436void Reader<MessageT>::GetWriters(std::vector<proto::RoleAttributes>* writers) {
437 if (writers == nullptr) {
438 return;
439 }
440
441 if (!init_.load()) {
442 return;
443 }
444
445 channel_manager_->GetWritersOfChannel(role_attr_.channel_name(), writers);
446}
447
448} // namespace cyber
449} // namespace apollo
450
451#endif // CYBER_NODE_READER_H_
Base Class for Reader Reader is identified by one apollo::cyber::proto::RoleAttribute,...
Definition reader_base.h:46
Reader subscribes a channel, it has two main functions:
Definition reader.h:69
void Observe() override
Get All data that Blocker stores
Definition reader.h:254
virtual std::shared_ptr< MessageT > GetOldestObserved() const
Get the oldest message we Observe
Definition reader.h:406
typename service_discovery::Manager::ChangeConnection ChangeConnection
Definition reader.h:74
virtual void SetHistoryDepth(const uint32_t &depth)
Set Blocker's PublishQueue's capacity to depth
Definition reader.h:417
virtual ~Reader()
Definition reader.h:242
bool Init() override
Init Reader
Definition reader.h:259
virtual uint32_t GetHistoryDepth() const
Get Blocker's PublishQueue's capacity
Definition reader.h:422
std::shared_ptr< transport::Receiver< MessageT > > ReceiverPtr
Definition reader.h:72
double GetDelaySec() const override
Get time interval of since last receive message
Definition reader.h:384
void GetWriters(std::vector< proto::RoleAttributes > *writers) override
Get all writers pushlish the channel we subscribes
Definition reader.h:436
virtual Iterator Begin() const
Get the begin iterator of ObserveQueue, used to traverse
Definition reader.h:185
Reader(const proto::RoleAttributes &role_attr, const CallbackFunc< MessageT > &reader_func=nullptr, uint32_t pending_queue_size=DEFAULT_PENDING_QUEUE_SIZE)
Constructor a Reader object.
Definition reader.h:231
typename std::list< std::shared_ptr< MessageT > >::const_iterator Iterator
Definition reader.h:76
uint32_t pending_queue_size_
Definition reader.h:213
double latest_recv_time_sec_
Definition reader.h:211
virtual Iterator End() const
Get the end iterator of ObserveQueue, used to traverse
Definition reader.h:192
virtual void Enqueue(const std::shared_ptr< MessageT > &msg)
Push msg to Blocker's PublishQueue
Definition reader.h:247
std::unique_ptr< blocker::Blocker< MessageT > > BlockerPtr
Definition reader.h:71
uint32_t PendingQueueSize() const override
Get pending_queue_size configuration
Definition reader.h:396
bool HasWriter() override
Is there is at least one writer publish the channel that we subscribes?
Definition reader.h:427
void ClearData() override
Clear Blocker's data
Definition reader.h:411
bool Empty() const override
Query whether the Reader has data to be handled
Definition reader.h:379
double second_to_lastest_recv_time_sec_
Definition reader.h:212
bool HasReceived() const override
Query whether we have received data since last clear
Definition reader.h:374
virtual std::shared_ptr< MessageT > GetLatestObserved() const
Get the latest message we Observe
Definition reader.h:401
void Shutdown() override
Shutdown Reader
Definition reader.h:318
One Channel is related to one Receiver.
auto GetReceiver(const proto::RoleAttributes &role_attr) -> typename std::shared_ptr< transport::Receiver< MessageT > >
Get the Receiver object
uint64_t ToMicrosecond() const
convert time to microsecond (us).
Definition time.cc:85
static Time Now()
get the current time.
Definition time.cc:57
double ToSecond() const
convert time to second.
Definition time.cc:77
virtual bool RemoveTask(const std::string &name)=0
base::Connection< const ChangeMsg & > ChangeConnection
Definition manager.h:58
#define AERROR
Definition log.h:44
#define AWARN
Definition log.h:43
std::shared_ptr< ChannelManager > ChannelManagerPtr
const uint32_t DEFAULT_PENDING_QUEUE_SIZE
Definition reader.h:50
std::function< void(const std::shared_ptr< M0 > &)> CallbackFunc
Definition reader.h:46
class register implement
Definition arena_queue.h:37