Apollo 10.0
自动驾驶开放平台
perception_camera_updater.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2019 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <limits>
20#include <string>
21#include <vector>
22
23#include "opencv2/opencv.hpp"
24
25#include "modules/common_msgs/basic_msgs/geometry.pb.h"
26#include "modules/common_msgs/perception_msgs/perception_obstacle.pb.h"
27#include "modules/dreamview/proto/camera_update.pb.h"
28
29#include "cyber/common/file.h"
31namespace apollo {
32namespace dreamview {
33
39
40namespace {
41void ConvertMatrixToArray(const Eigen::Matrix4d &matrix,
42 std::vector<double> *array) {
43 const double *pointer = matrix.data();
44 for (int i = 0; i < matrix.size(); ++i) {
45 array->push_back(pointer[i]);
46 }
47}
48
49template <typename Point>
50void ConstructTransformationMatrix(const Quaternion &quaternion,
51 const Point &translation,
52 Eigen::Matrix4d *matrix) {
53 matrix->setConstant(0);
54 Eigen::Quaterniond q;
55 q.x() = quaternion.qx();
56 q.y() = quaternion.qy();
57 q.z() = quaternion.qz();
58 q.w() = quaternion.qw();
59 matrix->block<3, 3>(0, 0) = q.normalized().toRotationMatrix();
60 (*matrix)(0, 3) = translation.x();
61 (*matrix)(1, 3) = translation.y();
62 (*matrix)(2, 3) = translation.z();
63 (*matrix)(3, 3) = 1;
64}
65} // namespace
66
67PerceptionCameraUpdater::PerceptionCameraUpdater(WebSocketHandler *websocket)
68 : UpdaterWithChannelsBase({"drivers.Image"}, {""}),
69 websocket_(websocket),
70 node_(cyber::CreateNode("perception_camera_updater")) {
71 InitReaders();
72}
73
74void PerceptionCameraUpdater::StartStream(const double &time_interval_ms,
75 const std::string &channel_name,
76 nlohmann::json *subscribe_param) {
77 if (channel_name.empty()) {
78 AERROR << "Failed to subscribe channel for channel is empty!";
79 return;
80 }
81 if (std::find(channels_.begin(), channels_.end(), channel_name) ==
82 channels_.end()) {
83 AERROR << "Failed to subscribe channel: " << channel_name
84 << " for channels_ not registered!";
85 return;
86 }
87 if (time_interval_ms > 0) {
88 CameraChannelUpdater *updater = GetCameraChannelUpdater(channel_name);
89 updater->enabled_ = true;
90 updater->timer_.reset(new cyber::Timer(
91 time_interval_ms,
92 [channel_name, this]() { this->OnTimer(channel_name); }, false));
93 updater->timer_->Start();
94 } else {
95 this->OnTimer(channel_name);
96 }
97}
98
99CameraChannelUpdater *PerceptionCameraUpdater::GetCameraChannelUpdater(
100 const std::string &channel_name) {
101 std::lock_guard<std::mutex> lck(channel_updater_map_mutex_);
102 if (channel_updaters_.find(channel_name) == channel_updaters_.end()) {
103 channel_updaters_[channel_name] = new CameraChannelUpdater(channel_name);
104 channel_updaters_[channel_name]->perception_camera_reader_ =
105 node_->CreateReader<drivers::Image>(
106 channel_name,
107 [channel_name, this](const std::shared_ptr<drivers::Image> &image) {
108 OnImage(image, channel_name);
109 });
110 // get obstacle channel reader
111 channel_updaters_[channel_name]->perception_obstacle_reader_ =
112 GetObstacleReader(channel_name);
113 }
114 return channel_updaters_[channel_name];
115}
116
117void PerceptionCameraUpdater::Stop() {
118 channel_updaters_.clear();
119}
120
121void PerceptionCameraUpdater::StopStream(const std::string &channel_name) {
122 if (channel_name.empty()) {
123 AERROR << "Failed to unsubscribe channel for channel is empty!";
124 return;
125 }
126 CameraChannelUpdater *updater = GetCameraChannelUpdater(channel_name);
127 if (updater->enabled_) {
128 if (updater->timer_) {
129 updater->timer_->Stop();
130 }
131 updater->localization_queue_.clear();
132 updater->image_buffer_.clear();
133 updater->current_image_timestamp_ = 0.0;
134 updater->camera_update_.Clear();
135 updater->enabled_ = false;
136 }
137}
138
139void PerceptionCameraUpdater::OnTimer(const std::string &channel_name) {
140 PublishMessage(channel_name);
141}
142
143void PerceptionCameraUpdater::PublishMessage(const std::string &channel_name) {
144 std::string to_send;
145 // the channel has no data input, clear the sending object.
146 if (!channel_updaters_[channel_name]
147 ->perception_camera_reader_->HasWriter()) {
148 CameraChannelUpdater *updater = GetCameraChannelUpdater(channel_name);
149 updater->image_buffer_.clear();
150 updater->current_image_timestamp_ = 0.0;
151 updater->camera_update_.Clear();
152 to_send = "";
153 } else {
154 GetUpdate(&to_send, channel_name);
155 }
156 StreamData stream_data;
157 std::string stream_data_string;
158 stream_data.set_action("stream");
159 stream_data.set_data_name("camera");
160 stream_data.set_channel_name(channel_name);
161 std::vector<uint8_t> byte_data(to_send.begin(), to_send.end());
162 stream_data.set_data(&(byte_data[0]), byte_data.size());
163 stream_data.set_type("camera");
164 stream_data.SerializeToString(&stream_data_string);
165 websocket_->BroadcastBinaryData(stream_data_string);
166}
167
168void PerceptionCameraUpdater::GetImageLocalization(
169 std::vector<double> *localization, const std::string &channel_name) {
170 CameraChannelUpdater *updater = GetCameraChannelUpdater(channel_name);
171 if (updater->localization_queue_.empty()) {
172 // AERROR << "Localization queue is empty, cannot get localization for
173 // image,"
174 // << "image_timestamp: " << current_image_timestamp_;
175 return;
176 }
177
178 double timestamp_diff = std::numeric_limits<double>::max();
179 Pose image_pos;
180 while (!updater->localization_queue_.empty()) {
181 const double tmp_diff =
182 updater->localization_queue_.front()->measurement_time() -
184 if (tmp_diff < 0) {
185 timestamp_diff = tmp_diff;
186 image_pos = updater->localization_queue_.front()->pose();
187 if (updater->localization_queue_.size() > 1) {
188 updater->localization_queue_.pop_front();
189 } else {
190 // At least keep one pose in queue, in case there's no localization
191 // coming between two requests.
192 break;
193 }
194 } else {
195 if (tmp_diff < std::fabs(timestamp_diff)) {
196 image_pos = updater->localization_queue_.front()->pose();
197 }
198 break;
199 }
200 }
201
202 Eigen::Matrix4d localization_matrix;
203 ConstructTransformationMatrix(image_pos.orientation(), image_pos.position(),
204 &localization_matrix);
205 ConvertMatrixToArray(localization_matrix, localization);
206}
207
208bool PerceptionCameraUpdater::QueryStaticTF(const std::string &frame_id,
209 const std::string &child_frame_id,
210 Eigen::Matrix4d *matrix) {
211 TransformStamped transform;
212 if (tf_buffer_->GetLatestStaticTF(frame_id, child_frame_id, &transform)) {
213 ConstructTransformationMatrix(transform.transform().rotation(),
214 transform.transform().translation(), matrix);
215 return true;
216 }
217 return false;
218}
219
220void PerceptionCameraUpdater::GetLocalization2CameraTF(
221 std::vector<double> *localization2camera_tf) {
222 Eigen::Matrix4d localization2camera_mat = Eigen::Matrix4d::Identity();
223
224 // Since "/tf" topic has dynamic updates of world->novatel and
225 // world->localization, novatel->localization in tf buffer is being changed
226 // and their transformation does not represent for static transform anymore.
227 // Thus we query static transform respectively and calculate by ourselves
228 Eigen::Matrix4d loc2novatel_mat;
229 if (QueryStaticTF("localization", "novatel", &loc2novatel_mat)) {
230 localization2camera_mat *= loc2novatel_mat;
231 }
232
233 Eigen::Matrix4d novatel2lidar_mat;
234 if (QueryStaticTF("novatel", "velodyne128", &novatel2lidar_mat)) {
235 localization2camera_mat *= novatel2lidar_mat;
236 }
237
238 Eigen::Matrix4d lidar2camera_mat;
239 if (QueryStaticTF("velodyne128", "front_6mm", &lidar2camera_mat)) {
240 localization2camera_mat *= lidar2camera_mat;
241 }
242
243 ConvertMatrixToArray(localization2camera_mat, localization2camera_tf);
244}
245
246// void PerceptionCameraUpdater::OnCompressedImage(
247// const std::shared_ptr<CompressedImage> &compressed_image) {
248// if (!enabled_ ||
249// compressed_image->format() == "h265" /* skip video format */) {
250// return;
251// }
252
253// std::vector<uint8_t> compressed_raw_data(compressed_image->data().begin(),
254// compressed_image->data().end());
255// cv::Mat mat_image = cv::imdecode(compressed_raw_data, cv::IMREAD_COLOR);
256// const int width = mat_image.cols;
257// const int height = mat_image.rows;
258
259// // Scale down image size properly to reduce data transfer latency through
260// // websocket and ensure image quality is acceptable meanwhile
261// cv::resize(mat_image, mat_image,
262// cv::Size(static_cast<int>(mat_image.cols * kImageScale),
263// static_cast<int>(mat_image.rows * kImageScale)),
264// 0, 0, cv::INTER_LINEAR);
265// std::vector<uint8_t> tmp_buffer;
266// cv::imencode(".jpg", mat_image, tmp_buffer, std::vector<int>() /* params
267// */);
268
269// double next_image_timestamp;
270// if (compressed_image->has_measurement_time()) {
271// next_image_timestamp = compressed_image->measurement_time();
272// } else {
273// next_image_timestamp = compressed_image->header().timestamp_sec();
274// }
275
276// std::lock_guard<std::mutex> lock(image_mutex_);
277// if (next_image_timestamp < current_image_timestamp_) {
278// // If replay different bags, the timestamp may jump to earlier time and
279// // we need to clear the localization queue
280// localization_queue_.clear();
281// }
282// current_image_timestamp_ = next_image_timestamp;
283// camera_update_.set_image(&(tmp_buffer[0]), tmp_buffer.size());
284// camera_update_.set_image_aspect_ratio(static_cast<double>(width) / height);
285// }
286
287void PerceptionCameraUpdater::OnImage(
288 const std::shared_ptr<apollo::drivers::Image> &image,
289 const std::string& channel_name) {
290 CameraChannelUpdater* updater = GetCameraChannelUpdater(channel_name);
291 if (!updater->enabled_) {
292 return;
293 }
294 cv::Mat mat(image->height(), image->width(), CV_8UC3,
295 const_cast<char *>(image->data().data()), image->step());
296 cv::cvtColor(mat, mat, cv::COLOR_RGB2BGR);
297 cv::resize(mat, mat,
298 cv::Size(static_cast<int>(image->width() * kImageScale),
299 static_cast<int>(image->height() * kImageScale)),
300 0, 0, cv::INTER_LINEAR);
301 cv::imencode(".jpg", mat, updater->image_buffer_, std::vector<int>());
302 double next_image_timestamp;
303 if (image->has_measurement_time()) {
304 next_image_timestamp = image->measurement_time();
305 } else {
306 next_image_timestamp = image->header().timestamp_sec();
307 }
308 std::lock_guard<std::mutex> lock(updater->image_mutex_);
309 if (next_image_timestamp < updater->current_image_timestamp_) {
310 updater->localization_queue_.clear();
311 }
312 updater->current_image_timestamp_ = next_image_timestamp;
313 updater->camera_update_.set_image(&(updater->image_buffer_[0]),
314 updater->image_buffer_.size());
315 updater->camera_update_.set_k_image_scale(kImageScale);
316}
317
318void PerceptionCameraUpdater::OnLocalization(
319 const std::shared_ptr<LocalizationEstimate> &localization) {
320 for (auto iter = channel_updaters_.begin(); iter != channel_updaters_.end();
321 iter++) {
322 if (iter->second->enabled_) {
323 std::lock_guard<std::mutex> lock(iter->second->localization_mutex_);
324 iter->second->localization_queue_.push_back(localization);
325 }
326 }
327}
328
329void PerceptionCameraUpdater::OnObstacles(
330 const std::shared_ptr<apollo::perception::PerceptionObstacles> &obstacles,
331 const std::string &channel_name) {
332 CameraChannelUpdater *updater = GetCameraChannelUpdater(channel_name);
333 perception_obstacle_enable_ = true;
334 std::lock_guard<std::mutex> lock(updater->obstacle_mutex_);
335 updater->bbox2ds.clear();
336 updater->obstacle_id.clear();
337 updater->obstacle_sub_type.clear();
338 for (const auto &obstacle : obstacles->perception_obstacle()) {
339 updater->bbox2ds.push_back(obstacle.bbox2d());
340 updater->obstacle_id.push_back(obstacle.id());
341 updater->obstacle_sub_type.push_back(obstacle.sub_type());
342 }
343}
344
345void PerceptionCameraUpdater::InitReaders() {
346 node_->CreateReader<LocalizationEstimate>(
347 FLAGS_localization_topic,
348 [this](const std::shared_ptr<LocalizationEstimate> &localization) {
349 OnLocalization(localization);
350 });
351}
352
353void PerceptionCameraUpdater::GetUpdate(std::string *camera_update,
354 const std::string &channel_name) {
355 {
356 std::vector<double> localization;
357 GetImageLocalization(&localization, channel_name);
358 CameraChannelUpdater *updater = GetCameraChannelUpdater(channel_name);
359 std::lock(updater->image_mutex_, updater->localization_mutex_,
360 updater->obstacle_mutex_);
361 std::lock_guard<std::mutex> lock1(updater->image_mutex_, std::adopt_lock);
362 std::lock_guard<std::mutex> lock2(updater->localization_mutex_,
363 std::adopt_lock);
364 std::lock_guard<std::mutex> lock3(updater->obstacle_mutex_,
365 std::adopt_lock);
366 *(updater->camera_update_).mutable_localization() = {localization.begin(),
367 localization.end()};
368 std::vector<double> localization2camera_tf;
369 GetLocalization2CameraTF(&localization2camera_tf);
370 *(updater->camera_update_).mutable_localization2camera_tf() = {
371 localization2camera_tf.begin(), localization2camera_tf.end()};
372 // Concurrently modify protobuf msg can cause ByteSizeConsistencyError
373 // when serializing, so we need lock.
374 if (perception_obstacle_enable_) {
375 *(updater->camera_update_).mutable_bbox2d() = {updater->bbox2ds.begin(),
376 updater->bbox2ds.end()};
377 *(updater->camera_update_).mutable_obstacles_id() = {
378 updater->obstacle_id.begin(), updater->obstacle_id.end()};
379 *(updater->camera_update_).mutable_obstacles_sub_type() = {
380 updater->obstacle_sub_type.begin(), updater->obstacle_sub_type.end()};
381 }
382 updater->camera_update_.SerializeToString(camera_update);
383 }
384}
385void PerceptionCameraUpdater::GetChannelMsg(
386 std::vector<std::string> *channels) {
387 GetChannelMsgWithFilter(channels);
388}
389
390std::shared_ptr<cyber::Reader<apollo::perception::PerceptionObstacles>>
391PerceptionCameraUpdater::GetObstacleReader(const std::string &channel_name) {
392 // TODO(fanyueqiao): Temporarily supports the demonstration of the demo,
393 // and needs to adapt to the changes of the new perception module in the
394 // future.
395 size_t camera_string_end_pos = channel_name.find_last_of("/");
396 size_t camera_string_start_pos =
397 channel_name.find_last_of("/", camera_string_end_pos - 1);
398 std::string camera_name =
399 channel_name.substr(camera_string_start_pos + 1,
400 camera_string_end_pos - camera_string_start_pos - 1);
401 std::string perception_obstacle_channel =
402 "/apollo/prediction/perception_obstacles_" + camera_name;
403 auto channel_manager =
404 apollo::cyber::service_discovery::TopologyManager::Instance()
405 ->channel_manager();
406 std::shared_ptr<cyber::Reader<apollo::perception::PerceptionObstacles>>
407 perception_obstacle_reader;
408 // The channel is invalid or there is no data to writer.
409 if (!channel_manager->HasWriter(perception_obstacle_channel)) {
410 perception_obstacle_reader =
411 node_->CreateReader<apollo::perception::PerceptionObstacles>(
412 FLAGS_perception_obstacle_topic,
413 [channel_name, this](
414 const std::shared_ptr<apollo::perception::PerceptionObstacles>
415 &obstacles) { OnObstacles(obstacles, channel_name); });
416
417 } else {
418 perception_obstacle_reader =
419 node_->CreateReader<apollo::perception::PerceptionObstacles>(
420 perception_obstacle_channel,
421 [channel_name, this](
422 const std::shared_ptr<apollo::perception::PerceptionObstacles>
423 &obstacles) { OnObstacles(obstacles, channel_name); });
424 }
425 return perception_obstacle_reader;
426}
427
428} // namespace dreamview
429} // namespace apollo
Used to perform oneshot or periodic timing tasks
Definition timer.h:71
#define AERROR
Definition log.h:44
class register implement
Definition arena_queue.h:37
optional double x
Definition frame.proto:18
optional double y
Definition frame.proto:19
optional double z
Definition frame.proto:20
optional double w
Definition frame.proto:21
std::vector< apollo::perception::BBox2D > bbox2ds
std::deque< std::shared_ptr< apollo::localization::LocalizationEstimate > > localization_queue_