Apollo 10.0
自动驾驶开放平台
socket_manager.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2017 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <vector>
20
21#include "cyber/common/file.h"
22#include "cyber/common/log.h"
28namespace apollo {
29namespace dreamview {
30
32using Json = nlohmann::json;
34
36 UpdaterManager *updater_manager,
37 DvPluginManager *dv_plugin_manager)
38 : enabled_(false),
39 websocket_(websocket),
40 updater_manager_(updater_manager),
41 dv_plugin_manager_(dv_plugin_manager) {
42 RegisterDataHandlers();
43 RegisterMessageHandlers();
44 auto channel_manager =
45 apollo::cyber::service_discovery::TopologyManager::Instance()
46 ->channel_manager();
47 auto topology_callback =
48 [this](const apollo::cyber::proto::ChangeMsg &change_msg) {
49 this->RefreshChannels(change_msg);
50 };
51 channel_manager->AddChangeListener(topology_callback);
52}
53
54void SocketManager::RegisterDataHandlers() {
55 enabled_ = GetProtoFromFile(FLAGS_data_handler_config_path,
56 &data_handler_base_conf_);
57 if (!enabled_) {
58 AERROR << "Unable to parse data handler configuration from file "
59 << FLAGS_data_handler_config_path;
60 }
61 for (auto &data_handler_iter :
62 dv_plugin_manager_->data_handler_conf_.data_handler_info()) {
63 if (data_handler_base_conf_.data_handler_info().find(
64 data_handler_iter.first) !=
65 data_handler_base_conf_.data_handler_info().end()) {
66 AERROR << "There are duplicate updater handlers between dv and plugins";
67 continue;
68 }
69 auto data_handler_info =
70 data_handler_base_conf_.mutable_data_handler_info();
71 (*data_handler_info)[data_handler_iter.first] = data_handler_iter.second;
72 }
73 AINFO << "data_handler_base_conf_: " << data_handler_base_conf_.DebugString();
74
75 // 遍历 然后register
76 // for (const auto& data_handler_iter :
77 // data_handler_base_conf_.data_handler_info()) {
78 // const std::string& data_name = data_handler_iter.first;
79 // const DataHandlerInfo& data_handler_info = data_handler_iter.second;
80 // RegisterDataHandler(data_handler_info);
81 // }
82}
83
// Registers the websocket message handlers for the "subscribe" and
// "unsubscribe" message types. Each handler runs the matching member function
// on the incoming JSON and answers the requesting connection with an
// {"action": "response"} message whose data.info.code is 0 on success and -1
// on failure.
// NOTE(review): two lines directly after the opening brace are absent from
// this listing, so the function may register something further — confirm
// against the full file.
84void SocketManager::RegisterMessageHandlers() {
87
// "subscribe": result of Subscribe(json) is mapped to code 0 / -1.
88 websocket_->RegisterMessageHandler(
89 "subscribe",
90 [this](const Json &json, WebSocketHandler::Connection *conn) {
91 Json response;
92 response["action"] = "response";
93 response["data"]["info"] = {};
94 response["data"]["info"]["code"] = this->Subscribe(json) ? 0 : -1;
// Reply only to the connection that sent the request.
95 websocket_->SendData(conn, response.dump());
96 });
97
// "unsubscribe": result of UnSubscribe(json) is mapped to code 0 / -1.
98 websocket_->RegisterMessageHandler(
99 "unsubscribe",
100 [this](const Json &json, WebSocketHandler::Connection *conn) {
101 Json response;
102 response["action"] = "response";
103 response["data"]["info"] = {};
104 response["data"]["info"]["code"] = this->UnSubscribe(json) ? 0 : -1;
105 websocket_->SendData(conn, response.dump());
106 });
107}
108
109bool SocketManager::GetDataUpdaterChannels(const std::string &updater_path,
110 std::vector<std::string> *channels) {
111 UpdaterBase *updater = updater_manager_->GetUpdater(updater_path);
112 UpdaterWithChannelsBase *updater_with_channels =
113 dynamic_cast<UpdaterWithChannelsBase *>(updater);
114 if (updater_with_channels == nullptr) {
115 return false;
116 }
117 updater_with_channels->GetChannelMsg(channels);
118 return true;
119}
120
121bool SocketManager::ModifyUpdaterChannels(const std::string &updater_path,
122 const std::string &channel_name,
123 const std::string &operation) {
124 UpdaterBase *updater = updater_manager_->GetUpdater(updater_path);
125 UpdaterWithChannelsBase *updater_with_channels =
126 dynamic_cast<UpdaterWithChannelsBase *>(updater);
127 if (updater_with_channels == nullptr) {
128 return false;
129 }
130 auto iter = std::find(updater_with_channels->channels_.begin(),
131 updater_with_channels->channels_.end(), channel_name);
132 if (operation == "join" && iter == updater_with_channels->channels_.end()) {
133 updater_with_channels->channels_.emplace_back(channel_name);
134 } else if (operation == "leave" &&
135 iter != updater_with_channels->channels_.end()) {
136 updater_with_channels->channels_.erase(iter);
137 }
138 return true;
139}
140
141bool SocketManager::Subscribe(const Json &json) {
142 const std::string url = json["data"]["info"]["websocketName"];
143 double time_interval_ms = 0;
144 JsonUtil::GetNumberByPath(json, "data.info.dataFrequencyMs",
145 &time_interval_ms);
146 std::string channel_name;
147 if (!JsonUtil::GetStringByPath(json, "data.info.channelName",
148 &channel_name)) {
149 channel_name = "";
150 }
151 Json subscribe_param = {};
152 std::vector<std::string> json_path = {"data", "info", "param"};
153 JsonUtil::GetJsonByPath(json, json_path, &subscribe_param);
154 return updater_manager_->Start(url, time_interval_ms, channel_name,
155 &subscribe_param);
156}
157
158bool SocketManager::UnSubscribe(const Json &json) {
159 const std::string url = json["data"]["info"]["websocketName"];
160 std::string channel_name;
161 if (!JsonUtil::GetStringByPath(json, "data.info.channelName",
162 &channel_name)) {
163 channel_name = "";
164 }
165 return updater_manager_->Stop(url, channel_name);
166}
167
168Json SocketManager::ClearDataHandlerChannelMsgs() {
169 Json response_data({});
170 Json data({});
171
172 if (enabled_) {
173 for (auto iter = data_handler_conf_.mutable_data_handler_info()->begin();
174 iter != data_handler_conf_.mutable_data_handler_info()->end();
175 iter++) {
176 if (iter->second.different_for_channels()) {
177 const std::string &data_type = (*iter).first;
178 (*data_handler_conf_.mutable_data_handler_info())[data_type]
179 .clear_channels();
180 }
181 }
182 complete_channel_count_.clear();
183 data_handler_channel_count_.clear();
184 }
185 data = JsonUtil::ProtoToTypedJson("data", data_handler_conf_);
186 data.erase("type");
187 response_data["info"] = data;
188 response_data["info"]["code"] = 0;
189 return response_data;
190}
191
192Json SocketManager::GetDataHandlerInfo() {
193 Json response_data({});
194 Json data({});
195 data_handler_conf_.Clear();
196 data_handler_channel_count_.clear();
197 complete_channel_count_.clear();
198
199 if (enabled_) {
200 for (auto iter =
201 data_handler_base_conf_.mutable_data_handler_info()->begin();
202 iter != data_handler_base_conf_.mutable_data_handler_info()->end();
203 iter++) {
204 const std::string &data_type = (*iter).first;
205 DataHandlerInfo updater_info;
206 updater_info.CopyFrom(iter->second);
207 if (data_type == FLAGS_cyber_channels_key) {
208 // For cyber channels
209 auto topology_manager =
210 apollo::cyber::service_discovery::TopologyManager::Instance();
211 auto channel_manager = topology_manager->channel_manager();
212
213 std::vector<apollo::cyber::proto::RoleAttributes> role_attributes;
214 channel_manager->GetWriters(&role_attributes);
215
216 for (const auto &role : role_attributes) {
217 if (!complete_channel_count_[role.channel_name()]) {
218 std::string proto_path;
219 apollo::cyber::message::ProtobufFactory::Instance()->GetProtoPath(
220 role.message_type(), proto_path);
221 if (proto_path.empty()) {
222 AWARN << "Cannot find proto location for message type "
223 << role.message_type();
224 continue;
225 }
226 auto channel_item = updater_info.add_channels();
227 channel_item->set_proto_path(proto_path);
228 channel_item->set_channel_name(role.channel_name());
229 channel_item->set_msg_type(role.message_type());
230 }
231 complete_channel_count_[role.channel_name()]++;
232 }
233 } else {
234 // For other channels
235 const std::string &updater_path = (*iter).second.data_name();
236 std::string proto_path;
237 apollo::cyber::message::ProtobufFactory::Instance()->GetProtoPath(
238 data_type, proto_path);
239 if (proto_path.empty()) {
240 AWARN << "Cannot find proto location for message type " << data_type;
241 continue;
242 }
243 updater_info.set_proto_path(proto_path);
244 if (iter->second.different_for_channels()) {
245 std::vector<std::string> channels;
246 if (!GetDataUpdaterChannels(updater_path, &channels)) {
247 continue;
248 }
249 for (auto iter = channels.begin(); iter != channels.end(); iter++) {
250 if (!data_handler_channel_count_[*iter]) {
251 auto channel_item = updater_info.add_channels();
252 channel_item->set_channel_name(*iter);
253 }
254 data_handler_channel_count_[*iter]++;
255 }
256 }
257 }
258 (*data_handler_conf_.mutable_data_handler_info())[data_type] =
259 updater_info;
260 }
261 }
262 data = JsonUtil::ProtoToTypedJson("data", data_handler_conf_);
263 data.erase("type");
264 response_data["info"] = data;
265 response_data["info"]["code"] = 0;
266 return response_data;
267}
268
269void SocketManager::BrocastDataHandlerConf(bool clear_channel_msg) {
270 Json response({});
271 response["data"] =
272 clear_channel_msg ? ClearDataHandlerChannelMsgs() : GetDataHandlerInfo();
273 response["action"] = "metadata";
274 websocket_->BroadcastData(response.dump());
275}
276
// Appends one channel entry for the role in `change_msg` to `updater_info`.
// The entry always receives the channel name; inside the conditional below it
// also receives the message type and the proto path.
// Returns false when the proto path for the message type cannot be resolved,
// true otherwise.
277bool SocketManager::UpdateUpdaterInfo(
278 const apollo::cyber::proto::ChangeMsg &change_msg,
279 DataHandlerInfo &updater_info) {
280 auto role_attr = change_msg.role_attr();
// The entry is appended before any validation, so it stays in `updater_info`
// even on the `return false` path below.
281 auto channel_item = updater_info.add_channels();
282 channel_item->set_channel_name(role_attr.channel_name());
// NOTE(review): the line opening this condition is absent from this listing;
// presumably it compares the operate type against a "join" value — confirm
// against the full file.
284 change_msg.operate_type()) {
285 channel_item->set_msg_type(role_attr.message_type());
286 std::string proto_path;
287 apollo::cyber::message::ProtobufFactory::Instance()->GetProtoPath(
288 role_attr.message_type(), proto_path);
// An empty path after the lookup is treated as "message type unknown".
289 if (proto_path.empty()) {
290 AWARN << "Cannot find proto location for message type "
291 << role_attr.message_type();
292 return false;
293 }
294 channel_item->set_proto_path(proto_path);
295 }
296 return true;
297}
298
// Maintains the per-channel counter `complete_channel_count_` for a topology
// change and reports whether the change should be published.
// Returns true only when UpdateUpdaterInfo succeeded for a channel whose
// counter was zero before the increment (first branch) or reached zero after
// the decrement (second branch); returns false in every other case.
299bool SocketManager::UpdateCyberChannels(
300 const apollo::cyber::proto::ChangeMsg &change_msg,
301 DataHandlerInfo &updater_info) {
302 std::string channel_name = change_msg.role_attr().channel_name();
303
// NOTE(review): the line opening this condition is absent from this listing;
// presumably it selects the "join" operate type — confirm against the full
// file.
305 change_msg.operate_type()) {
306 // Used to extract the channel of the complete set.
// Counter still zero: try to describe the channel. On failure the counter is
// not incremented.
307 if (!complete_channel_count_[channel_name]) {
308 if (!UpdateUpdaterInfo(change_msg, updater_info)) return false;
309 complete_channel_count_[channel_name]++;
310 return true;
311 }
// Channel already counted: just bump the counter, nothing to publish.
312 complete_channel_count_[channel_name]++;
313 } else {
// NOTE(review): the decrement is unguarded; if the counter type is signed, a
// change arriving for a channel that was never counted would drive it
// negative — confirm whether that can happen.
314 complete_channel_count_[channel_name]--;
315 if (complete_channel_count_[channel_name] == 0) {
316 if (!UpdateUpdaterInfo(change_msg, updater_info)) return false;
317 return true;
318 }
319 }
320 return false;
321}
322
// Builds, for one topology change, a per-handler diff in
// `data_handler_conf_diff` and sets `flag` when the change affects a channel
// that should be reported.
//
// For every handler in `data_handler_base_conf_` a fresh DataHandlerInfo is
// created carrying only data_name and different_for_channels; channel entries
// are added to it depending on the handler kind (see the branches below).
323void SocketManager::RefreshDataHandlerChannels(
324 const apollo::cyber::proto::ChangeMsg &change_msg,
325 DataHandlerConf &data_handler_conf_diff, bool &flag) {
326 auto role_attr = change_msg.role_attr();
327 std::string message_type = role_attr.message_type();
328 std::string channel_name = role_attr.channel_name();
329
330 // Used to filter the channel of the current data packet and the channel in
331 // non-broadcast packet mode.
332 std::vector<std::string> records;
333 std::vector<std::string> other_record_node_name;
334 auto *record_player_factory = RecordPlayerFactory::Instance();
335 record_player_factory->GetAllRecords(&records);
336 const std::string current_record = record_player_factory->GetCurrentRecord();
// Collect "record_player_factory_<record>" node names for every record other
// than the current one (all records when no current record is set).
337 for (auto iter = records.begin(); iter != records.end(); iter++) {
338 if (current_record.empty() || *iter != current_record) {
339 std::string other_node_name = "record_player_factory_" + *iter;
340 other_record_node_name.push_back(other_node_name);
341 }
342 }
343
344 std::string node_name = role_attr.node_name();
345 for (auto iter = data_handler_base_conf_.mutable_data_handler_info()->begin();
346 iter != data_handler_base_conf_.mutable_data_handler_info()->end();
347 iter++) {
348 const std::string &data_type = (*iter).first;
349 DataHandlerInfo updater_info;
350 updater_info.set_data_name(iter->second.data_name());
351 updater_info.set_different_for_channels(
352 iter->second.different_for_channels());
353
354 if (data_type == FLAGS_cyber_channels_key) {
355 // For cyber channels
// NOTE(review): this assigns `flag` rather than OR-ing into it, so a `true`
// set by a handler visited earlier in the loop can be overwritten with false
// here — confirm this is intended.
356 flag = UpdateCyberChannels(change_msg, updater_info);
357 } else {
358 // For other channels
// Handlers that are not per-channel are copied into the diff unchanged.
359 if (!iter->second.different_for_channels()) {
360 (*data_handler_conf_diff.mutable_data_handler_info())[data_type] =
361 updater_info;
362 continue;
363 }
364 bool is_channel_in_updater = updater_manager_->IsChannelInUpdater(
365 (*iter).second.data_name(), message_type, channel_name);
// Proceed only if the channel belongs to this updater and either no record is
// current or the writer's node is not one of the other records' player nodes.
366 if (is_channel_in_updater &&
367 (current_record.empty() ||
368 std::find(other_record_node_name.begin(),
369 other_record_node_name.end(),
370 node_name) == other_record_node_name.end())) {
// NOTE(review): the line opening this condition is absent from this listing;
// presumably it selects the "join" operate type (the branch below passes
// "join" to ModifyUpdaterChannels) — confirm against the full file.
372 change_msg.operate_type()) {
373 if (!data_handler_channel_count_[channel_name]) {
374 flag = true;
375 auto channel_item = updater_info.add_channels();
376 channel_item->set_channel_name(channel_name);
// On failure the `continue` skips both the counter increment and the diff
// entry for this handler; `flag` has already been set.
377 if (!ModifyUpdaterChannels(iter->second.data_name(), channel_name,
378 "join"))
379 continue;
380 }
381 data_handler_channel_count_[channel_name]++;
382 } else {
383 data_handler_channel_count_[channel_name]--;
// Counter reached zero: report the channel and drop it from the updater.
384 if (data_handler_channel_count_[channel_name] == 0) {
385 flag = true;
386 auto channel_item = updater_info.add_channels();
387 channel_item->set_channel_name(channel_name);
388 if (!ModifyUpdaterChannels(iter->second.data_name(), channel_name,
389 "leave"))
390 continue;
391 }
392 }
393 }
394 }
395 (*data_handler_conf_diff.mutable_data_handler_info())[data_type] =
396 updater_info;
397 }
398}
399
// Reacts to a topology change: computes the handler diff via
// RefreshDataHandlerChannels and, if that reports a relevant change,
// broadcasts it to all websocket connections with action "join" or "leave".
400void SocketManager::RefreshChannels(
401 const apollo::cyber::proto::ChangeMsg &change_msg) {
402 // Just look at the writer's
// NOTE(review): the condition guarding this early return is absent from this
// listing; per the comment above it presumably filters out non-writer roles —
// confirm against the full file.
404 return;
405 }
406
407 DataHandlerConf data_handler_conf_diff;
408 bool flag = false;
409
410 RefreshDataHandlerChannels(change_msg, data_handler_conf_diff, flag);
411
412 // If the channel is not used.
413 if (flag == false) {
414 return;
415 }
416
417 Json response({});
418 Json data({});
419 data = JsonUtil::ProtoToTypedJson("data", data_handler_conf_diff);
420 data.erase("type");
421 response["data"]["info"] = data;
422 response["data"]["info"]["code"] = 0;
// NOTE(review): the condition of this ternary is absent from this listing;
// presumably it tests the operate type of `change_msg` — confirm.
423 response["action"] =
425 ? "join"
426 : "leave";
427 websocket_->BroadcastData(response.dump());
428}
429
430} // namespace dreamview
431} // namespace apollo
static bool GetStringByPath(const nlohmann::json &json, const std::string &path, std::string *value)
Get a string value from the given json and path.
Definition json_util.cc:97
static bool GetJsonByPath(const nlohmann::json &json, const std::vector< std::string > &paths, nlohmann::json *value)
Get the json from the given json and path.
Definition json_util.cc:73
static nlohmann::json ProtoToTypedJson(const std::string &json_type, const google::protobuf::Message &proto)
Convert proto to a json string.
Definition json_util.cc:37
static bool GetNumberByPath(const nlohmann::json &json, const std::string &path, T *value)
Get a number value from the given json and path.
Definition json_util.h:120
A class that manages dreamview plus plug-ins.
void BrocastDataHandlerConf(bool clear_channel_msg=false)
Broadcast data handler conf for all connections
SocketManager(WebSocketHandler *websocket, UpdaterManager *updater_manager, DvPluginManager *dv_plugin_manager)
Constructor of SocketManager.
Management for all data updater.
bool Start(const std::string &path_name, const double &time_interval_ms, const std::string &channel_name="", nlohmann::json *subscribe_param=nullptr)
Start an updater implementation.
UpdaterBase * GetUpdater(const std::string &path_name)
Get registered updater
bool Stop(const std::string &path_name, const std::string &channel_name)
Stop updater publish data.
bool IsChannelInUpdater(const std::string &path_name, const std::string &message_type, const std::string &channel_name)
Check if the channel belongs to an updater.
The WebSocketHandler, built on top of CivetWebSocketHandler, is a websocket handler that handles diff...
bool BroadcastData(const std::string &data, bool skippable=false)
Sends the provided data to all the connected clients.
void RegisterMessageHandler(std::string type, MessageHandler handler)
Add a new message handler for a message type.
bool SendData(Connection *conn, const std::string &data, bool skippable=false, int op_code=MG_WEBSOCKET_OPCODE_TEXT)
Sends the provided data to a specific connected client.
void RegisterConnectionReadyHandler(ConnectionReadyHandler handler)
Add a new handler for new connections.
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
#define AWARN
Definition log.h:43
nlohmann::json Json
bool GetProtoFromFile(const std::string &file_name, google::protobuf::Message *message)
Parses the content of the file specified by the file_name as a representation of protobufs,...
Definition file.cc:132
class register implement
Definition arena_queue.h:37
SocketManager to manage all websocket
optional RoleAttributes role_attr
map< string, DataHandlerInfo > data_handler_info