251 {
254
255 if (config.readers_size() < 2) {
256 AERROR <<
"Invalid config file: too few readers.";
257 return false;
258 }
259
261 AERROR <<
"Component Init() failed.";
262 return false;
263 }
264
265 bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
266
267 ReaderConfig reader_cfg;
268 reader_cfg.channel_name = config.readers(1).channel();
269 reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
270 reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();
271
272 auto reader1 =
node_->template CreateReader<M1>(reader_cfg);
273
274 reader_cfg.channel_name = config.readers(0).channel();
275 reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
276 reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();
277
278 auto role_attr = std::make_shared<proto::RoleAttributes>();
279 role_attr->set_node_name(config.name());
280 role_attr->set_channel_name(config.readers(0).channel());
281
282 std::shared_ptr<Reader<M0>> reader0 = nullptr;
284 reader0 =
node_->template CreateReader<M0>(reader_cfg);
285 } else {
286 std::weak_ptr<Component<M0, M1>> self =
287 std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
288
290 config.readers(1).channel());
291
292 auto func = [self, blocker1, role_attr](
const std::shared_ptr<M0>& msg0) {
294 auto ptr = self.lock();
295 if (ptr) {
296 if (!blocker1->IsPublishedEmpty()) {
297 auto msg1 = blocker1->GetLatestPublishedPtr();
298 ptr->Process(msg0, msg1);
300
301 uint64_t process_start_time;
302 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
303 *role_attr, end_time - start_time);
304 if (statistics::Statistics::Instance()->GetProcStatus(
305 *role_attr, &process_start_time) &&
306 (start_time - process_start_time) > 0) {
307 statistics::Statistics::Instance()->SamplingCyberLatency(
308 *role_attr, start_time - process_start_time);
309 }
310 }
311 } else {
312 AERROR <<
"Component object has been destroyed.";
313 }
314 };
315
316 reader0 =
node_->template CreateReader<M0>(reader_cfg, func);
317 }
318 if (reader0 == nullptr || reader1 == nullptr) {
319 AERROR <<
"Component create reader failed.";
320 return false;
321 }
322 readers_.push_back(std::move(reader0));
323 readers_.push_back(std::move(reader1));
324
326 return true;
327 }
328
330 std::weak_ptr<Component<M0, M1>> self =
331 std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
332 auto func = [self, role_attr](
const std::shared_ptr<M0>& msg0,
333 const std::shared_ptr<M1>& msg1) {
335 auto ptr = self.lock();
336 if (ptr) {
337 ptr->Process(msg0, msg1);
339
340 uint64_t process_start_time;
341 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
342 *role_attr, end_time - start_time);
343 if (statistics::Statistics::Instance()->GetProcStatus(
344 *role_attr, &process_start_time) &&
345 (start_time - process_start_time) > 0) {
346 statistics::Statistics::Instance()->SamplingCyberLatency(
347 *role_attr, start_time - process_start_time);
348 }
349 } else {
350 AERROR <<
"Component object has been destroyed.";
351 }
352 };
353
354 std::vector<data::VisitorConfig> config_list;
356 config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
357 }
358 auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
359 croutine::RoutineFactory factory =
360 croutine::CreateRoutineFactory<M0, M1>(func, dv);
361 return sched->CreateTask(factory,
node_->Name());
362}
std::vector< std::shared_ptr< ReaderBase > > readers_
std::shared_ptr< Node > node_
void LoadConfigFiles(const ComponentConfig &config)
uint64_t ToMicrosecond() const
Convert the time to microseconds (us).
static Time Now()
Get the current time.
static const std::shared_ptr< BlockerManager > & Instance()
#define cyber_unlikely(x)