54using cyber::record::HeaderBuilder;
55using cyber::record::Recorder;
56using cyber::record::RecordFileReader;
57using cyber::record::RecordMessage;
58using cyber::record::RecordReader;
59using cyber::record::RecordViewer;
/**
 * @brief Compute the sequence suffix of the record file that follows
 *        `record_path`.
 *
 * Record files end in a zero-padded, five-digit sequence number
 * (e.g. "....record.00007"). The last five characters of `record_path` are
 * parsed as a decimal number, incremented, and formatted again with zero
 * padding to at least five characters.
 *
 * @param record_path Path (or name) of the current record file; may be empty.
 * @return "00000" when `record_path` is empty, shorter than the suffix, or
 *         does not end in five decimal digits; otherwise the incremented
 *         suffix. The padding is a minimum width, so "99999" yields "100000".
 */
std::string GetNextRecordFileName(const std::string& record_path) {
  static constexpr int kSuffixLen = 5;
  const std::string kInitialSequence = "00000";
  // Covers the empty path as well as any path too short to carry a suffix.
  // Without this guard `size() - kSuffixLen` wraps around (unsigned) and
  // substr() throws std::out_of_range.
  if (record_path.size() < static_cast<std::string::size_type>(kSuffixLen)) {
    return kInitialSequence;
  }
  const std::string current_suffix =
      record_path.substr(record_path.size() - kSuffixLen);
  // std::stoi throws std::invalid_argument on non-numeric input; treat such a
  // path like "no sequence yet" instead of letting the exception escape.
  for (const char c : current_suffix) {
    if (c < '0' || c > '9') {
      return kInitialSequence;
    }
  }
  std::stringstream record_suffix;
  record_suffix.fill('0');
  record_suffix.width(kSuffixLen);
  // Five decimal digits always fit in an int, so stoi cannot overflow here.
  record_suffix << std::stoi(current_suffix) + 1;
  return record_suffix.str();
}
// Checks whether the record file at `record_path` is usable: the path must
// exist, the file must open with a RecordFileReader, and the header's
// is_complete() flag is read.
// NOTE(review): the return statements and everything after the is_complete
// read are missing from this listing — presumably each failed check returns
// false and the function returns is_complete; confirm against the full file.
76bool IsRecordValid(
const std::string& record_path) {
// Reject a non-existent path before constructing any reader.
77 if (!PathExists(record_path)) {
// The reader is held by a unique_ptr, so it is destroyed when it goes out of
// scope.
80 const std::unique_ptr<RecordFileReader> file_reader(
new RecordFileReader());
// A file that cannot be opened is logged as an error.
81 if (!file_reader->Open(record_path)) {
82 AERROR <<
"failed to open record file for checking header: " << record_path;
// Read the completeness flag stored in the record header.
85 const bool is_complete = file_reader->GetHeader().is_complete();
// Constructor fragment: takes the source record directory and the restored
// output directory.
// NOTE(review): the declarator line, the initializer list and the statement
// that first assigns default_output_filename_ are missing from this listing.
93 const std::string& source_record_dir,
94 const std::string& restored_output_dir)
// Remove every '-' character from default_output_filename_ (erase-remove).
97 default_output_filename_.erase(
98 std::remove(default_output_filename_.begin(),
99 default_output_filename_.end(),
'-'),
100 default_output_filename_.end());
// Append ".record" and pass the result through GetFileName with
// remove_extension == false.
101 default_output_filename_ =
102 GetFileName(absl::StrCat(default_output_filename_,
".record"),
false);
// Initialization fragment.
// NOTE(review): the function header and several statements are missing from
// this listing; presumably this is the body of Init(const SmartRecordTrigger&)
// — confirm against the full file.
// Create a node whose name is "smart_recorder_" followed by the process id.
120 smart_recorder_node_ = CreateNode(absl::StrCat(
"smart_recorder_", getpid()));
// Node creation failure is logged together with the process id.
121 if (smart_recorder_node_ ==
nullptr) {
122 AERROR <<
"create smart recorder node failed: " << getpid();
// Set up the status writer; the right-hand side is only partly visible and
// ends with FLAGS_recorder_status_topic.
125 recorder_status_writer_ =
127 FLAGS_recorder_status_topic);
// Copy the channel set from ChannelPool into a vector; black_channels is
// left empty.
130 std::vector<std::string> all_channels;
131 std::vector<std::string> black_channels;
132 const std::set<std::string>& all_channels_set =
133 ChannelPool::Instance()->GetAllChannels();
134 std::copy(all_channels_set.begin(), all_channels_set.end(),
135 std::back_inserter(all_channels));
// Build the Recorder with both channel lists and HeaderBuilder::GetHeader();
// the leading constructor arguments are missing from this listing.
136 recorder_ = std::make_shared<Recorder>(
138 all_channels, black_channels, HeaderBuilder::GetHeader());
// NOTE(review): the condition guarding this error log is missing.
141 AERROR <<
"base init failed";
// Main processing fragment: walks through record files one after another and
// calls RestoreMessage for every message until is_terminating_ is set.
// NOTE(review): the function header, the `do {` opener, return statements and
// several other lines are missing from this listing.
// Start from an empty list of tracked record files.
145 record_files_.clear();
153 MonitorManager::Instance()->LogBuffer().INFO(
"SmartRecorder is recording...");
// Run MonitorStatus() on its own thread; it is joined near the end of this
// fragment.
154 std::shared_ptr<std::thread> monitor_thread =
155 std::make_shared<std::thread>([
this]() { this->
MonitorStatus(); });
157 std::string record_path;
// Advance record_path to the next valid record; failure is logged as the
// reader having reached its end.
159 if (!GetNextValidRecord(&record_path)) {
160 AINFO <<
"record reader " << record_path <<
" reached end, exit now";
// View the whole record: time range [0, uint64 max] over all pooled channels.
163 auto reader = std::make_shared<RecordReader>(record_path);
164 RecordViewer viewer(reader, 0, std::numeric_limits<uint64_t>::max(),
165 ChannelPool::Instance()->GetAllChannels());
166 AINFO <<
"checking " << record_path <<
": " << viewer.
begin_time() <<
" - "
// restore_reader_time_ == 0 is treated as "restore reader not positioned
// yet"; restore_path_ is then advanced to its first valid record.
// NOTE(review): one statement inside this branch is missing from the listing.
168 if (restore_reader_time_ == 0) {
170 GetNextValidRecord(&restore_path_);
// Feed each message timestamp to RestoreMessage.
// NOTE(review): the lines between the loop head and this call are missing.
172 for (
const auto& msg : viewer) {
177 RestoreMessage(msg.time);
// Tail of a do-while loop: keep going until termination is requested.
179 }
while (!is_terminating_);
// Final call with the maximum timestamp.
181 RestoreMessage(std::numeric_limits<uint64_t>::max());
// Wait for the monitor thread to finish, then release it.
182 if (monitor_thread && monitor_thread->joinable()) {
183 monitor_thread->join();
184 monitor_thread =
nullptr;
187 MonitorManager::Instance()->LogBuffer().INFO(
"SmartRecorder is stopped");
// Tracks the record files found for `record_path` in record_files_ and, once
// more than reused_record_num_ are tracked, deletes the one that sorts first.
// NOTE(review): the call that fills `files`, the declaration of `result` and
// the head of the delete condition are missing from this listing.
191void RealtimeRecordProcessor::ProcessRestoreRecord(
192 const std::string& record_path) {
// Directory prefix used when building the path of the file to delete.
194 std::string record_source_path =
"";
195 record_source_path = record_path +
"/";
196 std::vector<std::string> files =
// Accepted names: 14 digits (first one non-zero), ".record.", 5 digits.
199 std::regex record_file_name_regex(
"[1-9][0-9]{13}\\.record\\.[0-9]{5}");
// Add every matching file name that is not tracked yet.
200 for (
const auto& file : files) {
201 if (std::regex_match(file, result, record_file_name_regex)) {
202 if (std::find(record_files_.begin(), record_files_.end(), file) ==
203 record_files_.end()) {
204 record_files_.emplace_back(file);
// Ascending lexicographic order, so begin() is the smallest file name.
209 std::sort(record_files_.begin(), record_files_.end(),
210 [](
const std::string& a,
const std::string& b) { return a < b; });
// Over the limit: delete the first file from disk (single-argument
// std::remove on a C string) and drop it from the list. A failed deletion is
// only logged as a warning.
212 if (record_files_.size() > reused_record_num_) {
214 std::remove((record_source_path + (*record_files_.begin())).c_str())) {
215 AWARN <<
"Failed to delete file: " << *record_files_.begin();
217 record_files_.erase(record_files_.begin());
// Monitoring fragment: polls periodically, then flags termination and keeps
// logging while the remaining restore work gets time to complete.
// NOTE(review): the function header, the polling-loop head and the body of
// the periodic branch are missing from this listing; presumably this is
// MonitorStatus() — confirm against the full file.
222 int status_counter = 0;
// Despite its name, kCheckingFrequency is used as a sleep duration in
// milliseconds.
224 static constexpr int kCheckingFrequency = 100;
225 static constexpr int kPublishStatusFrequency = 30;
226 std::this_thread::sleep_for(std::chrono::milliseconds(kCheckingFrequency));
// True on every kPublishStatusFrequency-th iteration.
227 if (++status_counter % kPublishStatusFrequency == 0) {
// Request termination; the flag is also read in GetNextValidRecord and in the
// main processing loop.
235 is_terminating_ =
true;
236 AINFO <<
"wait for a while trying to complete the restore work";
// Emit a warning every kMessageInterval milliseconds for as long as
// (iteration count * kMessageInterval) stays below recorder_wait_time_.
237 static constexpr int kMessageInterval = 1000;
238 int interval_counter = 0;
239 while (++interval_counter * kMessageInterval < recorder_wait_time_) {
240 MonitorManager::Instance()->LogBuffer().WARN(
241 "SmartRecorder is terminating...");
242 std::this_thread::sleep_for(std::chrono::milliseconds(kMessageInterval));
// Fills a status message with the current time, `state` and `message`, logs
// it, and writes it through recorder_status_writer_.
// NOTE(review): the declaration of `status` is missing from this listing.
246void RealtimeRecordProcessor::PublishStatus(
const RecordingState state,
247 const std::string& message)
const {
// Stamp the message header with the current time in seconds.
249 Header* status_headerpb = status.mutable_header();
250 status_headerpb->set_timestamp_sec(
Time::Now().ToSecond());
251 status.set_recording_state(state);
252 status.set_state_message(message);
253 AINFO <<
"send message with state " << state <<
", " << message;
254 recorder_status_writer_->Write(status);
// Advances *record_path to the next record file and waits until that file is
// valid or termination is requested.
// Returns the result of a final IsRecordValid(*record_path) check, which can
// be false when the wait ended because of termination.
// NOTE(review): the head of the statement that builds the new path is missing
// from this listing; only its tail ("." + next sequence suffix) is visible.
257bool RealtimeRecordProcessor::GetNextValidRecord(
258 std::string* record_path)
const {
260 ".", GetNextRecordFileName(*record_path));
// Poll every recorder_wait_time_ milliseconds until the file becomes valid.
261 while (!is_terminating_ && !IsRecordValid(*record_path)) {
262 AINFO <<
"next record unavailable, wait " << recorder_wait_time_ <<
" ms";
263 std::this_thread::sleep_for(std::chrono::milliseconds(recorder_wait_time_));
265 return IsRecordValid(*record_path);
// Copies messages from the record at restore_path_ into writer_, advancing
// restore_reader_time_ towards a target end time derived from `message_time`
// and the next interval from IntervalPool.
// NOTE(review): the `do {` opener, the early-exit statements and parts of
// several conditions are missing from this listing; the comments below cover
// only what is visible.
268void RealtimeRecordProcessor::RestoreMessage(
const uint64_t message_time) {
275 const struct Interval interval = IntervalPool::Instance()->GetNextInterval();
// target_end is the larger of an operand missing from this listing and
// message_time minus max_backward_time_ scaled by 1e9 (presumably seconds to
// nanoseconds — confirm units).
// NOTE(review): the subtraction is unsigned and would wrap if message_time
// were smaller than the scaled max_backward_time_ — confirm this cannot occur.
276 const uint64_t target_end = std::max(
278 message_time -
static_cast<uint64_t
>(max_backward_time_ * 1000000000UL));
// True once the restore reader has reached or passed the end of the interval.
279 const bool small_channels_only = restore_reader_time_ >= interval.end_time;
// Second operand compares against restore_reader_time_ plus
// min_restore_chunk_ scaled by 1e9; the left-hand side of that comparison and
// the branch body are missing.
280 if (small_channels_only &&
282 restore_reader_time_ +
283 static_cast<uint64_t
>(min_restore_chunk_ * 1000000000UL)) {
// An invalid restore record is logged as a warning.
287 if (!IsRecordValid(restore_path_)) {
288 AWARN <<
"invalid restore path " << restore_path_ <<
", exit";
291 AINFO <<
"target restoring " << restore_path_ <<
": "
292 << restore_reader_time_ <<
" - " << target_end;
// Open the restore record and never start before its header's begin_time.
293 auto reader = std::make_shared<RecordReader>(restore_path_);
294 restore_reader_time_ =
295 std::max(restore_reader_time_, reader->GetHeader().begin_time());
// Warn when the record starts after the target, or when its header time
// range is empty or inverted (begin_time >= end_time).
296 if (restore_reader_time_ > target_end ||
297 reader->GetHeader().begin_time() >= reader->GetHeader().end_time()) {
298 AWARN <<
"record " << restore_path_ <<
" begin_time beyond target, exit";
// View only [restore_reader_time_, target_end] over all pooled channels.
301 RecordViewer viewer(reader, restore_reader_time_, target_end,
302 ChannelPool::Instance()->GetAllChannels());
303 AINFO <<
"actual restoring " << restore_path_ <<
": " << viewer.begin_time()
304 <<
" - " << viewer.end_time();
// A message qualifies when small_channels_only is false and its time lies
// within [interval.begin_time, interval.end_time], or when a second condition
// (missing from this listing) holds.
305 for (
const auto& msg : viewer) {
306 if ((!small_channels_only && msg.time >= interval.begin_time &&
307 msg.time <= interval.end_time) ||
// Register a channel with the writer the first time it is seen, using the
// reader's message type and proto descriptor for it.
309 if (
writer_->IsNewChannel(msg.channel_name)) {
310 writer_->WriteChannel(msg.channel_name,
311 reader->GetMessageType(msg.channel_name),
312 reader->GetProtoDesc(msg.channel_name));
314 writer_->WriteMessage(msg.channel_name, msg.content, msg.time);
// Progress is capped by both the record's end_time and target_end.
317 restore_reader_time_ = std::min(reader->GetHeader().end_time(), target_end);
// The target reaches or passes this record's end: move to the next record.
318 if (target_end >= reader->GetHeader().end_time()) {
319 GetNextValidRecord(&restore_path_);
// Tail of a do-while loop: repeat until the target end time is reached.
321 }
while (restore_reader_time_ < target_end);
Cyber has a built-in time type, Time.
static Time Now()
get the current time.
uint64_t begin_time() const
Get begin time.
uint64_t end_time() const
Get end time.
bool Init(const SmartRecordTrigger &trigger_conf) override
RealtimeRecordProcessor(const std::string &source_record_dir, const std::string &restored_output_dir)
Process messages and apply the rules based on configured triggers
std::vector< std::unique_ptr< TriggerBase > > triggers_
const std::string restored_output_dir_
std::unique_ptr< cyber::record::RecordWriter > writer_
bool ShouldRestore(const cyber::record::RecordMessage &msg) const
const std::string source_record_dir_
virtual bool Init(const SmartRecordTrigger &trigger_conf)
bool PathExists(const std::string &path)
Check if the path exists.
std::string GetFileName(const std::string &path, const bool remove_extension)
bool RemoveAllFiles(const std::string &directory_path)
Remove all the files under a specified directory.
std::vector< std::string > ListSubPaths(const std::string &directory_path, const unsigned char d_type)
List sub-paths.
bool EnsureDirectory(const std::string &directory_path)
Check if the directory specified by directory_path exists.
bool Init(const char *binary_name, const std::string &dag_info)
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space)
optional double min_restore_chunk
optional double max_backward_time
optional int32 reused_record_num