Apollo 10.0
自动驾驶开放平台
channel_verify_agent.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2019 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
17
18#include <chrono>
19#include <map>
20#include <string>
21#include <thread>
22#include <utility>
23#include <vector>
24
25namespace apollo {
26namespace hdmap {
27
28ChannelVerifyAgent::ChannelVerifyAgent(std::shared_ptr<JsonConf> sp_conf) {
29 sp_conf_ = sp_conf;
30 sp_channel_checker_ = nullptr;
31 sp_check_result_ = nullptr;
32}
33
34void ChannelVerifyAgent::Reset() {
35 std::lock_guard<std::mutex> guard(stop_mutex_);
36 need_stop_ = false;
37 stopped_ = false;
38 sp_channel_checker_ = std::make_shared<ChannelVerify>(sp_conf_);
39 sp_check_result_ = nullptr;
41}
42
44 grpc::ServerContext *context, ChannelVerifyRequest *request,
45 ChannelVerifyResponse *response) {
46 AINFO << "ChannelVerifyAgent Request: " << request->DebugString();
47 switch (request->cmd()) {
48 case CmdType::START:
49 AINFO << "ChannelVerifyAgent start";
50 StartCheck(request, response);
51 break;
52 case CmdType::CHECK:
53 AINFO << "ChannelVerifyAgent check";
54 CheckResult(request, response);
55 break;
56 case CmdType::STOP:
57 AINFO << "ChannelVerifyAgent stop";
58 StopCheck(request, response);
59 break;
60 default:
61 response->set_code(ErrorCode::ERROR_REQUEST);
62 AERROR << "command error";
63 }
64 AINFO << "ChannelVerifyAgent progress: " << response->DebugString();
65 return grpc::Status::OK;
66}
67
68void ChannelVerifyAgent::StartCheck(ChannelVerifyRequest *request,
69 ChannelVerifyResponse *response) {
70 if (GetState() == ChannelVerifyAgentState::RUNNING) {
71 AINFO << "ChannelVerify is RUNNING, do not need start again";
72 response->set_code(ErrorCode::ERROR_REPEATED_START);
73 return;
74 }
75 Reset();
76 std::string records_path = request->path();
77 AsyncCheck(records_path);
78 response->set_code(ErrorCode::SUCCESS);
79}
80
81void ChannelVerifyAgent::AsyncCheck(const std::string &records_path) {
83 std::thread doctor_strange([=]() {
84 check_thread_id_ = std::this_thread::get_id();
85 int wait_sec = sp_conf_->channel_check_trigger_gap;
86 while (true) {
87 {
88 std::lock_guard<std::mutex> guard(stop_mutex_);
89 if (need_stop_) {
90 break;
91 }
92 DoCheck(records_path);
93 AINFO << "thread check done";
94 }
95 std::this_thread::sleep_for(std::chrono::seconds(wait_sec));
96 if (std::this_thread::get_id() != check_thread_id_) {
97 break;
98 }
99 }
100 stopped_ = true;
101 });
102 doctor_strange.detach();
103 AINFO << "ChannelVerifyAgent::async_check exit";
104}
105
106void ChannelVerifyAgent::DoCheck(const std::string &records_path) {
107 if (sp_channel_checker_ == nullptr) {
108 sp_channel_checker_ = std::make_shared<ChannelVerify>(sp_conf_);
109 }
111 sp_channel_checker_->Check(records_path);
112 sp_check_result_ = sp_channel_checker_->get_check_result();
113}
114
115int ChannelVerifyAgent::AddTopicLack(
116 VerifyResult *result, const std::string &record_path,
117 std::vector<std::string> const &lack_channels) {
118 TopicResult *topics = result->mutable_topics();
119 for (const std::string &channel : lack_channels) {
120 topics->add_topic_lack(channel);
121 AINFO << record_path << " lack topic: " << channel;
122 }
123 return static_cast<int>(lack_channels.size());
124}
125
126FrameRate *ChannelVerifyAgent::FindRates(VerifyResult *result,
127 const std::string &channel) {
128 for (FrameRate &rate : *result->mutable_rates()) {
129 if (rate.topic() == channel) {
130 return &rate;
131 }
132 }
133 return nullptr;
134}
135
136int ChannelVerifyAgent::AddInadequateRate(
137 VerifyResult *result, std::string const &record_path,
138 std::map<std::string, std::pair<double, double>> const &inadequate_rate) {
139 for (auto it = inadequate_rate.begin(); it != inadequate_rate.end(); ++it) {
140 const std::string &channel = it->first;
141 double expected_rate = it->second.first;
142 double current_rate = it->second.second;
143 FrameRate *rate = FindRates(result, channel);
144 if (rate == nullptr) {
145 rate = result->add_rates();
146 rate->add_bad_record_name(record_path);
147 rate->set_topic(channel);
148 rate->set_expected_rate(expected_rate);
149 rate->set_current_rate(current_rate);
150 } else {
151 rate->set_current_rate(current_rate);
152 rate->add_bad_record_name(record_path);
153 }
154 }
155 return result->rates_size();
156}
157
158void ChannelVerifyAgent::CheckResult(ChannelVerifyRequest *request,
159 ChannelVerifyResponse *response) {
160 if (GetState() == ChannelVerifyAgentState::IDLE) {
161 AINFO << "ChannelVerify is not RUNNING, it should start first";
162 response->set_code(ErrorCode::ERROR_CHECK_BEFORE_START);
163 return;
164 }
165
166 if (sp_channel_checker_ == nullptr || sp_check_result_ == nullptr) {
167 AINFO << "check result is null. check later";
168 response->set_code(ErrorCode::SUCCESS);
169 return;
170 }
171
172 if (sp_channel_checker_->GetReturnState() != ErrorCode::SUCCESS) {
173 response->set_code(sp_channel_checker_->GetReturnState());
174 return;
175 }
176
177 response->set_code(ErrorCode::SUCCESS);
178 VerifyResult *result = response->mutable_result();
179 for (CheckResultIterator it = sp_check_result_->begin();
180 it != sp_check_result_->end(); ++it) {
181 int res = 0;
182 // write rate
183 res = AddInadequateRate(result, it->record_path, it->inadequate_rate);
184 if (res > 0) {
186 }
187 // write topic lack
188 res = AddTopicLack(result, it->record_path, it->lack_channels);
189 if (res > 0) {
191 }
192 }
193}
194
195void ChannelVerifyAgent::StopCheck(ChannelVerifyRequest *request,
196 ChannelVerifyResponse *response) {
197 std::lock_guard<std::mutex> guard(stop_mutex_);
198 need_stop_ = true;
199 response->set_code(ErrorCode::SUCCESS);
201 VerifyResult *result = response->mutable_result();
202 if (sp_check_result_ == nullptr) {
203 return;
204 }
205 for (CheckResultIterator it = sp_check_result_->begin();
206 it != sp_check_result_->end(); ++it) {
207 int res = 0;
208 // write rate
209 res = AddInadequateRate(result, it->record_path, it->inadequate_rate);
210 if (res > 0) {
212 }
213 // write topic lack
214 res = AddTopicLack(result, it->record_path, it->lack_channels);
215 if (res > 0) {
217 }
218 }
219}
220
221void ChannelVerifyAgent::SetState(ChannelVerifyAgentState state) {
222 state_ = state;
223}
224
225ChannelVerifyAgentState ChannelVerifyAgent::GetState() const { return state_; }
226
227} // namespace hdmap
228} // namespace apollo
ChannelVerifyAgent(std::shared_ptr< JsonConf > sp_conf)
grpc::Status ProcessGrpcRequest(grpc::ServerContext *context, ChannelVerifyRequest *request, ChannelVerifyResponse *response)
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
std::vector< OneRecordChannelCheckResult >::iterator CheckResultIterator
class register implement
Definition arena_queue.h:37