Apollo 10.0
自动驾驶开放平台
apollo::data::RealtimeRecordProcessor类 参考

Realtime processor that handles record tasks while they are still being recorded 更多...

#include <realtime_record_processor.h>

类 apollo::data::RealtimeRecordProcessor 继承关系图:
apollo::data::RealtimeRecordProcessor 的协作图:

Public 成员函数

 RealtimeRecordProcessor (const std::string &source_record_dir, const std::string &restored_output_dir)
 
bool Init (const SmartRecordTrigger &trigger_conf) override
 
bool Process () override
 
std::string GetDefaultOutputFile () const override
 
void MonitorStatus ()
 
virtual ~RealtimeRecordProcessor ()=default
 
- Public 成员函数 继承自 apollo::data::RecordProcessor
 RecordProcessor (const std::string &source_record_dir, const std::string &restored_output_dir)
 
virtual ~RecordProcessor ()
 

额外继承的成员函数

- Protected 成员函数 继承自 apollo::data::RecordProcessor
bool InitTriggers (const SmartRecordTrigger &trigger_conf)
 
bool ShouldRestore (const cyber::record::RecordMessage &msg) const
 
- Protected 属性 继承自 apollo::data::RecordProcessor
const std::string source_record_dir_
 
const std::string restored_output_dir_
 
std::vector< std::unique_ptr< TriggerBase > > triggers_
 
std::unique_ptr< cyber::record::RecordWriter > writer_ = nullptr
 

详细描述

Realtime processor that handles record tasks while they are still being recorded

在文件 realtime_record_processor.h 第 38 行定义.

构造及析构函数说明

◆ RealtimeRecordProcessor()

apollo::data::RealtimeRecordProcessor::RealtimeRecordProcessor ( const std::string &  source_record_dir,
const std::string &  restored_output_dir 
)

在文件 realtime_record_processor.cc 第 92 行定义.

95 : RecordProcessor(source_record_dir, restored_output_dir) {
96 default_output_filename_ = restored_output_dir_;
97 default_output_filename_.erase(
98 std::remove(default_output_filename_.begin(),
99 default_output_filename_.end(), '-'),
100 default_output_filename_.end());
101 default_output_filename_ =
102 GetFileName(absl::StrCat(default_output_filename_, ".record"), false);
103 }
RecordProcessor(const std::string &source_record_dir, const std::string &restored_output_dir)
const std::string restored_output_dir_
std::string GetFileName(const std::string &path, const bool remove_extension)
Definition file.cc:415

◆ ~RealtimeRecordProcessor()

virtual apollo::data::RealtimeRecordProcessor::~RealtimeRecordProcessor ( )
virtual default

成员函数说明

◆ GetDefaultOutputFile()

std::string apollo::data::RealtimeRecordProcessor::GetDefaultOutputFile ( ) const
inline override virtual

实现了 apollo::data::RecordProcessor.

在文件 realtime_record_processor.h 第 44 行定义.

44 {
45 return absl::StrCat(restored_output_dir_, "/", default_output_filename_);
46 };

◆ Init()

bool apollo::data::RealtimeRecordProcessor::Init ( const SmartRecordTrigger &  trigger_conf)
override virtual

重载 apollo::data::RecordProcessor .

在文件 realtime_record_processor.cc 第 105 行定义.

105 {
106 // Init input/output, for realtime processor create both
107 // input and output dir if they do not exist
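108 if (!EnsureDirectory(source_record_dir_) ||
109 !EnsureDirectory(restored_output_dir_)) {
110 AERROR << "unable to init input/output dir: " << source_record_dir_ << "/"
111 << restored_output_dir_;
112 return false;
113 }
114 if (!RemoveAllFiles(source_record_dir_)) {
115 AERROR << "unable to clear input dir: " << source_record_dir_;
116 return false;
117 }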
118 // Init recorder
119 cyber::Init("smart_recorder");
120 smart_recorder_node_ = CreateNode(absl::StrCat("smart_recorder_", getpid()));
121 if (smart_recorder_node_ == nullptr) {
122 AERROR << "create smart recorder node failed: " << getpid();
123 return false;
124 }
125 recorder_status_writer_ =
126 smart_recorder_node_->CreateWriter<SmartRecorderStatus>(
127 FLAGS_recorder_status_topic);
128 max_backward_time_ = trigger_conf.max_backward_time();
129 min_restore_chunk_ = trigger_conf.min_restore_chunk();
130 std::vector<std::string> all_channels;
131 std::vector<std::string> black_channels;
132 const std::set<std::string>& all_channels_set =
133 ChannelPool::Instance()->GetAllChannels();
134 std::copy(all_channels_set.begin(), all_channels_set.end(),
135 std::back_inserter(all_channels));
136 recorder_ = std::make_shared<Recorder>(
137 absl::StrCat(source_record_dir_, "/", default_output_filename_), false,
138 all_channels, black_channels, HeaderBuilder::GetHeader());
139 // Init base
140 if (!RecordProcessor::Init(trigger_conf)) {
141 AERROR << "base init failed";
142 return false;
143 }
144 reused_record_num_ = trigger_conf.reused_record_num();
145 record_files_.clear();
146 return true;
147 }
const std::string source_record_dir_
virtual bool Init(const SmartRecordTrigger &trigger_conf)
#define AERROR
Definition log.h:44
bool RemoveAllFiles(const std::string &directory_path)
Remove all the files under a specified directory.
Definition file.cc:328
bool EnsureDirectory(const std::string &directory_path)
Check if the directory specified by directory_path exists.
Definition file.cc:299
bool Init(const char *binary_name, const std::string &dag_info)
Definition init.cc:98
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space)
Definition cyber.cc:33

◆ MonitorStatus()

void apollo::data::RealtimeRecordProcessor::MonitorStatus ( )

在文件 realtime_record_processor.cc 第 221 行定义.

221 {
222 int status_counter = 0;
223 while (!cyber::IsShutdown()) {
224 static constexpr int kCheckingFrequency = 100;
225 static constexpr int kPublishStatusFrequency = 30;
226 std::this_thread::sleep_for(std::chrono::milliseconds(kCheckingFrequency));
227 if (++status_counter % kPublishStatusFrequency == 0) {
228 status_counter = 0;
229 PublishStatus(RecordingState::RECORDING, "smart recorder recording");
230 // AINFO << "smart recorder recording status check every 3000ms a time.";
231 ProcessRestoreRecord(source_record_dir_);
232 }
233 }
234 recorder_->Stop();
235 is_terminating_ = true;
236 AINFO << "wait for a while trying to complete the restore work";
237 static constexpr int kMessageInterval = 1000;
238 int interval_counter = 0;
239 while (++interval_counter * kMessageInterval < recorder_wait_time_) {
240 MonitorManager::Instance()->LogBuffer().WARN(
241 "SmartRecorder is terminating...");
242 std::this_thread::sleep_for(std::chrono::milliseconds(kMessageInterval));
243 }
244 }
#define AINFO
Definition log.h:42
bool IsShutdown()
Definition state.h:46

◆ Process()

bool apollo::data::RealtimeRecordProcessor::Process ( )
override virtual

实现了 apollo::data::RecordProcessor.

在文件 realtime_record_processor.cc 第 149 行定义.

149 {
150 // Recorder goes first
151 recorder_->Start();
152 PublishStatus(RecordingState::RECORDING, "smart recorder started");
153 MonitorManager::Instance()->LogBuffer().INFO("SmartRecorder is recording...");
154 std::shared_ptr<std::thread> monitor_thread =
155 std::make_shared<std::thread>([this]() { this->MonitorStatus(); });
156 // Now fast reader follows and reacts for any events
157 std::string record_path;
158 do {
159 if (!GetNextValidRecord(&record_path)) {
160 AINFO << "record reader " << record_path << " reached end, exit now";
161 break;
162 }
163 auto reader = std::make_shared<RecordReader>(record_path);
164 RecordViewer viewer(reader, 0, std::numeric_limits<uint64_t>::max(),
165 ChannelPool::Instance()->GetAllChannels());
166 AINFO << "checking " << record_path << ": " << viewer.begin_time() << " - "
167 << viewer.end_time();
168 if (restore_reader_time_ == 0) {
169 restore_reader_time_ = viewer.begin_time();
170 GetNextValidRecord(&restore_path_);
171 }
172 for (const auto& msg : viewer) {
173 for (const auto& trigger : triggers_) {
174 trigger->Pull(msg);
175 }
176 // Slow reader restores the events if any
177 RestoreMessage(msg.time);
178 }
179 } while (!is_terminating_);
180 // Try restore the rest of messages one last time
181 RestoreMessage(std::numeric_limits<uint64_t>::max());
182 if (monitor_thread && monitor_thread->joinable()) {
183 monitor_thread->join();
184 monitor_thread = nullptr;
185 }
186 PublishStatus(RecordingState::STOPPED, "smart recorder stopped");
187 MonitorManager::Instance()->LogBuffer().INFO("SmartRecorder is stopped");
188 return true;
189 }
std::vector< std::unique_ptr< TriggerBase > > triggers_

该类的文档由以下文件生成: