30 sp_channel_checker_ =
nullptr;
31 sp_check_result_ =
nullptr;
34void ChannelVerifyAgent::Reset() {
35 std::lock_guard<std::mutex> guard(stop_mutex_);
38 sp_channel_checker_ = std::make_shared<ChannelVerify>(sp_conf_);
39 sp_check_result_ =
nullptr;
46 AINFO <<
"ChannelVerifyAgent Request: " << request->DebugString();
47 switch (request->
cmd()) {
49 AINFO <<
"ChannelVerifyAgent start";
50 StartCheck(request, response);
53 AINFO <<
"ChannelVerifyAgent check";
54 CheckResult(request, response);
57 AINFO <<
"ChannelVerifyAgent stop";
58 StopCheck(request, response);
64 AINFO <<
"ChannelVerifyAgent progress: " << response->DebugString();
65 return grpc::Status::OK;
71 AINFO <<
"ChannelVerify is RUNNING, do not need start again";
76 std::string records_path = request->
path();
77 AsyncCheck(records_path);
81void ChannelVerifyAgent::AsyncCheck(
const std::string &records_path) {
83 std::thread doctor_strange([=]() {
84 check_thread_id_ = std::this_thread::get_id();
85 int wait_sec = sp_conf_->channel_check_trigger_gap;
88 std::lock_guard<std::mutex> guard(stop_mutex_);
92 DoCheck(records_path);
93 AINFO <<
"thread check done";
95 std::this_thread::sleep_for(std::chrono::seconds(wait_sec));
96 if (std::this_thread::get_id() != check_thread_id_) {
102 doctor_strange.detach();
103 AINFO <<
"ChannelVerifyAgent::async_check exit";
106void ChannelVerifyAgent::DoCheck(
const std::string &records_path) {
107 if (sp_channel_checker_ ==
nullptr) {
108 sp_channel_checker_ = std::make_shared<ChannelVerify>(sp_conf_);
111 sp_channel_checker_->Check(records_path);
112 sp_check_result_ = sp_channel_checker_->get_check_result();
115int ChannelVerifyAgent::AddTopicLack(
116 VerifyResult *result,
const std::string &record_path,
117 std::vector<std::string>
const &lack_channels) {
118 TopicResult *topics = result->mutable_topics();
119 for (
const std::string &channel : lack_channels) {
120 topics->add_topic_lack(channel);
121 AINFO << record_path <<
" lack topic: " << channel;
123 return static_cast<int>(lack_channels.size());
126FrameRate *ChannelVerifyAgent::FindRates(VerifyResult *result,
127 const std::string &channel) {
128 for (FrameRate &rate : *result->mutable_rates()) {
129 if (rate.topic() == channel) {
136int ChannelVerifyAgent::AddInadequateRate(
137 VerifyResult *result, std::string
const &record_path,
138 std::map<std::string, std::pair<double, double>>
const &inadequate_rate) {
139 for (
auto it = inadequate_rate.begin(); it != inadequate_rate.end(); ++it) {
140 const std::string &channel = it->first;
141 double expected_rate = it->second.first;
142 double current_rate = it->second.second;
143 FrameRate *rate = FindRates(result, channel);
144 if (rate ==
nullptr) {
145 rate = result->add_rates();
146 rate->add_bad_record_name(record_path);
147 rate->set_topic(channel);
148 rate->set_expected_rate(expected_rate);
149 rate->set_current_rate(current_rate);
151 rate->set_current_rate(current_rate);
152 rate->add_bad_record_name(record_path);
155 return result->rates_size();
158void ChannelVerifyAgent::CheckResult(ChannelVerifyRequest *request,
159 ChannelVerifyResponse *response) {
161 AINFO <<
"ChannelVerify is not RUNNING, it should start first";
166 if (sp_channel_checker_ ==
nullptr || sp_check_result_ ==
nullptr) {
167 AINFO <<
"check result is null. check later";
173 response->set_code(sp_channel_checker_->GetReturnState());
178 VerifyResult *result = response->mutable_result();
180 it != sp_check_result_->end(); ++it) {
183 res = AddInadequateRate(result, it->record_path, it->inadequate_rate);
188 res = AddTopicLack(result, it->record_path, it->lack_channels);
195void ChannelVerifyAgent::StopCheck(ChannelVerifyRequest *request,
196 ChannelVerifyResponse *response) {
197 std::lock_guard<std::mutex> guard(stop_mutex_);
201 VerifyResult *result = response->mutable_result();
202 if (sp_check_result_ ==
nullptr) {
206 it != sp_check_result_->end(); ++it) {
209 res = AddInadequateRate(result, it->record_path, it->inadequate_rate);
214 res = AddTopicLack(result, it->record_path, it->lack_channels);
ChannelVerifyAgent(std::shared_ptr< JsonConf > sp_conf)
grpc::Status ProcessGrpcRequest(grpc::ServerContext *context, ChannelVerifyRequest *request, ChannelVerifyResponse *response)
@ ERROR_CHANNEL_VERIFY_RATES_ABNORMAL
@ ERROR_CHANNEL_VERIFY_TOPIC_LACK
@ ERROR_CHECK_BEFORE_START
std::vector< OneRecordChannelCheckResult >::iterator CheckResultIterator