Apollo 10.0
自动驾驶开放平台
realtime_record_processor.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2019 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <csignal>
20
21#include <algorithm>
22#include <chrono>
23#include <limits>
24#include <set>
25#include <sstream>
26#include <thread>
27
28#include "cyber/common/file.h"
29#include "cyber/common/log.h"
30#include "cyber/init.h"
36
39
41
42namespace apollo {
43namespace data {
44
45namespace {
46
54using cyber::record::HeaderBuilder;
55using cyber::record::Recorder;
56using cyber::record::RecordFileReader;
57using cyber::record::RecordMessage;
58using cyber::record::RecordReader;
59using cyber::record::RecordViewer;
60
// Returns the zero-padded, 5-digit sequence suffix that follows the one at the
// end of |record_path|, e.g. "<dir>/<name>.record.00007" -> "00008".
//
// An empty |record_path| yields the initial sequence "00000". A path that is
// shorter than the suffix or does not end in five decimal digits also yields
// "00000" instead of throwing from std::string::substr() / std::stoi().
// Note that "99999" rolls over to "100000" (the width is only a minimum).
std::string GetNextRecordFileName(const std::string& record_path) {
  static constexpr int kSuffixLen = 5;
  const std::string kInitialSequence = "00000";
  // Guard the substr() below: size() - kSuffixLen would wrap around for
  // paths shorter than the suffix and make substr() throw.
  if (record_path.size() < static_cast<std::string::size_type>(kSuffixLen)) {
    return kInitialSequence;
  }
  const std::string suffix =
      record_path.substr(record_path.size() - kSuffixLen, kSuffixLen);
  // std::stoi would throw on a non-numeric suffix, or silently parse a
  // partial prefix such as "12ab3"; require exactly five digits.
  const bool all_digits =
      std::all_of(suffix.begin(), suffix.end(),
                  [](const char c) { return c >= '0' && c <= '9'; });
  if (!all_digits) {
    return kInitialSequence;
  }
  std::stringstream record_suffix;
  record_suffix.fill('0');
  record_suffix.width(kSuffixLen);
  record_suffix << std::stoi(suffix) + 1;
  return record_suffix.str();
}
75
76bool IsRecordValid(const std::string& record_path) {
77 if (!PathExists(record_path)) {
78 return false;
79 }
80 const std::unique_ptr<RecordFileReader> file_reader(new RecordFileReader());
81 if (!file_reader->Open(record_path)) {
82 AERROR << "failed to open record file for checking header: " << record_path;
83 return false;
84 }
85 const bool is_complete = file_reader->GetHeader().is_complete();
86 file_reader->Close();
87 return is_complete;
88}
89
90} // namespace
91
93 const std::string& source_record_dir,
94 const std::string& restored_output_dir)
95 : RecordProcessor(source_record_dir, restored_output_dir) {
96 default_output_filename_ = restored_output_dir_;
97 default_output_filename_.erase(
98 std::remove(default_output_filename_.begin(),
99 default_output_filename_.end(), '-'),
100 default_output_filename_.end());
101 default_output_filename_ =
102 GetFileName(absl::StrCat(default_output_filename_, ".record"), false);
103}
104
106 // Init input/output, for realtime processor create both
107 // input and output dir if they do not exist
108 if (!EnsureDirectory(source_record_dir_) ||
109 !EnsureDirectory(restored_output_dir_)) {
110 AERROR << "unable to init input/output dir: " << source_record_dir_ << "/"
112 return false;
113 }
114 if (!RemoveAllFiles(source_record_dir_)) {
115 AERROR << "unable to clear input dir: " << source_record_dir_;
116 return false;
117 }
118 // Init recorder
119 cyber::Init("smart_recorder");
120 smart_recorder_node_ = CreateNode(absl::StrCat("smart_recorder_", getpid()));
121 if (smart_recorder_node_ == nullptr) {
122 AERROR << "create smart recorder node failed: " << getpid();
123 return false;
124 }
125 recorder_status_writer_ =
126 smart_recorder_node_->CreateWriter<SmartRecorderStatus>(
127 FLAGS_recorder_status_topic);
128 max_backward_time_ = trigger_conf.max_backward_time();
129 min_restore_chunk_ = trigger_conf.min_restore_chunk();
130 std::vector<std::string> all_channels;
131 std::vector<std::string> black_channels;
132 const std::set<std::string>& all_channels_set =
133 ChannelPool::Instance()->GetAllChannels();
134 std::copy(all_channels_set.begin(), all_channels_set.end(),
135 std::back_inserter(all_channels));
136 recorder_ = std::make_shared<Recorder>(
137 absl::StrCat(source_record_dir_, "/", default_output_filename_), false,
138 all_channels, black_channels, HeaderBuilder::GetHeader());
139 // Init base
140 if (!RecordProcessor::Init(trigger_conf)) {
141 AERROR << "base init failed";
142 return false;
143 }
144 reused_record_num_ = trigger_conf.reused_record_num();
145 record_files_.clear();
146 return true;
147}
148
150 // Recorder goes first
151 recorder_->Start();
152 PublishStatus(RecordingState::RECORDING, "smart recorder started");
153 MonitorManager::Instance()->LogBuffer().INFO("SmartRecorder is recording...");
154 std::shared_ptr<std::thread> monitor_thread =
155 std::make_shared<std::thread>([this]() { this->MonitorStatus(); });
156 // Now fast reader follows and reacts for any events
157 std::string record_path;
158 do {
159 if (!GetNextValidRecord(&record_path)) {
160 AINFO << "record reader " << record_path << " reached end, exit now";
161 break;
162 }
163 auto reader = std::make_shared<RecordReader>(record_path);
164 RecordViewer viewer(reader, 0, std::numeric_limits<uint64_t>::max(),
165 ChannelPool::Instance()->GetAllChannels());
166 AINFO << "checking " << record_path << ": " << viewer.begin_time() << " - "
167 << viewer.end_time();
168 if (restore_reader_time_ == 0) {
169 restore_reader_time_ = viewer.begin_time();
170 GetNextValidRecord(&restore_path_);
171 }
172 for (const auto& msg : viewer) {
173 for (const auto& trigger : triggers_) {
174 trigger->Pull(msg);
175 }
176 // Slow reader restores the events if any
177 RestoreMessage(msg.time);
178 }
179 } while (!is_terminating_);
180 // Try restore the rest of messages one last time
181 RestoreMessage(std::numeric_limits<uint64_t>::max());
182 if (monitor_thread && monitor_thread->joinable()) {
183 monitor_thread->join();
184 monitor_thread = nullptr;
185 }
186 PublishStatus(RecordingState::STOPPED, "smart recorder stopped");
187 MonitorManager::Instance()->LogBuffer().INFO("SmartRecorder is stopped");
188 return true;
189}
190
191void RealtimeRecordProcessor::ProcessRestoreRecord(
192 const std::string& record_path) {
193 // Get all the record files
194 std::string record_source_path = "";
195 record_source_path = record_path + "/";
196 std::vector<std::string> files =
197 cyber::common::ListSubPaths(record_source_path, DT_REG);
198 std::smatch result;
199 std::regex record_file_name_regex("[1-9][0-9]{13}\\.record\\.[0-9]{5}");
200 for (const auto& file : files) {
201 if (std::regex_match(file, result, record_file_name_regex)) {
202 if (std::find(record_files_.begin(), record_files_.end(), file) ==
203 record_files_.end()) {
204 record_files_.emplace_back(file);
205 }
206 }
207 }
208 // Sort the files in name order.
209 std::sort(record_files_.begin(), record_files_.end(),
210 [](const std::string& a, const std::string& b) { return a < b; });
211 // Delete the overdue files by num.
212 if (record_files_.size() > reused_record_num_) {
213 if (0 !=
214 std::remove((record_source_path + (*record_files_.begin())).c_str())) {
215 AWARN << "Failed to delete file: " << *record_files_.begin();
216 }
217 record_files_.erase(record_files_.begin());
218 }
219}
220
222 int status_counter = 0;
223 while (!cyber::IsShutdown()) {
224 static constexpr int kCheckingFrequency = 100;
225 static constexpr int kPublishStatusFrequency = 30;
226 std::this_thread::sleep_for(std::chrono::milliseconds(kCheckingFrequency));
227 if (++status_counter % kPublishStatusFrequency == 0) {
228 status_counter = 0;
229 PublishStatus(RecordingState::RECORDING, "smart recorder recording");
230 // AINFO << "smart recorder recording status check every 3000ms a time.";
231 ProcessRestoreRecord(source_record_dir_);
232 }
233 }
234 recorder_->Stop();
235 is_terminating_ = true;
236 AINFO << "wait for a while trying to complete the restore work";
237 static constexpr int kMessageInterval = 1000;
238 int interval_counter = 0;
239 while (++interval_counter * kMessageInterval < recorder_wait_time_) {
240 MonitorManager::Instance()->LogBuffer().WARN(
241 "SmartRecorder is terminating...");
242 std::this_thread::sleep_for(std::chrono::milliseconds(kMessageInterval));
243 }
244}
245
246void RealtimeRecordProcessor::PublishStatus(const RecordingState state,
247 const std::string& message) const {
248 SmartRecorderStatus status;
249 Header* status_headerpb = status.mutable_header();
250 status_headerpb->set_timestamp_sec(Time::Now().ToSecond());
251 status.set_recording_state(state);
252 status.set_state_message(message);
253 AINFO << "send message with state " << state << ", " << message;
254 recorder_status_writer_->Write(status);
255}
256
257bool RealtimeRecordProcessor::GetNextValidRecord(
258 std::string* record_path) const {
259 *record_path = absl::StrCat(source_record_dir_, "/", default_output_filename_,
260 ".", GetNextRecordFileName(*record_path));
261 while (!is_terminating_ && !IsRecordValid(*record_path)) {
262 AINFO << "next record unavailable, wait " << recorder_wait_time_ << " ms";
263 std::this_thread::sleep_for(std::chrono::milliseconds(recorder_wait_time_));
264 }
265 return IsRecordValid(*record_path);
266}
267
268void RealtimeRecordProcessor::RestoreMessage(const uint64_t message_time) {
269 // Check and restore messages, logic is:
270 // 1. If new events got triggered, restore reader proceeds all the way to the
271 // event's end
272 // 2. If no events got triggered, but given message leads the restore reader
273 // by more than max value, proceeds to the max value point
274 // 3. Otherwise, do nothing
275 const struct Interval interval = IntervalPool::Instance()->GetNextInterval();
276 const uint64_t target_end = std::max(
277 interval.end_time,
278 message_time - static_cast<uint64_t>(max_backward_time_ * 1000000000UL));
279 const bool small_channels_only = restore_reader_time_ >= interval.end_time;
280 if (small_channels_only &&
281 target_end <=
282 restore_reader_time_ +
283 static_cast<uint64_t>(min_restore_chunk_ * 1000000000UL)) {
284 return;
285 }
286 do {
287 if (!IsRecordValid(restore_path_)) {
288 AWARN << "invalid restore path " << restore_path_ << ", exit";
289 break;
290 }
291 AINFO << "target restoring " << restore_path_ << ": "
292 << restore_reader_time_ << " - " << target_end;
293 auto reader = std::make_shared<RecordReader>(restore_path_);
294 restore_reader_time_ =
295 std::max(restore_reader_time_, reader->GetHeader().begin_time());
296 if (restore_reader_time_ > target_end ||
297 reader->GetHeader().begin_time() >= reader->GetHeader().end_time()) {
298 AWARN << "record " << restore_path_ << " begin_time beyond target, exit";
299 break;
300 }
301 RecordViewer viewer(reader, restore_reader_time_, target_end,
302 ChannelPool::Instance()->GetAllChannels());
303 AINFO << "actual restoring " << restore_path_ << ": " << viewer.begin_time()
304 << " - " << viewer.end_time();
305 for (const auto& msg : viewer) {
306 if ((!small_channels_only && msg.time >= interval.begin_time &&
307 msg.time <= interval.end_time) ||
308 ShouldRestore(msg)) {
309 if (writer_->IsNewChannel(msg.channel_name)) {
310 writer_->WriteChannel(msg.channel_name,
311 reader->GetMessageType(msg.channel_name),
312 reader->GetProtoDesc(msg.channel_name));
313 }
314 writer_->WriteMessage(msg.channel_name, msg.content, msg.time);
315 }
316 }
317 restore_reader_time_ = std::min(reader->GetHeader().end_time(), target_end);
318 if (target_end >= reader->GetHeader().end_time()) {
319 GetNextValidRecord(&restore_path_);
320 }
321 } while (restore_reader_time_ < target_end);
322}
323
324} // namespace data
325} // namespace apollo
Cyber has builtin time type Time.
Definition time.h:31
static Time Now()
get the current time.
Definition time.cc:57
uint64_t begin_time() const
Get begin time.
uint64_t end_time() const
Get end time.
bool Init(const SmartRecordTrigger &trigger_conf) override
RealtimeRecordProcessor(const std::string &source_record_dir, const std::string &restored_output_dir)
Process messages and apply the rules based on configured triggers
std::vector< std::unique_ptr< TriggerBase > > triggers_
const std::string restored_output_dir_
std::unique_ptr< cyber::record::RecordWriter > writer_
bool ShouldRestore(const cyber::record::RecordMessage &msg) const
const std::string source_record_dir_
virtual bool Init(const SmartRecordTrigger &trigger_conf)
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
#define AWARN
Definition log.h:43
bool PathExists(const std::string &path)
Check if the path exists.
Definition file.cc:195
std::string GetFileName(const std::string &path, const bool remove_extension)
Definition file.cc:415
bool RemoveAllFiles(const std::string &directory_path)
Remove all the files under a specified directory.
Definition file.cc:328
std::vector< std::string > ListSubPaths(const std::string &directory_path, const unsigned char d_type)
List sub-paths.
Definition file.cc:353
bool EnsureDirectory(const std::string &directory_path)
Check if a specified directory specified by directory_path exists.
Definition file.cc:299
bool IsShutdown()
Definition state.h:46
bool Init(const char *binary_name, const std::string &dag_info)
Definition init.cc:98
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space)
Definition cyber.cc:33
class register implement
Definition arena_queue.h:37