165 {
168
169 if (config.readers_size() < 1) {
170 AERROR <<
"Invalid config file: too few readers.";
171 return false;
172 }
173
175 AERROR <<
"Component Init() failed.";
176 return false;
177 }
178
179 bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
180
181 ReaderConfig reader_cfg;
182 reader_cfg.channel_name = config.readers(0).channel();
183 reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
184 reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();
185
186 auto role_attr = std::make_shared<proto::RoleAttributes>();
187 role_attr->set_node_name(config.name());
188 role_attr->set_channel_name(config.readers(0).channel());
189
190 std::weak_ptr<Component<M0>> self =
191 std::dynamic_pointer_cast<Component<M0>>(shared_from_this());
192 auto func = [self, role_attr](
const std::shared_ptr<M0>& msg) {
194 auto ptr = self.lock();
195 if (ptr) {
196 ptr->Process(msg);
197 } else {
198 AERROR <<
"Component object has been destroyed.";
199 }
201
202 uint64_t process_start_time;
203 statistics::Statistics::Instance()->SamplingProcLatency<uint64_t>(
204 *role_attr, end_time - start_time);
205 if (statistics::Statistics::Instance()->GetProcStatus(
206 *role_attr, &process_start_time) &&
207 (start_time - process_start_time) > 0) {
208 statistics::Statistics::Instance()->SamplingCyberLatency(
209 *role_attr, start_time - process_start_time);
210 }
211 };
212
213 std::shared_ptr<Reader<M0>> reader = nullptr;
214
216 reader =
node_->CreateReader<M0>(reader_cfg);
217 } else {
218 reader =
node_->CreateReader<M0>(reader_cfg,
func);
219 }
220
221 if (reader == nullptr) {
222 AERROR <<
"Component create reader failed.";
223 return false;
224 }
225 readers_.emplace_back(std::move(reader));
226
228 return true;
229 }
230
231 data::VisitorConfig conf = {
readers_[0]->ChannelId(),
233 auto dv = std::make_shared<data::DataVisitor<M0>>(conf);
234 croutine::RoutineFactory factory =
235 croutine::CreateRoutineFactory<M0>(func, dv);
237 return sched->CreateTask(factory,
node_->Name());
238}
std::vector< std::shared_ptr< ReaderBase > > readers_
std::shared_ptr< Node > node_
void LoadConfigFiles(const ComponentConfig &config)
uint64_t ToMicrosecond() const
convert time to microseconds (us).
static Time Now()
get the current time.
#define cyber_unlikely(x)