22#include "nlohmann/json.hpp"
23#include "pcl/filters/voxel_grid.h"
24#include "yaml-cpp/yaml.h"
26#include "modules/dreamview/proto/point_cloud.pb.h"
39using Json = nlohmann::json;
47 websocket_(websocket) {
48 localization_reader_ = node_->CreateReader<LocalizationEstimate>(
49 FLAGS_localization_topic,
50 [
this](
const std::shared_ptr<LocalizationEstimate> &msg) {
51 UpdateLocalizationTime(msg);
57PointCloudChannelUpdater* PointCloudUpdater::GetPointCloudChannelUpdater(
58 const std::string &channel_name) {
59 std::lock_guard<std::mutex> lck(channel_updater_map_mutex_);
60 if (channel_updaters_.find(channel_name) == channel_updaters_.end()) {
61 channel_updaters_[channel_name] =
62 new PointCloudChannelUpdater(channel_name);
63 if (channel_name == FLAGS_perception_edge_info_topic) {
64 channel_updaters_[channel_name]->perception_edge_reader_ =
65 node_->CreateReader<PerceptionEdgeInfo>(
68 this](
const std::shared_ptr<PerceptionEdgeInfo> &msg) {
69 UpdatePerceptionEdge(msg, channel_name);
72 channel_updaters_[channel_name]->point_cloud_reader_ =
73 node_->CreateReader<drivers::PointCloud>(
76 this](
const std::shared_ptr<drivers::PointCloud> &msg) {
77 UpdatePointCloud(msg, channel_name);
81 return channel_updaters_[channel_name];
85 const std::string &channel_name,
86 nlohmann::json *subscribe_param) {
87 if (channel_name.empty()) {
88 AERROR <<
"Failed to subscribe channel for channel is empty!";
91 if (std::find(channels_.begin(), channels_.end(), channel_name) ==
93 AERROR <<
"Failed to subscribe point cloud updater, for channel: "
94 << channel_name <<
" is invalid.";
97 if (time_interval_ms > 0) {
99 GetPointCloudChannelUpdater(channel_name);
102 [channel_name,
this]() { this->
OnTimer(channel_name); },
false));
113 if (channel_name.empty()) {
114 AERROR <<
"Failed to unsubscribe channel for channel is empty!";
141 boost::unique_lock<boost::shared_mutex> writer_lock(
mutex_);
145 boost::shared_lock<boost::shared_mutex> reader_lock(
mutex_);
149 StreamData stream_data;
150 std::string stream_data_string;
151 stream_data.set_action(
"stream");
152 stream_data.set_data_name(
"pointcloud");
153 stream_data.set_channel_name(channel_name);
154 std::vector<uint8_t> byte_data(to_send.begin(), to_send.end());
155 stream_data.set_data(&(byte_data[0]), byte_data.size());
156 stream_data.set_type(
"pointcloud");
157 stream_data.SerializeToString(&stream_data_string);
161void PointCloudUpdater::GetChannelMsg(std::vector<std::string> *channels) {
166pcl::PointCloud<pcl::PointXYZ>::Ptr PointCloudUpdater::ConvertPCLPointCloud(
167 const std::shared_ptr<drivers::PointCloud> &point_cloud,
168 const std::string &channel_name) {
169 pcl::PointCloud<pcl::PointXYZ>::Ptr pcl_ptr(
170 new pcl::PointCloud<pcl::PointXYZ>);
171 pcl_ptr->width = point_cloud->width();
172 pcl_ptr->height = point_cloud->height();
173 pcl_ptr->is_dense =
false;
175 if (point_cloud->width() * point_cloud->height() !=
176 static_cast<unsigned int>(point_cloud->point_size())) {
178 pcl_ptr->height = point_cloud->point_size();
180 pcl_ptr->points.resize(point_cloud->point_size());
182 for (
size_t i = 0; i < pcl_ptr->points.size(); ++i) {
183 const auto &point = point_cloud->point(
static_cast<int>(i));
184 pcl_ptr->points[i].x = point.x();
185 pcl_ptr->points[i].y = point.y();
186 pcl_ptr->points[i].z = point.z();
189 TransformPointCloud(pcl_ptr, point_cloud->header().frame_id());
194void PointCloudUpdater::UpdatePerceptionEdge(
195 const std::shared_ptr<PerceptionEdgeInfo> &perception_edge,
196 const std::string &channel_name) {
197 PointCloudChannelUpdater *updater = GetPointCloudChannelUpdater(channel_name);
198 if (perception_edge->header().has_timestamp_sec()) {
199 updater->last_point_cloud_time_ = perception_edge->header().timestamp_sec();
201 if (std::fabs(last_localization_time_ - updater->last_point_cloud_time_) >
203 AWARN <<
"skipping outdated perception edge data";
207 point_cloud_pb.set_is_edge(
true);
208 for (
size_t idx = 0; idx < perception_edge->edge_point_size(); ++idx) {
209 auto pt = perception_edge->edge_point(idx);
210 if (!std::isnan(pt.x()) && !std::isnan(pt.y()) && !std::isnan(pt.z())) {
211 point_cloud_pb.add_num(pt.x());
212 point_cloud_pb.add_num(pt.y());
213 point_cloud_pb.add_num(pt.z());
217 boost::unique_lock<boost::shared_mutex> writer_lock(
mutex_);
218 point_cloud_pb.SerializeToString(&updater->point_cloud_str_);
222void PointCloudUpdater::UpdatePointCloud(
223 const std::shared_ptr<drivers::PointCloud> &point_cloud,
224 const std::string& channel_name) {
225 PointCloudChannelUpdater *updater = GetPointCloudChannelUpdater(channel_name);
226 if (point_cloud->header().has_timestamp_sec()) {
227 updater->last_point_cloud_time_ = point_cloud->header().timestamp_sec();
229 updater->last_point_cloud_time_ = point_cloud->measurement_time();
231 if (std::fabs(last_localization_time_ - updater->last_point_cloud_time_) >
233 AWARN <<
"skipping outdated point cloud data";
236 pcl::PointCloud<pcl::PointXYZ>::Ptr pcl_ptr;
238 if (enable_voxel_filter_) {
239 if (updater->future_ready_) {
240 updater->future_ready_ =
false;
242 pcl_ptr = ConvertPCLPointCloud(point_cloud, channel_name);
243 std::future<void>
f = cyber::Async(&PointCloudUpdater::FilterPointCloud,
244 this, pcl_ptr, channel_name);
245 updater->async_future_ = std::move(
f);
248 pcl_ptr = ConvertPCLPointCloud(point_cloud, channel_name);
249 this->FilterPointCloud(pcl_ptr, channel_name);
253void PointCloudUpdater::FilterPointCloud(
254 pcl::PointCloud<pcl::PointXYZ>::Ptr pcl_ptr,
255 const std::string &channel_name) {
256 PointCloudChannelUpdater *updater = GetPointCloudChannelUpdater(channel_name);
257 pcl::PointCloud<pcl::PointXYZ>::Ptr pcl_filtered_ptr(
258 new pcl::PointCloud<pcl::PointXYZ>);
265 if (enable_voxel_filter_) {
266 pcl::VoxelGrid<pcl::PointXYZ> voxel_grid;
267 voxel_grid.setInputCloud(pcl_ptr);
268 voxel_grid.setLeafSize(
static_cast<float>(FLAGS_voxel_filter_size),
269 static_cast<float>(FLAGS_voxel_filter_size),
270 static_cast<float>(FLAGS_voxel_filter_height));
271 voxel_grid.filter(*pcl_filtered_ptr);
273 pcl_filtered_ptr = pcl_ptr;
277 for (
size_t idx = 0; idx < pcl_filtered_ptr->size(); ++idx) {
278 pcl::PointXYZ &pt = pcl_filtered_ptr->points[idx];
279 if (!std::isnan(pt.x) && !std::isnan(pt.y) && !std::isnan(pt.z)) {
280 point_cloud_pb.add_num(pt.x);
281 point_cloud_pb.add_num(pt.y);
282 point_cloud_pb.add_num(pt.z);
286 boost::unique_lock<boost::shared_mutex> writer_lock(
mutex_);
287 point_cloud_pb.SerializeToString(&updater->point_cloud_str_);
288 updater->future_ready_ =
true;
292void PointCloudUpdater::UpdateLocalizationTime(
293 const std::shared_ptr<LocalizationEstimate> &localization) {
294 last_localization_time_ = localization->header().timestamp_sec();
297void PointCloudUpdater::TransformPointCloud(
298 pcl::PointCloud<pcl::PointXYZ>::Ptr &point_cloud,
299 const std::string &frame_id) {
301 AERROR <<
"Failed to get frame id";
307 stamped_transform = tf2_buffer->
lookupTransform(
"localization", frame_id,
309 }
catch (tf2::TransformException &ex) {
315 auto &matrix = stamped_transform.
transform();
318 Eigen::Vector3f translation(
319 matrix.translation().x(), matrix.translation().y(),
320 matrix.translation().z());
323 Eigen::Quaternionf rotation(
324 matrix.rotation().qw(), matrix.rotation().qx(), matrix.rotation().qy(),
325 matrix.rotation().qz());
326 rotation.normalize();
329 Eigen::Affine3f transform = Eigen::Affine3f::Identity();
330 transform.translation() << translation;
331 transform.rotate(rotation);
334 pcl::transformPointCloud(*point_cloud, *point_cloud, transform);
337 Eigen::Matrix4f counter_transform = Eigen::Matrix4f::Identity();
338 counter_transform(0, 0) = 0;
339 counter_transform(0, 1) = 1;
340 counter_transform(1, 0) = -1;
341 counter_transform(1, 1) = 0;
344 pcl::transformPointCloud(*point_cloud, *point_cloud, counter_transform);
Cyber has builtin time type Time.
Used to perform oneshot or periodic timing tasks
void StopStream(const std::string &channel_name="") override
Stop data flow.
static boost::shared_mutex mutex_
void StartStream(const double &time_interval_ms, const std::string &channel_name="", nlohmann::json *subscribe_param=nullptr) override
Start data flow.
PointCloudUpdater(WebSocketHandler *websocket, SimulationWorldUpdater *sim_world_updater)
Constructor with the websocket handler.
void OnTimer(const std::string &channel_name="")
void PublishMessage(const std::string &channel_name="") override
Publish Message to dreamview frontend.
Base Class for data updater with multiply channels.
void GetChannelMsgWithFilter(std::vector< std::string > *channels)
The WebSocketHandler, built on top of CivetWebSocketHandler, is a websocket handler that handles diff...
bool BroadcastBinaryData(const std::string &data, bool skippable=false)
Sends the provided binary data to all the connected clients.
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space)
std::string frame_id
Point cloud frame id
std::atomic< bool > future_ready_
double last_point_cloud_time_
std::shared_ptr< cyber::Reader< apollo::perception::PerceptionEdgeInfo > > perception_edge_reader_
std::shared_ptr< cyber::Reader< drivers::PointCloud > > point_cloud_reader_
std::unique_ptr< cyber::Timer > timer_
std::string point_cloud_str_