Apollo 10.0
自动驾驶开放平台
channels_updater.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2023 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
17
18#include <memory>
19#include <string>
20#include <vector>
21
22// #include "google/protobuf/util/json_util.h"
23
24#include "modules/dreamview_plus/proto/data_handler.pb.h"
25
28
29namespace apollo {
30
31namespace dreamview {
32
33using apollo::common::util::ContainsKey;
35
37 : node_(cyber::CreateNode("channels_info_updater")),
38 websocket_(websocket) {}
39
40void ChannelsUpdater::PublishMessage(const std::string &channel_name) {
41 if (channel_updaters_.count(channel_name) == 0) {
42 AERROR << "Failed to publish channel info for channel has not been "
43 "registered!";
44 return;
45 }
46 if (GetThrottleFlag()) {
47 // Can continue to send data stream
48 return;
49 }
50 // lock to avoid excessive stream data
51 ControlThrottle(true);
52 std::string channel_data_str =
53 channel_updaters_[channel_name].channel_data_str;
54 StreamData stream_data;
55 std::string stream_data_string;
56 stream_data.set_action("stream");
57 stream_data.set_data_name("cyber");
58 std::vector<uint8_t> byte_data(channel_data_str.begin(),
59 channel_data_str.end());
60 stream_data.set_data(&(byte_data[0]), byte_data.size());
61 stream_data.set_type("cyber");
62 stream_data.set_channel_name(channel_name);
63 stream_data.SerializeToString(&stream_data_string);
64 websocket_->BroadcastBinaryData(stream_data_string);
65}
66
67void ChannelsUpdater::ControlThrottle(bool add_lock) {
68 {
69 boost::unique_lock<boost::shared_mutex> wlock(throttle_mutex_);
70 throttle = add_lock;
71 }
72}
73
74bool ChannelsUpdater::GetThrottleFlag() {
75 boost::shared_lock<boost::shared_mutex> rlock(throttle_mutex_);
76 return throttle;
77}
78
79void ChannelsUpdater::StartStream(const double &time_interval_ms,
80 const std::string &channel_name,
81 nlohmann::json *subscribe_param) {
82 if (!timer_initialized_) {
83 timer_.reset(new cyber::Timer(
84 time_interval_ms, [this]() { this->ControlThrottle(false); }, false));
85 timer_->Start();
86 }
87 SubscribeChannel(channel_name);
88}
89
90bool ChannelsUpdater::SubscribeChannel(const std::string &channel_name) {
91 if (channel_name.empty() || channel_updaters_.count(channel_name) == 1) {
92 AERROR << "Invalid channel name or channel name has already "
93 "registered,avoid dumplicate register!";
94 return false;
95 }
96 struct ChannelUpdater channel_updater;
97 channel_updaters_[channel_name] = channel_updater;
98 auto cb = [this, channel_name](const std::shared_ptr<const RawMessage> &msg) {
99 std::string str;
100 msg->SerializeToString(&str);
101 channel_updaters_[channel_name].channel_data_str = str;
102 PublishMessage(channel_name);
103 };
104 auto reader = node_->CreateReader<RawMessage>(channel_name, cb);
105 if (!reader) {
106 channel_updaters_.erase(channel_name);
107 return false;
108 }
109 channel_updaters_[channel_name].reader = reader;
110 return true;
111}
112
113void ChannelsUpdater::StopStream(const std::string &channel_name) {
114 if (channel_name.empty()) {
115 AERROR << "Unsubscribe channelsInfo must bring channel param.";
116 return;
117 }
118 if (channel_updaters_.count(channel_name) == 0) {
119 AERROR << "This channel is unsubscribed,no need to unsubscribe.";
120 return;
121 }
122 node_->DeleteReader(channel_name);
123 channel_updaters_.erase(channel_name);
124 return;
125}
126} // namespace dreamview
127} // namespace apollo
Used to perform oneshot or periodic timing tasks
Definition timer.h:71
void StopStream(const std::string &channel_name="") override
Stop data flow.
void StartStream(const double &time_interval_ms, const std::string &channel_name="", nlohmann::json *subscribe_param=nullptr) override
Start data flow.
void PublishMessage(const std::string &channel_name="") override
Publish Message to dreamview frontend.
ChannelsUpdater(WebSocketHandler *websocket)
Constructor with the websocket handler.
The WebSocketHandler, built on top of CivetWebSocketHandler, is a websocket handler that handles diff...
bool BroadcastBinaryData(const std::string &data, bool skippable=false)
Sends the provided binary data to all the connected clients.
#define AERROR
Definition log.h:44
Some map util functions.
class register implement
Definition arena_queue.h:37