Apollo 10.0
自动驾驶开放平台
channel_verify.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2019 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
17
18#include <unordered_map>
19
20#include <boost/algorithm/string/classification.hpp>
21#include <boost/algorithm/string/split.hpp>
22#include <boost/filesystem.hpp>
23
24#include "cyber/cyber.h"
25#include "cyber/proto/record.pb.h"
27
28namespace apollo {
29namespace hdmap {
30
31ChannelVerify::ChannelVerify(std::shared_ptr<JsonConf> sp_conf)
32 : sp_conf_(sp_conf) {
33 Reset();
34}
35
36void ChannelVerify::Reset() {
37 return_state_ = ErrorCode::SUCCESS;
38 checked_records_.clear();
39 sp_vec_check_result_ =
40 std::make_shared<std::vector<OneRecordChannelCheckResult>>();
41}
42
// Body of ChannelVerify::Check: resolves the given path to a list of record
// files, runs the incremental channel check over them, and returns the
// resulting state.
//
// NOTE(review): this listing is an extraction with gaps. The line carrying the
// function header (original line 43) is absent here; the signature shown
// elsewhere in this file is
//   ErrorCode Check(const std::string &record_dir_or_record_full_path)
//
// @param record_dir_or_record_full_path forwarded unchanged to
//        GetRecordsPath().
// @return the value of return_state_ at the point of return.
44 const std::string& record_dir_or_record_full_path) {
45 std::vector<std::string> records_path;
46 records_path = GetRecordsPath(record_dir_or_record_full_path);
47 if (records_path.empty()) {
48 AINFO << "have no data file to check";
// NOTE(review): original line 49 is missing from this listing. Presumably it
// assigns an error code to return_state_ before the early return -- confirm
// against the upstream file. As visible here, return_state_ is returned
// without being modified in this branch.
50 return return_state_;
51 }
// The integer result of IncrementalCheck() is not inspected; the state is
// set to SUCCESS unconditionally once at least one record path was found.
52 IncrementalCheck(records_path);
53 return_state_ = ErrorCode::SUCCESS;
54 return return_state_;
55}
56
57std::shared_ptr<std::vector<OneRecordChannelCheckResult>>
59 return sp_vec_check_result_;
60}
61
62int ChannelVerify::IncrementalCheck(
63 const std::vector<std::string>& records_path) {
64 std::vector<std::string> not_check_records_path;
65 AINFO << "all records path:";
66 for (size_t i = 0; i < records_path.size(); ++i) {
67 AINFO << "[" << i << "]: " << records_path[i];
68 if (IsRecordFile(records_path[i]) && !IsRecordChecked(records_path[i])) {
69 not_check_records_path.push_back(records_path[i]);
70 }
71 }
72
73 AINFO << "not_check_records_path:";
74 for (size_t i = 0; i < not_check_records_path.size(); ++i) {
75 AINFO << "[" << i << "]: " << not_check_records_path[i];
76 OneRecordChannelCheckResult check_result =
77 CheckRecordChannels(not_check_records_path[i]);
78 if (check_result.record_path.empty()) {
79 continue;
80 }
81 sp_vec_check_result_->push_back(check_result);
82 }
83
84 return 0;
85}
86
87bool ChannelVerify::IsRecordFile(const std::string& record_path) const {
88 if (!boost::filesystem::exists(record_path)) {
89 AINFO << "path [" << record_path << "] does not exist";
90 return false;
91 }
92 if (!boost::filesystem::is_regular_file(record_path)) {
93 AINFO << "path [" << record_path << "] is not a regular file";
94 return false;
95 }
96 // To avoid disk overhead caused by opening files twice, the real
97 // file checking is placed in the function [ChannelVerify::get_record_info]
98 return true;
99}
100
101std::vector<std::string> ChannelVerify::GetRecordsPath(
102 const std::string& record_dir_or_record_full_path) const {
103 // record_dir_or_record_full_path is record fullpath or
104 // directory which contains some records
105 std::vector<std::string> records_path;
106 // 1. check record_dir_or_record_full_path is valid or not
107 boost::filesystem::path path(record_dir_or_record_full_path);
108 if (!boost::filesystem::exists(path)) {
109 AINFO << "record path [" << record_dir_or_record_full_path
110 << "] does not exist";
111 return records_path;
112 }
113
114 if (IsRecordFile(record_dir_or_record_full_path)) {
115 records_path.push_back(record_dir_or_record_full_path);
116 } else if (boost::filesystem::is_directory(path)) {
117 using dit_t = boost::filesystem::directory_iterator;
118 dit_t end;
119 for (dit_t it(record_dir_or_record_full_path); it != end; ++it) {
120 if (IsRecordFile(it->path().string())) {
121 records_path.push_back(it->path().string());
122 }
123 }
124 }
125 return records_path;
126}
127
128bool ChannelVerify::IsRecordChecked(const std::string& record_path) {
129 return !checked_records_.insert(record_path).second;
130}
131
// Opens the record at `record_path` and collects its summary: path, start and
// end time, duration, and one entry (name, message count, message type) per
// channel reported by the reader.
//
// @param record_path path of the record file to read.
// @return the populated info object, or nullptr when the path is not a
//         regular file or the reader reports the record as invalid.
132std::shared_ptr<CyberRecordInfo> ChannelVerify::GetRecordInfo(
133 const std::string& record_path) const {
134 if (!IsRecordFile(record_path)) {
135 AINFO << "get_record_info failed.[" << record_path
136 << "] is not record file";
137 return nullptr;
138 }
139 std::shared_ptr<CyberRecordInfo> sp_record_info(new CyberRecordInfo);
140 std::shared_ptr<apollo::cyber::record::RecordReader> sp_reader =
141 std::make_shared<apollo::cyber::record::RecordReader>(record_path);
// The record is actually opened here; IsValid() is the real content check
// that IsRecordFile() deliberately skips.
142 if (sp_reader == nullptr || !sp_reader->IsValid()) {
143 AINFO << "open record [" << record_path << "] failed";
144 return nullptr;
145 }
// NOTE(review): original line 147 is missing from this listing, so the
// constructor arguments of sp_viewer are not visible. Presumably the viewer is
// built on top of sp_reader -- confirm against the upstream file.
146 std::shared_ptr<apollo::cyber::record::RecordViewer> sp_viewer(
148 sp_record_info->path = record_path;
149 sp_record_info->start_time = sp_viewer->begin_time();
150 sp_record_info->end_time = sp_viewer->end_time();
// Presumably begin_time()/end_time() are in nanoseconds and the division by
// 1e9 yields seconds -- TODO confirm the time unit. The result is 0 when both
// times are equal.
151 sp_record_info->duration =
152 static_cast<double>((sp_viewer->end_time() - sp_viewer->begin_time())) /
153 1e9;
154
// Copy per-channel statistics from the reader into the info object.
155 std::set<std::string> channel_list = sp_reader->GetChannelList();
156 for (auto it = channel_list.begin(); it != channel_list.end(); ++it) {
157 const std::string& channel_name = *it;
158 CyberRecordChannel channel;
159 channel.channel_name = channel_name;
160 channel.msgnum = sp_reader->GetMessageNumber(channel_name);
161 channel.msg_type = sp_reader->GetMessageType(channel_name);
162 sp_record_info->channels.push_back(channel);
163 }
164 return sp_record_info;
165}
166
167OneRecordChannelCheckResult ChannelVerify::CheckRecordChannels(
168 const std::string& record_path) {
169 OneRecordChannelCheckResult check_result;
170 std::shared_ptr<CyberRecordInfo> sp_record_info = GetRecordInfo(record_path);
171 if (sp_record_info == nullptr) {
172 return check_result;
173 }
174 std::vector<CyberRecordChannel>& channels = sp_record_info->channels;
175 std::vector<std::pair<std::string, double>>& topic_list =
176 sp_conf_->topic_list;
177 check_result.record_path = record_path;
178 check_result.start_time = sp_record_info->start_time;
179 for (size_t i = 0; i < topic_list.size(); ++i) {
180 std::string& channel_in_list = topic_list[i].first;
181 double channel_expected_rate = topic_list[i].second;
182 bool channel_in_list_found = false;
183 size_t j = 0;
184 for (j = 0; j < channels.size(); ++j) {
185 std::string& channel_in_record = channels[j].channel_name;
186 if (channel_in_record == channel_in_list) {
187 channel_in_list_found = true;
188 break;
189 }
190 }
191 if (!channel_in_list_found) { // topic
192 AINFO << record_path << " lacks [" << channel_in_list << "]";
193 check_result.lack_channels.push_back(channel_in_list);
194 } else { // rate
195 double actual_rate =
196 static_cast<double>((channels[j].msgnum)) / sp_record_info->duration;
197 if (actual_rate < 1e-8) {
198 actual_rate = 0.0;
199 AINFO << "msgnum:" << channels[j].msgnum
200 << ",duration:" << sp_record_info->duration;
201 }
202 AINFO << record_path << " [" << channel_in_list
203 << "] expected rate: " << channel_expected_rate
204 << ", actual rate: " << actual_rate;
205 if (actual_rate <
206 channel_expected_rate * sp_conf_->topic_rate_tolerance) {
207 check_result.inadequate_rate[channel_in_list] =
208 std::make_pair(channel_expected_rate, actual_rate);
209 }
210 }
211 }
212 return check_result;
213}
214
215ErrorCode ChannelVerify::GetReturnState() const { return return_state_; }
216
217} // namespace hdmap
218} // namespace apollo
ChannelVerify(std::shared_ptr< JsonConf > sp_conf)
std::shared_ptr< std::vector< OneRecordChannelCheckResult > > get_check_result() const
ErrorCode Check(const std::string &record_dir_or_record_full_path)
#define AINFO
Definition log.h:42
class register implement
Definition arena_queue.h:37