Apollo 10.0
自动驾驶开放平台
post_record_processor.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2019 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <dirent.h>
20
21#include <algorithm>
22#include <limits>
23#include <memory>
24
25#include "absl/strings/str_cat.h"
26#include "cyber/common/file.h"
27#include "cyber/common/log.h"
30
33
34namespace apollo {
35namespace data {
36
38using cyber::record::RecordReader;
39using cyber::record::RecordViewer;
40using cyber::record::RecordWriter;
41
43 if (!DirectoryExists(source_record_dir_)) {
44 AERROR << "source record dir does not exist: " << source_record_dir_;
45 return false;
46 }
47 LoadSourceRecords();
48 if (source_record_files_.empty()) {
49 AERROR << "source record dir does not have any records: "
51 return false;
52 }
53 if (!RecordProcessor::Init(trigger_conf)) {
54 AERROR << "base init failed";
55 return false;
56 }
57 return true;
58}
59
61 // First scan, get intervals
62 for (const std::string& record : source_record_files_) {
63 const auto reader = std::make_shared<RecordReader>(
64 absl::StrCat(source_record_dir_, "/", record));
65 RecordViewer viewer(reader, 0,
66 std::numeric_limits<uint64_t>::max(),
67 ChannelPool::Instance()->GetAllChannels());
68 AINFO << record << ":" << viewer.begin_time() << " - " << viewer.end_time();
69 for (const auto& msg : viewer) {
70 for (const auto& trigger : triggers_) {
71 trigger->Pull(msg);
72 }
73 }
74 }
75 // Second scan, restore messages based on intervals in the first scan
76 IntervalPool::Instance()->ReorgIntervals();
77 IntervalPool::Instance()->PrintIntervals();
78 for (const std::string& record : source_record_files_) {
79 const auto reader = std::make_shared<RecordReader>(
80 absl::StrCat(source_record_dir_, "/", record));
81 RecordViewer viewer(reader, 0,
82 std::numeric_limits<uint64_t>::max(),
83 ChannelPool::Instance()->GetAllChannels());
84 for (const auto& msg : viewer) {
85 // If the message fall into generated intervals,
86 // or required by any triggers, restore it
87 if (IntervalPool::Instance()->MessageFallIntoRange(msg.time) ||
88 ShouldRestore(msg)) {
89 writer_->WriteChannel(msg.channel_name,
90 reader->GetMessageType(msg.channel_name),
91 reader->GetProtoDesc(msg.channel_name));
92 writer_->WriteMessage(msg.channel_name, msg.content, msg.time);
93 }
94 }
95 }
96 return true;
97}
98
100 std::string src_file_name = source_record_files_.front();
101 const std::string record_flag(".record");
102 src_file_name.resize(src_file_name.size() - src_file_name.find(record_flag) +
103 record_flag.size() + 1);
104 return absl::StrCat(restored_output_dir_, "/", src_file_name);
105}
106
107void PostRecordProcessor::LoadSourceRecords() {
108 DIR* dirp = opendir(source_record_dir_.c_str());
109 if (dirp == nullptr) {
110 AERROR << "failed to open source dir: " << source_record_dir_;
111 return;
112 }
113 struct dirent* dp = nullptr;
114 while ((dp = readdir(dirp)) != nullptr) {
115 const std::string file_name = dp->d_name;
116 if (dp->d_type == DT_REG &&
117 file_name.find(".record") != std::string::npos) {
118 source_record_files_.push_back(file_name);
119 }
120 }
121 closedir(dirp);
122 std::sort(source_record_files_.begin(), source_record_files_.end());
123}
124
125} // namespace data
126} // namespace apollo
uint64_t begin_time() const
Get begin time.
uint64_t end_time() const
Get end time.
std::string GetDefaultOutputFile() const override
bool Init(const SmartRecordTrigger &trigger_conf) override
std::vector< std::unique_ptr< TriggerBase > > triggers_
const std::string restored_output_dir_
std::unique_ptr< cyber::record::RecordWriter > writer_
bool ShouldRestore(const cyber::record::RecordMessage &msg) const
const std::string source_record_dir_
virtual bool Init(const SmartRecordTrigger &trigger_conf)
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
bool DirectoryExists(const std::string &directory_path)
Check if the directory specified by directory_path exists and is indeed a directory.
Definition file.cc:207
class register implement
Definition arena_queue.h:37