Apollo 10.0
自动驾驶开放平台
plugin_manager.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2019 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
17
18#include <dirent.h>
19
20#include <limits>
21#include <string>
22
23#include "google/protobuf/util/json_util.h"
24
25#include "cyber/common/file.h"
29
30using apollo::common::util::FillHeader;
32using google::protobuf::util::JsonStringToMessage;
33using Json = nlohmann::json;
34using std::string;
35
36// scenario_set: update one scenario set from studio to local
37// scenarios: update all scenarios
namespace {
// Maps the "data_type" string carried in a plugin message to the integer
// index that PluginManager::UpdateData switches on.
std::map<std::string, int> data_type_dict = {
    {"scenario_set", 0},
    {"scenarios", 1},
    {"dynamic_model", 2},
    {"records", 3},
    {"vehicles", 4},
    {"maps", 5},
};
}  // namespace
58namespace apollo { // namespace apollo
59namespace dreamview { // namespace dreamview
61 : node_(cyber::CreateNode("PluginManager")),
62 enabled_(false),
63 plugin_ws_(plugin_ws) {
64 RegisterDvSupportApis();
65 RegisterPlugins();
66}
67
68void PluginManager::Start(DvCallback callback_api) {
69 callback_api_ = callback_api;
70 enabled_ = true;
71 return;
72}
73
74void PluginManager::Stop() {
75 if (enabled_) {
76 std::string stop_command;
77 for (auto iter = plugins_.begin(); iter != plugins_.end(); iter++) {
78 stop_command = iter->second.stop_command;
79 const int ret = std::system(stop_command.data());
80 if (ret != 0) {
81 AERROR << "Failed to stop plugin! ret: " << ret;
82 }
83 }
84 plugins_.clear();
85 // need kill all plugin process
86 }
87 enabled_ = false;
88}
89
90auto PluginManager::InitPluginReader(const ChannelConf& channel_conf,
91 const string& channel_prefix,
92 const string& plugin_name)
93 -> std::shared_ptr<cyber::Reader<DvPluginMsg>> {
94 if (!channel_conf.has_location()) {
95 AERROR << "Failed to init reader for plugins for missing required file "
96 "location";
97 return nullptr;
98 }
99 const string channel_location = channel_conf.location();
100 // Plugin related channel name should follow the plugin specification
101 if (channel_location.find(channel_prefix) != 0) {
102 AERROR << "Plugin related channel should observe channel name conventions!";
103 return nullptr;
104 }
105 if (plugins_[plugin_name].readers.find(channel_location) !=
106 plugins_[plugin_name].readers.end()) {
107 AERROR << "Plugin has already register this channel: " << channel_location;
108 return nullptr;
109 }
110 cyber::ReaderConfig reader_config;
111 reader_config.channel_name = channel_location;
112 reader_config.pending_queue_size = channel_conf.pending_queue_size();
113 auto reader = node_->CreateReader<DvPluginMsg>(
114 reader_config, [this](const std::shared_ptr<DvPluginMsg>& msg) {
115 if (!ReceiveMsgFromPlugin(*msg)) {
116 AERROR << "Failed to handle received msg from plugin";
117 }
118 return;
119 });
120 return reader;
121};
122
123auto PluginManager::InitPluginWriterAndMsg(const ChannelConf& channel_conf,
124 const string& channel_prefix,
125 const string& plugin_name)
126 -> std::shared_ptr<cyber::Writer<DvPluginMsg>> {
127 if (!channel_conf.has_location()) {
128 AERROR << "Failed to init writer for plugins for missing required file "
129 "location";
130 return nullptr;
131 }
132 const string channel_location = channel_conf.location();
133 // Plugin related channel name should follow the plugin specification
134 if (channel_location.find(channel_prefix) != 0) {
135 AERROR << "Plugin related channel should observe channel name conventions!";
136 return nullptr;
137 }
138 if (plugins_[plugin_name].writers.find(channel_location) !=
139 plugins_[plugin_name].writers.end()) {
140 AERROR << "Plugin has already register this channel!";
141 return nullptr;
142 }
143 auto writer = node_->CreateWriter<DvPluginMsg>(channel_location);
144 if (!writer) {
145 AERROR << "Failed to create writer!";
146 return nullptr;
147 }
148 plugins_[plugin_name].writers[channel_location] = writer;
149 if (channel_conf.support_msg_name_size()) {
150 for (auto& support_msg : channel_conf.support_msg_name()) {
151 if (plugins_[plugin_name].plugin_accept_msg.find(support_msg) !=
152 plugins_[plugin_name].plugin_accept_msg.end()) {
153 AERROR << "One-to-one message and channel, no repetition is allowed";
154 return nullptr;
155 }
156 plugins_[plugin_name].plugin_accept_msg[support_msg] = channel_location;
157 }
158 }
159 return writer;
160};
161
162bool PluginManager::RegisterPlugin(
163 const std::shared_ptr<PluginConfig>& plugin_config) {
164 if (!plugin_config->has_name() || !plugin_config->has_launch_command() ||
165 !plugin_config->process_command_keywords_size() ||
166 !plugin_config->has_stop_command()) {
167 AERROR << "Failed to register plugin for required fields missing!";
168 return false;
169 }
170 string plugin_name = plugin_config->name();
171 string launch_command = plugin_config->launch_command();
172 if (plugins_.find(plugin_name) != plugins_.end()) {
173 AERROR << "This plugin has already registered! Don't install the plugin "
174 "again!";
175 return false;
176 }
177 struct PluginInfo plugin_info;
178 plugins_[plugin_name] = {};
179 plugins_[plugin_name].launch_command = launch_command;
180 plugins_[plugin_name].stop_command = plugin_config->stop_command();
181 string channel_prefix = FLAGS_plugin_channel_prefix + plugin_name + "/";
182 for (auto reader_channel_conf : plugin_config->reader_channel_conf()) {
183 auto plugin_reader =
184 InitPluginReader(reader_channel_conf, channel_prefix, plugin_name);
185 if (plugin_reader == nullptr) {
186 AERROR << "Failed to register plugin reader!";
187 plugins_.erase(plugin_name);
188 return false;
189 }
190 }
191 for (auto writer_channel_conf : plugin_config->writer_channel_conf()) {
192 auto plugin_writer = InitPluginWriterAndMsg(writer_channel_conf,
193 channel_prefix, plugin_name);
194 if (plugin_writer == nullptr) {
195 AERROR << "Failed to register plugin writer!";
196 plugins_.erase(plugin_name);
197 return false;
198 }
199 }
200 const int ret = std::system(("nohup " + launch_command + " &").data());
201 if (ret == 0) {
202 AINFO << "SUCCESS to launch plugin: " << plugin_name;
203 } else {
204 AERROR << "Failed to launch plugin: " << plugin_name << " ret: " << ret;
205 plugins_.erase(plugin_name);
206 return false;
207 }
208 for (auto command_keyword : plugin_config->process_command_keywords()) {
209 plugins_[plugin_name].process_command_keywords.push_back(command_keyword);
210 }
211 return true;
212}
213
214bool PluginManager::RegisterPlugins() {
215 const std::string plugin_path =
216 (cyber::common::GetEnv("HOME")) + FLAGS_plugin_path;
217 DIR* directory = opendir(plugin_path.c_str());
218 if (directory == nullptr) {
219 AERROR << "Cannot open directory " << FLAGS_plugin_path;
220 return false;
221 }
222 struct dirent* entry;
223 bool register_res = true;
224 while ((entry = readdir(directory)) != nullptr && register_res) {
225 // skip directory_path/. and directory_path/..
226 if (!strcmp(entry->d_name, ".") || !strcmp(entry->d_name, "..")) {
227 continue;
228 }
229 if (entry->d_type != DT_DIR) {
230 // skip not directory
231 continue;
232 }
233 const string plugin_config_file_name =
234 entry->d_name + FLAGS_plugin_config_file_name_suffix;
235 const string plugin_config_file_path =
236 (cyber::common::GetEnv("HOME")) + FLAGS_plugin_path + "/" +
237 entry->d_name + "/" + plugin_config_file_name;
238 if (!cyber::common::PathExists(plugin_config_file_path)) {
239 AERROR << "Cannot find plugin:" << entry->d_name
240 << " plugin config file, jump it!";
241 continue;
242 }
243 auto plugin_config = std::make_shared<PluginConfig>();
244 if (!cyber::common::GetProtoFromFile(plugin_config_file_path,
245 plugin_config.get())) {
246 AWARN << "Unable to read plugin config from file: "
247 << plugin_config_file_path;
248 return false;
249 }
250 register_res = RegisterPlugin(plugin_config);
251 }
252 return register_res;
253}
254
255bool PluginManager::CheckPluginStatus(const string& plugin_name) {
256 if (plugins_.find(plugin_name) == plugins_.end()) {
257 AERROR << "Failed to register this plugin, cann't check!";
258 return false;
259 }
260 // todo: Extract the logic for monitoring plugin status
261 std::vector<string> running_processes;
262 for (const auto& cmd_file : cyber::common::Glob("/proc/*/cmdline")) {
263 // Get process command string.
264 string cmd_string;
265 if (cyber::common::GetContent(cmd_file, &cmd_string) &&
266 !cmd_string.empty()) {
267 // In /proc/<PID>/cmdline, the parts are separated with \0, which will be
268 // converted back to whitespaces here.
269 std::replace(cmd_string.begin(), cmd_string.end(), '\0', ' ');
270 running_processes.push_back(cmd_string);
271 }
272 }
273 bool command_found = false;
274 for (const string& command : running_processes) {
275 bool all_keywords_matched = true;
276 for (const string& keyword :
277 plugins_[plugin_name].process_command_keywords) {
278 all_keywords_matched &= (command.find(keyword) != string::npos);
279 if (!all_keywords_matched) {
280 break;
281 }
282 }
283 command_found |= all_keywords_matched;
284 }
285 if (!command_found) {
286 AERROR << "Failed to pass plugin status check!";
287 return false;
288 }
289 // Process command keywords are all matched. The process is running.
290 return true;
291}
292
293bool PluginManager::SendMsgToPlugin(const string& json_str) {
294 auto plugin_msg = std::make_shared<DvPluginMsg>();
295 if (!JsonStringToMessage(json_str, plugin_msg.get()).ok()) {
296 AERROR << "Failed to parse DvPluginMsg from json!";
297 return false;
298 }
299 // Cancel check requestId for dv_plus does not have this field
300 if (!plugin_msg->has_target() || !plugin_msg->has_name()) {
301 AERROR << "Missing required field for DvPluginMsg.";
302 return false;
303 }
304 const string plugin_name = plugin_msg->target();
305 if (!CheckPluginStatus(plugin_name)) {
306 return false;
307 }
308 const string msg_name = plugin_msg->name();
309
310 if (plugins_[plugin_name].plugin_accept_msg.find(msg_name) ==
311 plugins_[plugin_name].plugin_accept_msg.end()) {
312 AERROR << "Plugin not accept this msg!";
313 return false;
314 }
315 const string channel_location =
316 plugins_[plugin_name].plugin_accept_msg[msg_name];
317 if (plugins_[plugin_name].writers.find(channel_location) ==
318 plugins_[plugin_name].writers.end()) {
319 AERROR << "The plugin does not support communication on this channel";
320 return false;
321 }
322 FillHeader("PluginManager", plugin_msg.get());
323 plugins_[plugin_name].writers[channel_location]->Write(plugin_msg);
324 return true;
325}
326
327void PluginManager::RegisterDvSupportApi(const string& api_name,
328 const DvApi& api) {
329 dv_support_apis_[api_name] = api;
330}
331
332void PluginManager::RegisterDvSupportApis() {
333 RegisterDvSupportApi("UpdateScenarioSetList", &PluginManager::UpdateData);
334 RegisterDvSupportApi("UpdateDynamicModelList", &PluginManager::UpdateData);
335 RegisterDvSupportApi("UpdateRecordToStatus", &PluginManager::UpdateData);
336 RegisterDvSupportApi("ResetVehicleConfigSuccess", &PluginManager::UpdateData);
337 RegisterDvSupportApi("RefreshVehicleConfigSuccess",
338 &PluginManager::UpdateData);
339 RegisterDvSupportApi("UpdateMapToStatus", &PluginManager::UpdateData);
340}
341
342bool PluginManager::ReceiveMsgFromPlugin(const DvPluginMsg& msg) {
343 if (!msg.has_name()) {
344 AERROR << "Invalid message name!";
345 return false;
346 }
347 const string msg_name = msg.name();
348 if (dv_support_apis_.find(msg_name) != dv_support_apis_.end()) {
349 string json_res;
350 bool result = (this->*dv_support_apis_[msg_name])(msg, json_res);
351 if (!result) {
352 AERROR << "Failed to handle msg!";
353 return false;
354 }
355 }
356 // Encapsulated as dreamview response message format
357 Json response = JsonUtil::ProtoToTypedJson("PluginResponse", msg);
358 response["action"] = "response";
359 Json info = Json::parse(msg.info());
360 response["data"]["info"] = info;
361 bool broadcast;
362 if (!JsonUtil::GetBooleanByPath(response, "data.broadcast", &broadcast)) {
363 // default true,broadcast to websocket
364 broadcast = true;
365 }
366 if (broadcast) {
367 plugin_ws_->BroadcastData(response.dump());
368 }
369 return true;
370}
371
372bool PluginManager::UpdateData(const DvPluginMsg& msg, const string& json_str) {
373 if (!msg.has_info()) {
374 AERROR << "Failed to get data type!";
375 return false;
376 }
377 const string info_str = msg.info();
378 Json info({});
379 info = Json::parse(info_str);
380 if (!info.contains("data") || !info["data"].contains("data_type")) {
381 AERROR << "Failed to get data or data type!";
382 return false;
383 }
384 const string data_type = info["data"]["data_type"];
385 if (data_type_dict.find(data_type) == data_type_dict.end()) {
386 AERROR << "Dv don't support this kind of data type!";
387 return false;
388 }
389 const int data_type_index = data_type_dict[data_type];
390 bool update_data_res = false;
391 switch (data_type_index) {
392 case 0: {
393 update_data_res = callback_api_("UpdateScenarioSetToStatus", info);
394 break;
395 }
396 case 2: {
397 // 下载成功-新增文件+register+本地hmistatus
398 // 删除-删除文件+unregister+本地Hmistatus
399 update_data_res = callback_api_("UpdateDynamicModelToStatus", info);
400 break;
401 }
402 case 3: {
403 update_data_res = callback_api_("UpdateRecordToStatus", info);
404 break;
405 }
406 case 4: {
407 update_data_res = callback_api_("UpdateVehicleToStatus", info);
408 break;
409 }
410 case 5: {
411 update_data_res = callback_api_("UpdateMapToStatus", info);
412 break;
413 }
414 default:
415 break;
416 }
417 if (!update_data_res) {
418 AERROR << "Failed to update data!";
419 return false;
420 }
421 return true;
422}
423
424} // namespace dreamview
425} // namespace apollo
void Start(DvCallback callback_api)
std::function< bool(const std::string &function_name, const nlohmann::json &param_json)> DvCallback
PluginManager(WebSocketHandler *plugin_ws)
The WebSocketHandler, built on top of CivetWebSocketHandler, is a websocket handler that handles diff...
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
#define AWARN
Definition log.h:43
Some string util functions.
nlohmann::json Json
std::vector< std::string > Glob(const std::string &pattern)
Expand path pattern to matched paths.
Definition file.cc:212
bool PathExists(const std::string &path)
Check if the path exists.
Definition file.cc:195
bool GetProtoFromFile(const std::string &file_name, google::protobuf::Message *message)
Parses the content of the file specified by the file_name as a representation of protobufs,...
Definition file.cc:132
std::string GetEnv(const std::string &var_name, const std::string &default_value="")
Definition environment.h:29
bool GetContent(const std::string &file_name, std::string *content)
Get file content as string.
Definition file.cc:167
class register implement
Definition arena_queue.h:37