23#include "google/protobuf/util/json_util.h"
30using apollo::common::util::FillHeader;
32using google::protobuf::util::JsonStringToMessage;
33using Json = nlohmann::json;
39std::map<string, int> data_type_dict = {{
61 : node_(cyber::CreateNode(
"PluginManager")),
63 plugin_ws_(plugin_ws) {
64 RegisterDvSupportApis();
69 callback_api_ = callback_api;
// Stops the registered plugins: walks plugins_ and hands each entry's
// stop_command to std::system(), logging an error that carries the return
// value.
// NOTE(review): this listing omits some original lines (the embedded numbers
// jump 74 -> 76 and 79 -> 81), so the condition guarding the error log is not
// visible here -- presumably a non-zero `ret`; confirm against the full file.
74void PluginManager::Stop() {
76 std::string stop_command;
77 for (
auto iter = plugins_.begin(); iter != plugins_.end(); iter++) {
78 stop_command = iter->second.stop_command;
// The stop command string is executed through the system shell as-is.
79 const int ret = std::system(stop_command.data());
81 AERROR <<
"Failed to stop plugin! ret: " << ret;
// Creates a cyber reader of DvPluginMsg for one channel of a plugin.
//
// Parameters:
//   channel_conf   - channel description; its location() is the channel name
//                    and its pending_queue_size() sizes the reader queue.
//   channel_prefix - prefix every channel of this plugin must start with.
//   plugin_name    - key into plugins_ for the owning plugin.
//
// Visible validation, each failure being logged with AERROR:
//   * channel_conf must have a location;
//   * the location must begin with channel_prefix (find(...) must be 0);
//   * the location must not already be present in the plugin's `readers`.
//
// Returns a std::shared_ptr<cyber::Reader<DvPluginMsg>>.
// NOTE(review): the return statements are not part of this listing; the
// caller RegisterPlugin compares the result against nullptr, so failure paths
// presumably return nullptr -- confirm against the full file.
90auto PluginManager::InitPluginReader(
const ChannelConf& channel_conf,
91 const string& channel_prefix,
92 const string& plugin_name)
93 -> std::shared_ptr<cyber::Reader<DvPluginMsg>> {
94 if (!channel_conf.has_location()) {
95 AERROR <<
"Failed to init reader for plugins for missing required file "
99 const string channel_location = channel_conf.location();
// find() returning 0 means channel_location starts with channel_prefix.
101 if (channel_location.find(channel_prefix) != 0) {
102 AERROR <<
"Plugin related channel should observe channel name conventions!";
105 if (plugins_[plugin_name].readers.find(channel_location) !=
106 plugins_[plugin_name].readers.end()) {
107 AERROR <<
"Plugin has already register this channel: " << channel_location;
110 cyber::ReaderConfig reader_config;
111 reader_config.channel_name = channel_location;
112 reader_config.pending_queue_size = channel_conf.pending_queue_size();
// The reader callback captures `this` and forwards every received message to
// ReceiveMsgFromPlugin, logging when that handler reports failure.
113 auto reader = node_->CreateReader<DvPluginMsg>(
114 reader_config, [
this](
const std::shared_ptr<DvPluginMsg>& msg) {
115 if (!ReceiveMsgFromPlugin(*msg)) {
116 AERROR <<
"Failed to handle received msg from plugin";
// Creates a cyber writer of DvPluginMsg for one channel of a plugin and
// records which message names are routed to that channel.
//
// Parameters:
//   channel_conf   - channel description; location() is the channel name and
//                    support_msg_name() lists the message names it accepts.
//   channel_prefix - prefix every channel of this plugin must start with.
//   plugin_name    - key into plugins_ for the owning plugin.
//
// Visible validation, each failure being logged with AERROR:
//   * channel_conf must have a location;
//   * the location must begin with channel_prefix (find(...) must be 0);
//   * the location must not already be present in the plugin's `writers`;
//   * a message name may be mapped to only one channel in plugin_accept_msg.
//
// Returns a std::shared_ptr<cyber::Writer<DvPluginMsg>>.
// NOTE(review): the return statements and the check guarding the
// "Failed to create writer!" log are not part of this listing; the caller
// RegisterPlugin compares the result against nullptr -- confirm the failure
// value against the full file.
123auto PluginManager::InitPluginWriterAndMsg(
const ChannelConf& channel_conf,
124 const string& channel_prefix,
125 const string& plugin_name)
126 -> std::shared_ptr<cyber::Writer<DvPluginMsg>> {
127 if (!channel_conf.has_location()) {
128 AERROR <<
"Failed to init writer for plugins for missing required file "
132 const string channel_location = channel_conf.location();
// find() returning 0 means channel_location starts with channel_prefix.
134 if (channel_location.find(channel_prefix) != 0) {
135 AERROR <<
"Plugin related channel should observe channel name conventions!";
138 if (plugins_[plugin_name].writers.find(channel_location) !=
139 plugins_[plugin_name].writers.end()) {
140 AERROR <<
"Plugin has already register this channel!";
143 auto writer = node_->CreateWriter<DvPluginMsg>(channel_location);
145 AERROR <<
"Failed to create writer!";
148 plugins_[plugin_name].writers[channel_location] = writer;
// Map each supported message name to this channel so SendMsgToPlugin can look
// up the channel by message name.
149 if (channel_conf.support_msg_name_size()) {
150 for (
auto& support_msg : channel_conf.support_msg_name()) {
151 if (plugins_[plugin_name].plugin_accept_msg.find(support_msg) !=
152 plugins_[plugin_name].plugin_accept_msg.end()) {
153 AERROR <<
"One-to-one message and channel, no repetition is allowed";
156 plugins_[plugin_name].plugin_accept_msg[support_msg] = channel_location;
// Registers one plugin described by plugin_config and launches it.
//
// Visible steps:
//   1. Require name, launch_command, stop_command and at least one
//      process_command_keywords entry; otherwise log an error.
//   2. Reject a plugin name that is already present in plugins_.
//   3. Create the plugins_ entry and store its launch and stop commands.
//   4. Build the channel prefix FLAGS_plugin_channel_prefix + name + "/" and
//      initialise every configured reader and writer channel; when one of
//      them yields nullptr the plugins_ entry is erased again.
//   5. Launch the plugin through std::system("nohup <launch_command> &"),
//      logging the outcome; the failure branch erases the plugins_ entry.
//   6. Copy the process command keywords into the plugins_ entry (used by
//      CheckPluginStatus).
//
// Returns bool.
// NOTE(review): the return statements and the test on `ret` are not part of
// this listing -- confirm the exact success/failure values in the full file.
// NOTE(review): `plugin_info` below is declared but never used in the visible
// lines.
// NOTE(review): launch_command comes straight from the plugin config and is
// passed to the shell unescaped.
162bool PluginManager::RegisterPlugin(
163 const std::shared_ptr<PluginConfig>& plugin_config) {
164 if (!plugin_config->has_name() || !plugin_config->has_launch_command() ||
165 !plugin_config->process_command_keywords_size() ||
166 !plugin_config->has_stop_command()) {
167 AERROR <<
"Failed to register plugin for required fields missing!";
170 string plugin_name = plugin_config->name();
171 string launch_command = plugin_config->launch_command();
172 if (plugins_.find(plugin_name) != plugins_.end()) {
173 AERROR <<
"This plugin has already registered! Don't install the plugin "
177 struct PluginInfo plugin_info;
178 plugins_[plugin_name] = {};
179 plugins_[plugin_name].launch_command = launch_command;
180 plugins_[plugin_name].stop_command = plugin_config->stop_command();
181 string channel_prefix = FLAGS_plugin_channel_prefix + plugin_name +
"/";
182 for (
auto reader_channel_conf : plugin_config->reader_channel_conf()) {
184 InitPluginReader(reader_channel_conf, channel_prefix, plugin_name);
185 if (plugin_reader ==
nullptr) {
186 AERROR <<
"Failed to register plugin reader!";
// Roll back the partially registered plugin.
187 plugins_.erase(plugin_name);
191 for (
auto writer_channel_conf : plugin_config->writer_channel_conf()) {
192 auto plugin_writer = InitPluginWriterAndMsg(writer_channel_conf,
193 channel_prefix, plugin_name);
194 if (plugin_writer ==
nullptr) {
195 AERROR <<
"Failed to register plugin writer!";
// Roll back the partially registered plugin.
196 plugins_.erase(plugin_name);
// "nohup ... &" starts the launch command in the background via the shell.
200 const int ret = std::system((
"nohup " + launch_command +
" &").data());
202 AINFO <<
"SUCCESS to launch plugin: " << plugin_name;
204 AERROR <<
"Failed to launch plugin: " << plugin_name <<
" ret: " << ret;
205 plugins_.erase(plugin_name);
208 for (
auto command_keyword : plugin_config->process_command_keywords()) {
209 plugins_[plugin_name].process_command_keywords.push_back(command_keyword);
// Scans the plugin directory and registers every plugin found there.
//
// Visible steps:
//   * open plugin_path with opendir(); log an error if that fails;
//   * iterate directory entries with readdir() for as long as the previous
//     registration succeeded (register_res stays true);
//   * test for the "." and ".." entries and for entries whose d_type is not
//     DT_DIR (the action taken is not visible; presumably they are skipped);
//   * derive the config file name as <d_name> +
//     FLAGS_plugin_config_file_name_suffix, located under .../<d_name>/;
//   * load that file into a PluginConfig and pass it to RegisterPlugin().
//
// Returns bool.
// NOTE(review): several original lines are absent from this listing (the
// plugin_path initialiser, the existence check on the config file, the call
// that reads the proto, the `continue`/`return` statements). In particular no
// closedir() is visible -- confirm the DIR handle is released on every path.
// NOTE(review): the open-failure log prints FLAGS_plugin_path while opendir()
// is given plugin_path; confirm both name the same directory.
214bool PluginManager::RegisterPlugins() {
215 const std::string plugin_path =
217 DIR* directory = opendir(plugin_path.c_str());
218 if (directory ==
nullptr) {
219 AERROR <<
"Cannot open directory " << FLAGS_plugin_path;
222 struct dirent* entry;
223 bool register_res =
true;
// The loop stops at the end of the directory or after the first failed
// registration.
224 while ((entry = readdir(directory)) !=
nullptr && register_res) {
226 if (!strcmp(entry->d_name,
".") || !strcmp(entry->d_name,
"..")) {
229 if (entry->d_type != DT_DIR) {
233 const string plugin_config_file_name =
234 entry->d_name + FLAGS_plugin_config_file_name_suffix;
235 const string plugin_config_file_path =
237 entry->d_name +
"/" + plugin_config_file_name;
239 AERROR <<
"Cannot find plugin:" << entry->d_name
240 <<
" plugin config file, jump it!";
243 auto plugin_config = std::make_shared<PluginConfig>();
245 plugin_config.get())) {
246 AWARN <<
"Unable to read plugin config from file: "
247 << plugin_config_file_path;
250 register_res = RegisterPlugin(plugin_config);
// Checks whether a registered plugin appears to be running.
//
// The plugin must be present in plugins_. The command lines of processes are
// then collected from the files matching /proc/*/cmdline, and the check passes
// when at least one command line contains every entry of the plugin's
// process_command_keywords as a substring.
//
// Parameters:
//   plugin_name - key into plugins_.
// Returns bool.
// NOTE(review): the return statements and the code that reads each cmdline
// file into cmd_string are not part of this listing -- confirm against the
// full file.
255bool PluginManager::CheckPluginStatus(
const string& plugin_name) {
256 if (plugins_.find(plugin_name) == plugins_.end()) {
257 AERROR <<
"Failed to register this plugin, cann't check!";
261 std::vector<string> running_processes;
262 for (
const auto& cmd_file : cyber::common::
Glob(
"/proc/*/cmdline")) {
266 !cmd_string.empty()) {
// NUL bytes in the command line are turned into spaces before it is stored
// for the substring search below.
269 std::replace(cmd_string.begin(), cmd_string.end(),
'\0',
' ');
270 running_processes.push_back(cmd_string);
273 bool command_found =
false;
274 for (
const string& command : running_processes) {
// A command matches only if every keyword occurs somewhere in it.
275 bool all_keywords_matched =
true;
276 for (
const string& keyword :
277 plugins_[plugin_name].process_command_keywords) {
278 all_keywords_matched &= (command.find(keyword) != string::npos);
279 if (!all_keywords_matched) {
283 command_found |= all_keywords_matched;
285 if (!command_found) {
286 AERROR <<
"Failed to pass plugin status check!";
// Parses a JSON string into a DvPluginMsg and writes it to the target plugin.
//
// Visible steps:
//   * convert json_str to a DvPluginMsg with JsonStringToMessage;
//   * require the `target` and `name` fields;
//   * run CheckPluginStatus() on the target plugin;
//   * look up the channel mapped to the message name in plugin_accept_msg and
//     require a writer registered for that channel;
//   * fill the header with module name "PluginManager" and Write() the
//     message on that writer.
//
// Parameters:
//   json_str - JSON text of a DvPluginMsg.
// Returns bool.
// NOTE(review): the return statements are not part of this listing; confirm
// the exact values against the full file.
293bool PluginManager::SendMsgToPlugin(
const string& json_str) {
294 auto plugin_msg = std::make_shared<DvPluginMsg>();
295 if (!JsonStringToMessage(json_str, plugin_msg.get()).ok()) {
296 AERROR <<
"Failed to parse DvPluginMsg from json!";
300 if (!plugin_msg->has_target() || !plugin_msg->has_name()) {
301 AERROR <<
"Missing required field for DvPluginMsg.";
304 const string plugin_name = plugin_msg->target();
305 if (!CheckPluginStatus(plugin_name)) {
308 const string msg_name = plugin_msg->name();
310 if (plugins_[plugin_name].plugin_accept_msg.find(msg_name) ==
311 plugins_[plugin_name].plugin_accept_msg.end()) {
312 AERROR <<
"Plugin not accept this msg!";
// The message name selects the channel; the channel selects the writer.
315 const string channel_location =
316 plugins_[plugin_name].plugin_accept_msg[msg_name];
317 if (plugins_[plugin_name].writers.find(channel_location) ==
318 plugins_[plugin_name].writers.end()) {
319 AERROR <<
"The plugin does not support communication on this channel";
322 FillHeader(
"PluginManager", plugin_msg.get());
323 plugins_[plugin_name].writers[channel_location]->Write(plugin_msg);
// Stores `api` in dv_support_apis_ under the key api_name, replacing any
// handler previously stored under that key.
// NOTE(review): the line declaring the `api` parameter is not part of this
// listing; ReceiveMsgFromPlugin invokes the stored value as a pointer to a
// PluginManager member function -- confirm the exact type in the full file.
327void PluginManager::RegisterDvSupportApi(
const string& api_name,
329 dv_support_apis_[api_name] = api;
// Registers the message names that ReceiveMsgFromPlugin dispatches to a
// handler. All six names listed here are bound to PluginManager::UpdateData.
332void PluginManager::RegisterDvSupportApis() {
333 RegisterDvSupportApi(
"UpdateScenarioSetList", &PluginManager::UpdateData);
334 RegisterDvSupportApi(
"UpdateDynamicModelList", &PluginManager::UpdateData);
335 RegisterDvSupportApi(
"UpdateRecordToStatus", &PluginManager::UpdateData);
336 RegisterDvSupportApi(
"ResetVehicleConfigSuccess", &PluginManager::UpdateData);
337 RegisterDvSupportApi(
"RefreshVehicleConfigSuccess",
338 &PluginManager::UpdateData);
339 RegisterDvSupportApi(
"UpdateMapToStatus", &PluginManager::UpdateData);
// Handles one message received from a plugin.
//
// Visible steps:
//   * require the message `name` field;
//   * if the name has a handler in dv_support_apis_, invoke that member
//     function with (msg, json_res) and log when it reports failure;
//   * build a "PluginResponse" typed JSON from the message, set its "action"
//     to "response", and store the parsed msg.info() under data.info;
//   * read the boolean at path data.broadcast;
//   * send the serialised response through plugin_ws_->BroadcastData().
//
// Parameters:
//   msg - message delivered by a plugin reader callback.
// Returns bool.
// NOTE(review): the declarations of json_res and broadcast, the branch that
// uses `broadcast`, and the return statements are not part of this listing --
// confirm against the full file.
// NOTE(review): nlohmann::json::parse throws on malformed input by default and
// no try/catch is visible around Json::parse(msg.info()) here -- confirm
// whether plugin-supplied info can be invalid JSON.
342bool PluginManager::ReceiveMsgFromPlugin(
const DvPluginMsg& msg) {
343 if (!msg.has_name()) {
344 AERROR <<
"Invalid message name!";
347 const string msg_name = msg.name();
348 if (dv_support_apis_.find(msg_name) != dv_support_apis_.end()) {
// Call through the stored pointer-to-member on this instance.
350 bool result = (this->*dv_support_apis_[msg_name])(msg, json_res);
352 AERROR <<
"Failed to handle msg!";
357 Json response = JsonUtil::ProtoToTypedJson(
"PluginResponse", msg);
358 response[
"action"] =
"response";
359 Json info = Json::parse(msg.info());
360 response[
"data"][
"info"] = info;
362 if (!JsonUtil::GetBooleanByPath(response,
"data.broadcast", &broadcast)) {
367 plugin_ws_->BroadcastData(response.dump());
// Handler for plugin messages that ask Dreamview to refresh a kind of data.
//
// Visible steps:
//   * require the message `info` field and parse it as JSON;
//   * require info["data"]["data_type"];
//   * map the data type string to an integer through data_type_dict and
//     reject unknown types;
//   * depending on that integer, invoke callback_api_ with one of
//     "UpdateScenarioSetToStatus", "UpdateDynamicModelToStatus",
//     "UpdateRecordToStatus", "UpdateVehicleToStatus" or "UpdateMapToStatus",
//     passing the parsed info;
//   * log an error when the callback reports failure.
//
// Parameters:
//   msg      - message received from the plugin.
//   json_str - not used in the lines visible in this listing.
// Returns bool.
// NOTE(review): the `case` labels, the declaration of `info`, and the return
// statements are not part of this listing, so the index-to-callback mapping
// cannot be confirmed from here -- check the full file.
// NOTE(review): nlohmann::json::parse throws on malformed input by default and
// no try/catch is visible around Json::parse(info_str) here.
372bool PluginManager::UpdateData(
const DvPluginMsg& msg,
const string& json_str) {
373 if (!msg.has_info()) {
374 AERROR <<
"Failed to get data type!";
377 const string info_str = msg.info();
379 info = Json::parse(info_str);
380 if (!info.contains(
"data") || !info[
"data"].contains(
"data_type")) {
381 AERROR <<
"Failed to get data or data type!";
384 const string data_type = info[
"data"][
"data_type"];
385 if (data_type_dict.find(
data_type) == data_type_dict.end()) {
386 AERROR <<
"Dv don't support this kind of data type!";
// The key is known to exist at this point, so operator[] does not insert.
389 const int data_type_index = data_type_dict[
data_type];
390 bool update_data_res =
false;
391 switch (data_type_index) {
393 update_data_res = callback_api_(
"UpdateScenarioSetToStatus", info);
399 update_data_res = callback_api_(
"UpdateDynamicModelToStatus", info);
403 update_data_res = callback_api_(
"UpdateRecordToStatus", info);
407 update_data_res = callback_api_(
"UpdateVehicleToStatus", info);
411 update_data_res = callback_api_(
"UpdateMapToStatus", info);
417 if (!update_data_res) {
418 AERROR <<
"Failed to update data!";
void Start(DvCallback callback_api)
std::function< bool(const std::string &function_name, const nlohmann::json &param_json)> DvCallback
PluginManager(WebSocketHandler *plugin_ws)
The WebSocketHandler, built on top of CivetWebSocketHandler, is a websocket handler that handles diff...
Some string util functions.
std::vector< std::string > Glob(const std::string &pattern)
Expand path pattern to matched paths.
bool PathExists(const std::string &path)
Check if the path exists.
bool GetProtoFromFile(const std::string &file_name, google::protobuf::Message *message)
Parses the content of the file specified by the file_name as a representation of protobufs,...
std::string GetEnv(const std::string &var_name, const std::string &default_value="")
bool GetContent(const std::string &file_name, std::string *content)
Get file content as string.