32using Json = nlohmann::json;
39 websocket_(websocket),
40 updater_manager_(updater_manager),
41 dv_plugin_manager_(dv_plugin_manager) {
42 RegisterDataHandlers();
43 RegisterMessageHandlers();
44 auto channel_manager =
45 apollo::cyber::service_discovery::TopologyManager::Instance()
47 auto topology_callback =
49 this->RefreshChannels(change_msg);
51 channel_manager->AddChangeListener(topology_callback);
54void SocketManager::RegisterDataHandlers() {
55 enabled_ = GetProtoFromFile(FLAGS_data_handler_config_path,
56 &data_handler_base_conf_);
58 AERROR <<
"Unable to parse data handler configuration from file "
59 << FLAGS_data_handler_config_path;
61 for (
auto &data_handler_iter :
62 dv_plugin_manager_->data_handler_conf_.data_handler_info()) {
64 data_handler_iter.first) !=
66 AERROR <<
"There are duplicate updater handlers between dv and plugins";
69 auto data_handler_info =
70 data_handler_base_conf_.mutable_data_handler_info();
71 (*data_handler_info)[data_handler_iter.first] = data_handler_iter.second;
73 AINFO <<
"data_handler_base_conf_: " << data_handler_base_conf_.DebugString();
84void SocketManager::RegisterMessageHandlers() {
92 response[
"action"] =
"response";
93 response[
"data"][
"info"] = {};
94 response[
"data"][
"info"][
"code"] = this->Subscribe(json) ? 0 : -1;
95 websocket_->
SendData(conn, response.dump());
102 response[
"action"] =
"response";
103 response[
"data"][
"info"] = {};
104 response[
"data"][
"info"][
"code"] = this->UnSubscribe(json) ? 0 : -1;
105 websocket_->
SendData(conn, response.dump());
109bool SocketManager::GetDataUpdaterChannels(
const std::string &updater_path,
110 std::vector<std::string> *channels) {
111 UpdaterBase *updater = updater_manager_->
GetUpdater(updater_path);
112 UpdaterWithChannelsBase *updater_with_channels =
113 dynamic_cast<UpdaterWithChannelsBase *
>(updater);
114 if (updater_with_channels ==
nullptr) {
117 updater_with_channels->GetChannelMsg(channels);
121bool SocketManager::ModifyUpdaterChannels(
const std::string &updater_path,
122 const std::string &channel_name,
123 const std::string &operation) {
124 UpdaterBase *updater = updater_manager_->
GetUpdater(updater_path);
125 UpdaterWithChannelsBase *updater_with_channels =
126 dynamic_cast<UpdaterWithChannelsBase *
>(updater);
127 if (updater_with_channels ==
nullptr) {
130 auto iter = std::find(updater_with_channels->channels_.begin(),
131 updater_with_channels->channels_.end(), channel_name);
132 if (operation ==
"join" && iter == updater_with_channels->channels_.end()) {
133 updater_with_channels->channels_.emplace_back(channel_name);
134 }
else if (operation ==
"leave" &&
135 iter != updater_with_channels->channels_.end()) {
136 updater_with_channels->channels_.erase(iter);
141bool SocketManager::Subscribe(
const Json &json) {
142 const std::string url = json[
"data"][
"info"][
"websocketName"];
143 double time_interval_ms = 0;
146 std::string channel_name;
151 Json subscribe_param = {};
152 std::vector<std::string> json_path = {
"data",
"info",
"param"};
154 return updater_manager_->
Start(url, time_interval_ms, channel_name,
158bool SocketManager::UnSubscribe(
const Json &json) {
159 const std::string url = json[
"data"][
"info"][
"websocketName"];
160 std::string channel_name;
165 return updater_manager_->
Stop(url, channel_name);
168Json SocketManager::ClearDataHandlerChannelMsgs() {
169 Json response_data({});
173 for (
auto iter = data_handler_conf_.mutable_data_handler_info()->begin();
174 iter != data_handler_conf_.mutable_data_handler_info()->end();
176 if (iter->second.different_for_channels()) {
177 const std::string &
data_type = (*iter).first;
178 (*data_handler_conf_.mutable_data_handler_info())[
data_type]
182 complete_channel_count_.clear();
183 data_handler_channel_count_.clear();
187 response_data[
"info"] = data;
188 response_data[
"info"][
"code"] = 0;
189 return response_data;
192Json SocketManager::GetDataHandlerInfo() {
193 Json response_data({});
195 data_handler_conf_.Clear();
196 data_handler_channel_count_.clear();
197 complete_channel_count_.clear();
201 data_handler_base_conf_.mutable_data_handler_info()->begin();
202 iter != data_handler_base_conf_.mutable_data_handler_info()->end();
204 const std::string &
data_type = (*iter).first;
205 DataHandlerInfo updater_info;
206 updater_info.CopyFrom(iter->second);
207 if (
data_type == FLAGS_cyber_channels_key) {
209 auto topology_manager =
210 apollo::cyber::service_discovery::TopologyManager::Instance();
211 auto channel_manager = topology_manager->channel_manager();
213 std::vector<apollo::cyber::proto::RoleAttributes> role_attributes;
214 channel_manager->GetWriters(&role_attributes);
216 for (
const auto &role : role_attributes) {
217 if (!complete_channel_count_[role.channel_name()]) {
218 std::string proto_path;
219 apollo::cyber::message::ProtobufFactory::Instance()->GetProtoPath(
220 role.message_type(), proto_path);
221 if (proto_path.empty()) {
222 AWARN <<
"Cannot find proto location for message type "
223 << role.message_type();
226 auto channel_item = updater_info.add_channels();
227 channel_item->set_proto_path(proto_path);
228 channel_item->set_channel_name(role.channel_name());
229 channel_item->set_msg_type(role.message_type());
231 complete_channel_count_[role.channel_name()]++;
235 const std::string &updater_path = (*iter).second.data_name();
236 std::string proto_path;
237 apollo::cyber::message::ProtobufFactory::Instance()->GetProtoPath(
239 if (proto_path.empty()) {
240 AWARN <<
"Cannot find proto location for message type " <<
data_type;
243 updater_info.set_proto_path(proto_path);
244 if (iter->second.different_for_channels()) {
245 std::vector<std::string> channels;
246 if (!GetDataUpdaterChannels(updater_path, &channels)) {
249 for (
auto iter = channels.begin(); iter != channels.end(); iter++) {
250 if (!data_handler_channel_count_[*iter]) {
251 auto channel_item = updater_info.add_channels();
252 channel_item->set_channel_name(*iter);
254 data_handler_channel_count_[*iter]++;
258 (*data_handler_conf_.mutable_data_handler_info())[
data_type] =
264 response_data[
"info"] = data;
265 response_data[
"info"][
"code"] = 0;
266 return response_data;
272 clear_channel_msg ? ClearDataHandlerChannelMsgs() : GetDataHandlerInfo();
273 response[
"action"] =
"metadata";
277bool SocketManager::UpdateUpdaterInfo(
281 auto channel_item = updater_info.add_channels();
282 channel_item->set_channel_name(role_attr.channel_name());
285 channel_item->set_msg_type(role_attr.message_type());
286 std::string proto_path;
287 apollo::cyber::message::ProtobufFactory::Instance()->GetProtoPath(
288 role_attr.message_type(), proto_path);
289 if (proto_path.empty()) {
290 AWARN <<
"Cannot find proto location for message type "
291 << role_attr.message_type();
294 channel_item->set_proto_path(proto_path);
299bool SocketManager::UpdateCyberChannels(
301 DataHandlerInfo &updater_info) {
307 if (!complete_channel_count_[channel_name]) {
308 if (!UpdateUpdaterInfo(change_msg, updater_info))
return false;
309 complete_channel_count_[channel_name]++;
312 complete_channel_count_[channel_name]++;
314 complete_channel_count_[channel_name]--;
315 if (complete_channel_count_[channel_name] == 0) {
316 if (!UpdateUpdaterInfo(change_msg, updater_info))
return false;
323void SocketManager::RefreshDataHandlerChannels(
325 DataHandlerConf &data_handler_conf_diff,
bool &flag) {
328 std::string channel_name = role_attr.channel_name();
332 std::vector<std::string> records;
333 std::vector<std::string> other_record_node_name;
334 auto *record_player_factory = RecordPlayerFactory::Instance();
335 record_player_factory->GetAllRecords(&records);
336 const std::string current_record = record_player_factory->GetCurrentRecord();
337 for (
auto iter = records.begin(); iter != records.end(); iter++) {
338 if (current_record.empty() || *iter != current_record) {
339 std::string other_node_name =
"record_player_factory_" + *iter;
340 other_record_node_name.push_back(other_node_name);
344 std::string node_name = role_attr.node_name();
345 for (
auto iter = data_handler_base_conf_.mutable_data_handler_info()->begin();
346 iter != data_handler_base_conf_.mutable_data_handler_info()->end();
348 const std::string &
data_type = (*iter).first;
349 DataHandlerInfo updater_info;
350 updater_info.set_data_name(iter->second.data_name());
351 updater_info.set_different_for_channels(
352 iter->second.different_for_channels());
354 if (
data_type == FLAGS_cyber_channels_key) {
356 flag = UpdateCyberChannels(change_msg, updater_info);
359 if (!iter->second.different_for_channels()) {
360 (*data_handler_conf_diff.mutable_data_handler_info())[
data_type] =
365 (*iter).second.data_name(), message_type, channel_name);
366 if (is_channel_in_updater &&
367 (current_record.empty() ||
368 std::find(other_record_node_name.begin(),
369 other_record_node_name.end(),
370 node_name) == other_record_node_name.end())) {
373 if (!data_handler_channel_count_[channel_name]) {
375 auto channel_item = updater_info.add_channels();
376 channel_item->set_channel_name(channel_name);
377 if (!ModifyUpdaterChannels(iter->second.data_name(), channel_name,
381 data_handler_channel_count_[channel_name]++;
383 data_handler_channel_count_[channel_name]--;
384 if (data_handler_channel_count_[channel_name] == 0) {
386 auto channel_item = updater_info.add_channels();
387 channel_item->set_channel_name(channel_name);
388 if (!ModifyUpdaterChannels(iter->second.data_name(), channel_name,
395 (*data_handler_conf_diff.mutable_data_handler_info())[
data_type] =
400void SocketManager::RefreshChannels(
407 DataHandlerConf data_handler_conf_diff;
410 RefreshDataHandlerChannels(change_msg, data_handler_conf_diff, flag);
421 response[
"data"][
"info"] = data;
422 response[
"data"][
"info"][
"code"] = 0;
static bool GetStringByPath(const nlohmann::json &json, const std::string &path, std::string *value)
Get a string value from the given json and path.
static bool GetJsonByPath(const nlohmann::json &json, const std::vector< std::string > &paths, nlohmann::json *value)
Get the json from the given json and path.
static nlohmann::json ProtoToTypedJson(const std::string &json_type, const google::protobuf::Message &proto)
Convert proto to a json string.
static bool GetNumberByPath(const nlohmann::json &json, const std::string &path, T *value)
Get a number value from the given json and path.
A class that manages dreamview plus plug-ins.
void BrocastDataHandlerConf(bool clear_channel_msg=false)
Broadcast data handler conf for all connections
SocketManager(WebSocketHandler *websocket, UpdaterManager *updater_manager, DvPluginManager *dv_plugin_manager)
Constructor of SocketManager.
Management for all data updater.
bool Start(const std::string &path_name, const double &time_interval_ms, const std::string &channel_name="", nlohmann::json *subscribe_param=nullptr)
Start a updater implemetnent.
UpdaterBase * GetUpdater(const std::string &path_name)
Get registered updater
bool Stop(const std::string &path_name, const std::string &channel_name)
Stop updater publish data.
bool IsChannelInUpdater(const std::string &path_name, const std::string &message_type, const std::string &channel_name)
Check if the channel belongs to an updater.
The WebSocketHandler, built on top of CivetWebSocketHandler, is a websocket handler that handles diff...
bool BroadcastData(const std::string &data, bool skippable=false)
Sends the provided data to all the connected clients.
void RegisterMessageHandler(std::string type, MessageHandler handler)
Add a new message handler for a message type.
bool SendData(Connection *conn, const std::string &data, bool skippable=false, int op_code=MG_WEBSOCKET_OPCODE_TEXT)
Sends the provided data to a specific connected client.
void RegisterConnectionReadyHandler(ConnectionReadyHandler handler)
Add a new handler for new connections.
bool GetProtoFromFile(const std::string &file_name, google::protobuf::Message *message)
Parses the content of the file specified by the file_name as a representation of protobufs,...
SocketManager to manage all websocket
optional RoleAttributes role_attr
optional OperateType operate_type
optional RoleType role_type
optional string channel_name
optional string message_type
map< string, DataHandlerInfo > data_handler_info