376 {
379
380 if (config.readers_size() < 3) {
381 AERROR <<
"Invalid config file: too few readers.";
382 return false;
383 }
384
386 AERROR <<
"Component Init() failed.";
387 return false;
388 }
389
390 bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
391
392 ReaderConfig reader_cfg;
393 reader_cfg.channel_name = config.readers(1).channel();
394 reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
395 reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();
396
397 auto reader1 =
node_->template CreateReader<M1>(reader_cfg);
398
399 reader_cfg.channel_name = config.readers(2).channel();
400 reader_cfg.qos_profile.CopyFrom(config.readers(2).qos_profile());
401 reader_cfg.pending_queue_size = config.readers(2).pending_queue_size();
402
403 auto reader2 =
node_->template CreateReader<M2>(reader_cfg);
404
405 reader_cfg.channel_name = config.readers(0).channel();
406 reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
407 reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();
408
409 auto role_attr = std::make_shared<proto::RoleAttributes>();
410 role_attr->set_node_name(config.name());
411 role_attr->set_channel_name(config.readers(0).channel());
412
413 std::shared_ptr<Reader<M0>> reader0 = nullptr;
415 reader0 =
node_->template CreateReader<M0>(reader_cfg);
416 } else {
417 std::weak_ptr<Component<M0, M1, M2, NullType>> self =
418 std::dynamic_pointer_cast<Component<M0, M1, M2, NullType>>(
419 shared_from_this());
420
422 config.readers(1).channel());
424 config.readers(2).channel());
425
426 auto func = [self, blocker1, blocker2,
427 role_attr](const std::shared_ptr<M0>& msg0) {
429 auto ptr = self.lock();
430 if (ptr) {
431 if (!blocker1->IsPublishedEmpty() && !blocker2->IsPublishedEmpty()) {
432 auto msg1 = blocker1->GetLatestPublishedPtr();
433 auto msg2 = blocker2->GetLatestPublishedPtr();
434 ptr->Process(msg0, msg1, msg2);
436
437 uint64_t process_start_time;
438 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
439 *role_attr, end_time - start_time);
440 if (statistics::Statistics::Instance()->GetProcStatus(
441 *role_attr, &process_start_time) &&
442 (start_time - process_start_time) > 0) {
443 statistics::Statistics::Instance()->SamplingCyberLatency(
444 *role_attr, start_time - process_start_time);
445 }
446 }
447 } else {
448 AERROR <<
"Component object has been destroyed.";
449 }
450 };
451
452 reader0 =
node_->template CreateReader<M0>(reader_cfg, func);
453 }
454
455 if (reader0 == nullptr || reader1 == nullptr || reader2 == nullptr) {
456 AERROR <<
"Component create reader failed.";
457 return false;
458 }
459 readers_.push_back(std::move(reader0));
460 readers_.push_back(std::move(reader1));
461 readers_.push_back(std::move(reader2));
462
464 return true;
465 }
466
468 std::weak_ptr<Component<M0, M1, M2, NullType>> self =
469 std::dynamic_pointer_cast<Component<M0, M1, M2, NullType>>(
470 shared_from_this());
471 auto func = [self, role_attr](
const std::shared_ptr<M0>& msg0,
472 const std::shared_ptr<M1>& msg1,
473 const std::shared_ptr<M2>& msg2) {
475 auto ptr = self.lock();
476 if (ptr) {
477 ptr->Process(msg0, msg1, msg2);
479
480 uint64_t process_start_time;
481 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
482 *role_attr, end_time - start_time);
483 if (statistics::Statistics::Instance()->GetProcStatus(
484 *role_attr, &process_start_time) &&
485 (start_time - process_start_time) > 0) {
486 statistics::Statistics::Instance()->SamplingCyberLatency(
487 *role_attr, start_time - process_start_time);
488 }
489 } else {
490 AERROR <<
"Component object has been destroyed.";
491 }
492 };
493
494 std::vector<data::VisitorConfig> config_list;
496 config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
497 }
498 auto dv = std::make_shared<data::DataVisitor<M0, M1, M2>>(config_list);
499 croutine::RoutineFactory factory =
500 croutine::CreateRoutineFactory<M0, M1, M2>(func, dv);
501 return sched->CreateTask(factory,
node_->Name());
502}
std::vector< std::shared_ptr< ReaderBase > > readers_
std::shared_ptr< Node > node_
void LoadConfigFiles(const ComponentConfig &config)
uint64_t ToMicrosecond() const
convert time to microseconds (us).
static Time Now()
get the current time.
static const std::shared_ptr< BlockerManager > & Instance()
#define cyber_unlikely(x)