// Non-virtual entry point for one set of four messages, one per message type
// M0..M3, each passed as a const reference to a std::shared_ptr. Returns bool.
// The out-of-line definition further down in this file tests `is_shutdown_`
// first and otherwise returns Proc(msg0, msg1, msg2, msg3); the value returned
// on the shutdown branch is not visible in this excerpt.
// The reader/routine callbacks built later in this file invoke it as
// ptr->Process(msg0, msg1, msg2, msg3).
73 bool Process(
const std::shared_ptr<M0>& msg0,
const std::shared_ptr<M1>& msg1,
74 const std::shared_ptr<M2>& msg2,
75 const std::shared_ptr<M3>& msg3);
// Pure virtual four-message handler (`= 0`): a concrete subclass must override
// it. Its bool result is what the four-argument Process() returns when the
// shutdown check passes (see `return Proc(msg0, msg1, msg2, msg3)` in the
// definition later in this file).
// NOTE(review): the meaning of a true/false result is not established by
// anything visible here -- confirm against the callers of Process().
88 virtual bool Proc(
const std::shared_ptr<M0>& msg0,
89 const std::shared_ptr<M1>& msg1,
90 const std::shared_ptr<M2>& msg2,
91 const std::shared_ptr<M3>& msg3) = 0;
102template <
typename M0>
// Non-virtual single-message entry point: takes one std::shared_ptr<M0> by
// const reference and returns bool.
// The definition later in this file begins by testing `is_shutdown_`; the rest
// of its body is not visible in this excerpt.
// NOTE(review): presumably it forwards to Proc(msg) like the multi-message
// overloads do -- TODO confirm against the full definition.
108 bool Process(
const std::shared_ptr<M0>& msg);
// Pure virtual single-message handler (`= 0`): a concrete subclass must
// override it. Takes the message as a const reference to std::shared_ptr<M0>
// and returns bool.
// NOTE(review): presumably called from the single-message Process(); that call
// is not visible in this excerpt -- confirm.
111 virtual bool Proc(
const std::shared_ptr<M0>& msg) = 0;
114template <
typename M0,
typename M1>
// Non-virtual entry point for a pair of messages (types M0 and M1), each passed
// as a const reference to a std::shared_ptr. Returns bool.
// The out-of-line definition later in this file tests `is_shutdown_` first and
// otherwise returns Proc(msg0, msg1); the value returned on the shutdown branch
// is not visible in this excerpt.
// The callbacks built later in this file invoke it as ptr->Process(msg0, msg1).
120 bool Process(
const std::shared_ptr<M0>& msg0,
121 const std::shared_ptr<M1>& msg1);
// Pure virtual two-message handler (`= 0`): a concrete subclass must override
// it. Its bool result is what the two-argument Process() returns when the
// shutdown check passes (see `return Proc(msg0, msg1)` later in this file).
// NOTE(review): the first parameter is named `msg` here but `msg0` in the
// matching Process() declaration and in the four-message Proc(); harmless to
// callers, but worth aligning.
124 virtual bool Proc(
const std::shared_ptr<M0>& msg,
125 const std::shared_ptr<M1>& msg1) = 0;
128template <
typename M0,
typename M1,
typename M2>
// Non-virtual entry point for a triple of messages (types M0, M1 and M2), each
// passed as a const reference to a std::shared_ptr. Returns bool.
// The out-of-line definition later in this file tests `is_shutdown_` first and
// otherwise returns Proc(msg0, msg1, msg2); the value returned on the shutdown
// branch is not visible in this excerpt.
// The callbacks built later in this file invoke it as
// ptr->Process(msg0, msg1, msg2).
134 bool Process(
const std::shared_ptr<M0>& msg0,
const std::shared_ptr<M1>& msg1,
135 const std::shared_ptr<M2>& msg2);
// Pure virtual three-message handler (`= 0`): a concrete subclass must override
// it. Its bool result is what the three-argument Process() returns when the
// shutdown check passes (see `return Proc(msg0, msg1, msg2)` later in this
// file).
// NOTE(review): the first parameter is named `msg` here but `msg0` in the
// matching Process() declaration and in the four-message Proc(); harmless to
// callers, but worth aligning.
138 virtual bool Proc(
const std::shared_ptr<M0>& msg,
139 const std::shared_ptr<M1>& msg1,
140 const std::shared_ptr<M2>& msg2) = 0;
143template <
typename M0>
145 const std::shared_ptr<M0>& msg) {
146 if (is_shutdown_.load()) {
154 node_.reset(
new Node(config.
name()));
155 LoadConfigFiles(config);
157 AERROR <<
"Component Init() failed." << std::endl;
163template <
typename M0>
166 node_.reset(
new Node(config.
name()));
167 LoadConfigFiles(config);
169 if (config.readers_size() < 1) {
170 AERROR <<
"Invalid config file: too few readers.";
175 AERROR <<
"Component Init() failed.";
179 bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
186 auto role_attr = std::make_shared<proto::RoleAttributes>();
187 role_attr->set_node_name(config.
name());
190 std::weak_ptr<Component<M0>> self =
191 std::dynamic_pointer_cast<Component<M0>>(shared_from_this());
192 auto func = [self, role_attr](
const std::shared_ptr<M0>& msg) {
194 auto ptr = self.lock();
198 AERROR <<
"Component object has been destroyed.";
202 uint64_t process_start_time;
203 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
204 *role_attr, end_time - start_time);
205 if (statistics::Statistics::Instance()->GetProcStatus(
206 *role_attr, &process_start_time) &&
207 (start_time - process_start_time) > 0) {
208 statistics::Statistics::Instance()->SamplingCyberLatency(
209 *role_attr, start_time - process_start_time);
213 std::shared_ptr<Reader<M0>> reader =
nullptr;
216 reader = node_->CreateReader<M0>(reader_cfg);
218 reader = node_->CreateReader<M0>(reader_cfg, func);
221 if (reader ==
nullptr) {
222 AERROR <<
"Component create reader failed.";
225 readers_.emplace_back(std::move(reader));
232 readers_[0]->PendingQueueSize()};
233 auto dv = std::make_shared<data::DataVisitor<M0>>(conf);
235 croutine::CreateRoutineFactory<M0>(func, dv);
237 return sched->CreateTask(factory, node_->Name());
240template <
typename M0,
typename M1>
242 const std::shared_ptr<M0>& msg0,
const std::shared_ptr<M1>& msg1) {
243 if (is_shutdown_.load()) {
246 return Proc(msg0, msg1);
249template <
typename M0,
typename M1>
252 node_.reset(
new Node(config.
name()));
253 LoadConfigFiles(config);
255 if (config.readers_size() < 2) {
256 AERROR <<
"Invalid config file: too few readers.";
261 AERROR <<
"Component Init() failed.";
265 bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
272 auto reader1 = node_->template CreateReader<M1>(reader_cfg);
278 auto role_attr = std::make_shared<proto::RoleAttributes>();
279 role_attr->set_node_name(config.
name());
282 std::shared_ptr<Reader<M0>> reader0 =
nullptr;
284 reader0 = node_->template CreateReader<M0>(reader_cfg);
286 std::weak_ptr<Component<M0, M1>> self =
287 std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
292 auto func = [self, blocker1, role_attr](
const std::shared_ptr<M0>& msg0) {
294 auto ptr = self.lock();
296 if (!blocker1->IsPublishedEmpty()) {
297 auto msg1 = blocker1->GetLatestPublishedPtr();
298 ptr->Process(msg0, msg1);
301 uint64_t process_start_time;
302 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
303 *role_attr, end_time - start_time);
304 if (statistics::Statistics::Instance()->GetProcStatus(
305 *role_attr, &process_start_time) &&
306 (start_time - process_start_time) > 0) {
307 statistics::Statistics::Instance()->SamplingCyberLatency(
308 *role_attr, start_time - process_start_time);
312 AERROR <<
"Component object has been destroyed.";
316 reader0 = node_->template CreateReader<M0>(reader_cfg, func);
318 if (reader0 ==
nullptr || reader1 ==
nullptr) {
319 AERROR <<
"Component create reader failed.";
322 readers_.push_back(std::move(reader0));
323 readers_.push_back(std::move(reader1));
330 std::weak_ptr<Component<M0, M1>> self =
331 std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
332 auto func = [self, role_attr](
const std::shared_ptr<M0>& msg0,
333 const std::shared_ptr<M1>& msg1) {
335 auto ptr = self.lock();
337 ptr->Process(msg0, msg1);
340 uint64_t process_start_time;
341 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
342 *role_attr, end_time - start_time);
343 if (statistics::Statistics::Instance()->GetProcStatus(
344 *role_attr, &process_start_time) &&
345 (start_time - process_start_time) > 0) {
346 statistics::Statistics::Instance()->SamplingCyberLatency(
347 *role_attr, start_time - process_start_time);
350 AERROR <<
"Component object has been destroyed.";
354 std::vector<data::VisitorConfig> config_list;
355 for (
auto& reader : readers_) {
356 config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
358 auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
360 croutine::CreateRoutineFactory<M0, M1>(func, dv);
361 return sched->CreateTask(factory, node_->Name());
364template <
typename M0,
typename M1,
typename M2>
366 const std::shared_ptr<M1>& msg1,
367 const std::shared_ptr<M2>& msg2) {
368 if (is_shutdown_.load()) {
371 return Proc(msg0, msg1, msg2);
374template <
typename M0,
typename M1,
typename M2>
377 node_.reset(
new Node(config.
name()));
378 LoadConfigFiles(config);
380 if (config.readers_size() < 3) {
381 AERROR <<
"Invalid config file: too few readers.";
386 AERROR <<
"Component Init() failed.";
390 bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
397 auto reader1 = node_->template CreateReader<M1>(reader_cfg);
403 auto reader2 = node_->template CreateReader<M2>(reader_cfg);
409 auto role_attr = std::make_shared<proto::RoleAttributes>();
410 role_attr->set_node_name(config.
name());
413 std::shared_ptr<Reader<M0>> reader0 =
nullptr;
415 reader0 = node_->template CreateReader<M0>(reader_cfg);
417 std::weak_ptr<Component<M0, M1, M2, NullType>> self =
418 std::dynamic_pointer_cast<Component<M0, M1, M2, NullType>>(
426 auto func = [self, blocker1, blocker2,
427 role_attr](
const std::shared_ptr<M0>& msg0) {
429 auto ptr = self.lock();
431 if (!blocker1->IsPublishedEmpty() && !blocker2->IsPublishedEmpty()) {
432 auto msg1 = blocker1->GetLatestPublishedPtr();
433 auto msg2 = blocker2->GetLatestPublishedPtr();
434 ptr->Process(msg0, msg1, msg2);
437 uint64_t process_start_time;
438 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
439 *role_attr, end_time - start_time);
440 if (statistics::Statistics::Instance()->GetProcStatus(
441 *role_attr, &process_start_time) &&
442 (start_time - process_start_time) > 0) {
443 statistics::Statistics::Instance()->SamplingCyberLatency(
444 *role_attr, start_time - process_start_time);
448 AERROR <<
"Component object has been destroyed.";
452 reader0 = node_->template CreateReader<M0>(reader_cfg, func);
455 if (reader0 ==
nullptr || reader1 ==
nullptr || reader2 ==
nullptr) {
456 AERROR <<
"Component create reader failed.";
459 readers_.push_back(std::move(reader0));
460 readers_.push_back(std::move(reader1));
461 readers_.push_back(std::move(reader2));
468 std::weak_ptr<Component<M0, M1, M2, NullType>> self =
469 std::dynamic_pointer_cast<Component<M0, M1, M2, NullType>>(
471 auto func = [self, role_attr](
const std::shared_ptr<M0>& msg0,
472 const std::shared_ptr<M1>& msg1,
473 const std::shared_ptr<M2>& msg2) {
475 auto ptr = self.lock();
477 ptr->Process(msg0, msg1, msg2);
480 uint64_t process_start_time;
481 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
482 *role_attr, end_time - start_time);
483 if (statistics::Statistics::Instance()->GetProcStatus(
484 *role_attr, &process_start_time) &&
485 (start_time - process_start_time) > 0) {
486 statistics::Statistics::Instance()->SamplingCyberLatency(
487 *role_attr, start_time - process_start_time);
490 AERROR <<
"Component object has been destroyed.";
494 std::vector<data::VisitorConfig> config_list;
495 for (
auto& reader : readers_) {
496 config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
498 auto dv = std::make_shared<data::DataVisitor<M0, M1, M2>>(config_list);
500 croutine::CreateRoutineFactory<M0, M1, M2>(func, dv);
501 return sched->CreateTask(factory, node_->Name());
504template <
typename M0,
typename M1,
typename M2,
typename M3>
506 const std::shared_ptr<M1>& msg1,
507 const std::shared_ptr<M2>& msg2,
508 const std::shared_ptr<M3>& msg3) {
509 if (is_shutdown_.load()) {
512 return Proc(msg0, msg1, msg2, msg3);
515template <
typename M0,
typename M1,
typename M2,
typename M3>
517 node_.reset(
new Node(config.
name()));
518 LoadConfigFiles(config);
520 if (config.readers_size() < 4) {
521 AERROR <<
"Invalid config file: too few readers_." << std::endl;
526 AERROR <<
"Component Init() failed." << std::endl;
530 bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
532 ReaderConfig reader_cfg;
537 auto reader1 = node_->template CreateReader<M1>(reader_cfg);
543 auto reader2 = node_->template CreateReader<M2>(reader_cfg);
549 auto reader3 = node_->template CreateReader<M3>(reader_cfg);
555 auto role_attr = std::make_shared<proto::RoleAttributes>();
556 role_attr->set_node_name(config.
name());
559 std::shared_ptr<Reader<M0>> reader0 =
nullptr;
561 reader0 = node_->template CreateReader<M0>(reader_cfg);
563 std::weak_ptr<Component<M0, M1, M2, M3>> self =
564 std::dynamic_pointer_cast<Component<M0, M1, M2, M3>>(
574 auto func = [self, blocker1, blocker2, blocker3,
575 role_attr](
const std::shared_ptr<M0>& msg0) {
577 auto ptr = self.lock();
579 if (!blocker1->IsPublishedEmpty() && !blocker2->IsPublishedEmpty() &&
580 !blocker3->IsPublishedEmpty()) {
581 auto msg1 = blocker1->GetLatestPublishedPtr();
582 auto msg2 = blocker2->GetLatestPublishedPtr();
583 auto msg3 = blocker3->GetLatestPublishedPtr();
584 ptr->Process(msg0, msg1, msg2, msg3);
587 uint64_t process_start_time;
588 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
589 *role_attr, end_time - start_time);
590 if (statistics::Statistics::Instance()->GetProcStatus(
591 *role_attr, &process_start_time) &&
592 (start_time - process_start_time) > 0) {
593 statistics::Statistics::Instance()->SamplingCyberLatency(
594 *role_attr, start_time - process_start_time);
598 AERROR <<
"Component object has been destroyed.";
602 reader0 = node_->template CreateReader<M0>(reader_cfg, func);
605 if (reader0 ==
nullptr || reader1 ==
nullptr || reader2 ==
nullptr ||
606 reader3 ==
nullptr) {
607 AERROR <<
"Component create reader failed." << std::endl;
610 readers_.push_back(std::move(reader0));
611 readers_.push_back(std::move(reader1));
612 readers_.push_back(std::move(reader2));
613 readers_.push_back(std::move(reader3));
620 std::weak_ptr<Component<M0, M1, M2, M3>> self =
621 std::dynamic_pointer_cast<Component<M0, M1, M2, M3>>(shared_from_this());
622 auto func = [self, role_attr](
const std::shared_ptr<M0>& msg0,
623 const std::shared_ptr<M1>& msg1,
624 const std::shared_ptr<M2>& msg2,
625 const std::shared_ptr<M3>& msg3) {
627 auto ptr = self.lock();
629 ptr->Process(msg0, msg1, msg2, msg3);
632 uint64_t process_start_time;
633 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
634 *role_attr, end_time - start_time);
635 if (statistics::Statistics::Instance()->GetProcStatus(
636 *role_attr, &process_start_time) &&
637 (start_time - process_start_time) > 0) {
638 statistics::Statistics::Instance()->SamplingCyberLatency(
639 *role_attr, start_time - process_start_time);
642 AERROR <<
"Component object has been destroyed." << std::endl;
646 std::vector<data::VisitorConfig> config_list;
647 for (
auto& reader : readers_) {
648 config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
650 auto dv = std::make_shared<data::DataVisitor<M0, M1, M2, M3>>(config_list);
651 croutine::RoutineFactory factory =
652 croutine::CreateRoutineFactory<M0, M1, M2, M3>(func, dv);
653 return sched->CreateTask(factory, node_->Name());
// Convenience macro: CYBER_REGISTER_COMPONENT(name) expands to
// CLASS_LOADER_REGISTER_CLASS(name, apollo::cyber::ComponentBase), i.e. it
// always passes apollo::cyber::ComponentBase as the second argument.
// NOTE(review): CLASS_LOADER_REGISTER_CLASS is defined outside this excerpt;
// presumably `name` must be a class derived from ComponentBase -- confirm
// against that macro's definition.
656#define CYBER_REGISTER_COMPONENT(name) \
657 CLASS_LOADER_REGISTER_CLASS(name, apollo::cyber::ComponentBase)