18#include <unordered_map>
20#include <boost/algorithm/string/classification.hpp>
21#include <boost/algorithm/string/split.hpp>
22#include <boost/filesystem.hpp>
25#include "cyber/proto/record.pb.h"
36void ChannelVerify::Reset() {
38 checked_records_.clear();
39 sp_vec_check_result_ =
40 std::make_shared<std::vector<OneRecordChannelCheckResult>>();
44 const std::string& record_dir_or_record_full_path) {
45 std::vector<std::string> records_path;
46 records_path = GetRecordsPath(record_dir_or_record_full_path);
47 if (records_path.empty()) {
48 AINFO <<
"have no data file to check";
52 IncrementalCheck(records_path);
57std::shared_ptr<std::vector<OneRecordChannelCheckResult>>
59 return sp_vec_check_result_;
62int ChannelVerify::IncrementalCheck(
63 const std::vector<std::string>& records_path) {
64 std::vector<std::string> not_check_records_path;
65 AINFO <<
"all records path:";
66 for (
size_t i = 0; i < records_path.size(); ++i) {
67 AINFO <<
"[" << i <<
"]: " << records_path[i];
68 if (IsRecordFile(records_path[i]) && !IsRecordChecked(records_path[i])) {
69 not_check_records_path.push_back(records_path[i]);
73 AINFO <<
"not_check_records_path:";
74 for (
size_t i = 0; i < not_check_records_path.size(); ++i) {
75 AINFO <<
"[" << i <<
"]: " << not_check_records_path[i];
76 OneRecordChannelCheckResult check_result =
77 CheckRecordChannels(not_check_records_path[i]);
78 if (check_result.record_path.empty()) {
81 sp_vec_check_result_->push_back(check_result);
87bool ChannelVerify::IsRecordFile(
const std::string& record_path)
const {
88 if (!boost::filesystem::exists(record_path)) {
89 AINFO <<
"path [" << record_path <<
"] does not exist";
92 if (!boost::filesystem::is_regular_file(record_path)) {
93 AINFO <<
"path [" << record_path <<
"] is not a regular file";
101std::vector<std::string> ChannelVerify::GetRecordsPath(
102 const std::string& record_dir_or_record_full_path)
const {
105 std::vector<std::string> records_path;
107 boost::filesystem::path path(record_dir_or_record_full_path);
108 if (!boost::filesystem::exists(path)) {
109 AINFO <<
"record path [" << record_dir_or_record_full_path
110 <<
"] does not exist";
114 if (IsRecordFile(record_dir_or_record_full_path)) {
115 records_path.push_back(record_dir_or_record_full_path);
116 }
else if (boost::filesystem::is_directory(path)) {
117 using dit_t = boost::filesystem::directory_iterator;
119 for (dit_t it(record_dir_or_record_full_path); it != end; ++it) {
120 if (IsRecordFile(it->path().string())) {
121 records_path.push_back(it->path().string());
128bool ChannelVerify::IsRecordChecked(
const std::string& record_path) {
129 return !checked_records_.insert(record_path).second;
132std::shared_ptr<CyberRecordInfo> ChannelVerify::GetRecordInfo(
133 const std::string& record_path)
const {
134 if (!IsRecordFile(record_path)) {
135 AINFO <<
"get_record_info failed.[" << record_path
136 <<
"] is not record file";
139 std::shared_ptr<CyberRecordInfo> sp_record_info(
new CyberRecordInfo);
140 std::shared_ptr<apollo::cyber::record::RecordReader> sp_reader =
141 std::make_shared<apollo::cyber::record::RecordReader>(record_path);
142 if (sp_reader ==
nullptr || !sp_reader->IsValid()) {
143 AINFO <<
"open record [" << record_path <<
"] failed";
146 std::shared_ptr<apollo::cyber::record::RecordViewer> sp_viewer(
148 sp_record_info->path = record_path;
149 sp_record_info->start_time = sp_viewer->begin_time();
150 sp_record_info->end_time = sp_viewer->end_time();
151 sp_record_info->duration =
152 static_cast<double>((sp_viewer->end_time() - sp_viewer->begin_time())) /
155 std::set<std::string> channel_list = sp_reader->GetChannelList();
156 for (
auto it = channel_list.begin(); it != channel_list.end(); ++it) {
157 const std::string& channel_name = *it;
158 CyberRecordChannel channel;
159 channel.channel_name = channel_name;
160 channel.msgnum = sp_reader->GetMessageNumber(channel_name);
161 channel.msg_type = sp_reader->GetMessageType(channel_name);
162 sp_record_info->channels.push_back(channel);
164 return sp_record_info;
167OneRecordChannelCheckResult ChannelVerify::CheckRecordChannels(
168 const std::string& record_path) {
169 OneRecordChannelCheckResult check_result;
170 std::shared_ptr<CyberRecordInfo> sp_record_info = GetRecordInfo(record_path);
171 if (sp_record_info ==
nullptr) {
174 std::vector<CyberRecordChannel>& channels = sp_record_info->channels;
175 std::vector<std::pair<std::string, double>>& topic_list =
176 sp_conf_->topic_list;
177 check_result.record_path = record_path;
178 check_result.start_time = sp_record_info->start_time;
179 for (
size_t i = 0; i < topic_list.size(); ++i) {
180 std::string& channel_in_list = topic_list[i].first;
181 double channel_expected_rate = topic_list[i].second;
182 bool channel_in_list_found =
false;
184 for (j = 0; j < channels.size(); ++j) {
185 std::string& channel_in_record = channels[j].channel_name;
186 if (channel_in_record == channel_in_list) {
187 channel_in_list_found =
true;
191 if (!channel_in_list_found) {
192 AINFO << record_path <<
" lacks [" << channel_in_list <<
"]";
193 check_result.lack_channels.push_back(channel_in_list);
196 static_cast<double>((channels[j].msgnum)) / sp_record_info->duration;
197 if (actual_rate < 1e-8) {
199 AINFO <<
"msgnum:" << channels[j].msgnum
200 <<
",duration:" << sp_record_info->duration;
202 AINFO << record_path <<
" [" << channel_in_list
203 <<
"] expected rate: " << channel_expected_rate
204 <<
", actual rate: " << actual_rate;
206 channel_expected_rate * sp_conf_->topic_rate_tolerance) {
207 check_result.inadequate_rate[channel_in_list] =
208 std::make_pair(channel_expected_rate, actual_rate);
ErrorCode GetReturnState() const
ChannelVerify(std::shared_ptr< JsonConf > sp_conf)
std::shared_ptr< std::vector< OneRecordChannelCheckResult > > get_check_result() const
ErrorCode Check(const std::string &record_dir_or_record_full_path)
@ ERROR_VERIFY_NO_RECORDERS