24#include "modules/dreamview_plus/proto/data_handler.pb.h"
33using apollo::common::util::ContainsKey;
37 : node_(cyber::CreateNode(
"channels_info_updater")),
38 websocket_(websocket) {}
41 if (channel_updaters_.count(channel_name) == 0) {
42 AERROR <<
"Failed to publish channel info for channel has not been "
46 if (GetThrottleFlag()) {
51 ControlThrottle(
true);
52 std::string channel_data_str =
53 channel_updaters_[channel_name].channel_data_str;
54 StreamData stream_data;
55 std::string stream_data_string;
56 stream_data.set_action(
"stream");
57 stream_data.set_data_name(
"cyber");
58 std::vector<uint8_t> byte_data(channel_data_str.begin(),
59 channel_data_str.end());
60 stream_data.set_data(&(byte_data[0]), byte_data.size());
61 stream_data.set_type(
"cyber");
62 stream_data.set_channel_name(channel_name);
63 stream_data.SerializeToString(&stream_data_string);
67void ChannelsUpdater::ControlThrottle(
bool add_lock) {
69 boost::unique_lock<boost::shared_mutex> wlock(throttle_mutex_);
74bool ChannelsUpdater::GetThrottleFlag() {
75 boost::shared_lock<boost::shared_mutex> rlock(throttle_mutex_);
80 const std::string &channel_name,
81 nlohmann::json *subscribe_param) {
82 if (!timer_initialized_) {
84 time_interval_ms, [
this]() { this->ControlThrottle(
false); },
false));
87 SubscribeChannel(channel_name);
90bool ChannelsUpdater::SubscribeChannel(
const std::string &channel_name) {
91 if (channel_name.empty() || channel_updaters_.count(channel_name) == 1) {
92 AERROR <<
"Invalid channel name or channel name has already "
93 "registered,avoid dumplicate register!";
96 struct ChannelUpdater channel_updater;
97 channel_updaters_[channel_name] = channel_updater;
98 auto cb = [
this, channel_name](
const std::shared_ptr<const RawMessage> &msg) {
100 msg->SerializeToString(&str);
101 channel_updaters_[channel_name].channel_data_str = str;
104 auto reader = node_->CreateReader<RawMessage>(channel_name, cb);
106 channel_updaters_.erase(channel_name);
109 channel_updaters_[channel_name].reader = reader;
114 if (channel_name.empty()) {
115 AERROR <<
"Unsubscribe channelsInfo must bring channel param.";
118 if (channel_updaters_.count(channel_name) == 0) {
119 AERROR <<
"This channel is unsubscribed,no need to unsubscribe.";
122 node_->DeleteReader(channel_name);
123 channel_updaters_.erase(channel_name);
Used to perform one-shot or periodic timing tasks.
void StopStream(const std::string &channel_name="") override
Stop data flow.
void StartStream(const double &time_interval_ms, const std::string &channel_name="", nlohmann::json *subscribe_param=nullptr) override
Start data flow.
void PublishMessage(const std::string &channel_name="") override
Publish Message to dreamview frontend.
ChannelsUpdater(WebSocketHandler *websocket)
Constructor with the websocket handler.
The WebSocketHandler, built on top of CivetWebSocketHandler, is a websocket handler that handles different types of websocket-related events.
bool BroadcastBinaryData(const std::string &data, bool skippable=false)
Sends the provided binary data to all the connected clients.