Apollo 10.0
自动驾驶开放平台
play_task_producer.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <iostream>
20#include <limits>
21
22#include "cyber/common/log.h"
24#include "cyber/cyber.h"
26
27namespace apollo {
28namespace cyber {
29namespace record {
30
31const uint32_t PlayTaskProducer::kMinTaskBufferSize = 500;
32const uint32_t PlayTaskProducer::kPreloadTimeSec = 3;
33const uint64_t PlayTaskProducer::kSleepIntervalNanoSec = 1000000;
34const char record_info_channel[] = "/apollo/cyber/record_info";
35
37 const PlayParam& play_param,
38 const NodePtr& node,
39 const bool preload_fill_buffer_mode)
40 : play_param_(play_param),
41 task_buffer_(task_buffer),
42 produce_th_(nullptr),
43 is_initialized_(false),
44 is_stopped_(true),
45 node_(node),
46 record_viewer_ptr_(nullptr),
47 earliest_begin_time_(std::numeric_limits<uint64_t>::max()),
48 latest_end_time_(0),
49 total_msg_num_(0),
50 preload_fill_buffer_mode_(preload_fill_buffer_mode) {}
51
53
55 if (is_initialized_.exchange(true)) {
56 AERROR << "producer has been initialized.";
57 return false;
58 }
59 if (preload_fill_buffer_mode_ && node_ == nullptr) {
60 AERROR << "node from same process should pass node param for construct";
61 return false;
62 }
63 if (!preload_fill_buffer_mode_ && node_ != nullptr) {
64 AERROR << "invalid param: nullptr node";
65 node_ = nullptr;
66 }
67
68 if (!ReadRecordInfo() || !UpdatePlayParam() || !CreateWriters()) {
69 is_initialized_.store(false);
70 return false;
71 }
72
73 return true;
74}
75
77 if (!is_initialized_.load()) {
78 AERROR << "please call Init firstly.";
79 return;
80 }
81
82 if (!is_stopped_.exchange(false)) {
83 AERROR << "producer has been started.";
84 return;
85 }
86
87 if (preload_fill_buffer_mode_) {
88 produce_th_.reset(
89 new std::thread(&PlayTaskProducer::ThreadFuncUnderPreloadMode, this));
90 } else {
91 produce_th_.reset(new std::thread(&PlayTaskProducer::ThreadFunc, this));
92 }
93}
94
96 if (!is_stopped_.exchange(true)) {
97 return;
98 }
99 if (produce_th_ != nullptr && produce_th_->joinable()) {
100 produce_th_->join();
101 produce_th_ = nullptr;
102 }
103}
104
105bool PlayTaskProducer::ReadRecordInfo() {
106 if (play_param_.files_to_play.empty()) {
107 AINFO << "no file to play.";
108 return false;
109 }
110
111 auto pb_factory = message::ProtobufFactory::Instance();
112
113 // loop each file
114 for (auto& file : play_param_.files_to_play) {
115 auto record_reader = std::make_shared<RecordReader>(file);
116 if (!record_reader->IsValid()) {
117 continue;
118 }
119 if (!record_reader->GetHeader().is_complete()) {
120 std::cout << "file: " << file << " is not complete." << std::endl;
121 continue;
122 }
123
124 record_readers_.emplace_back(record_reader);
125
126 auto channel_list = record_reader->GetChannelList();
127 // loop each channel info
128 for (auto& channel_name : channel_list) {
129 if (play_param_.black_channels.find(channel_name) !=
130 play_param_.black_channels.end()) {
131 // minus the black message number from record file header
132 total_msg_num_ -= record_reader->GetMessageNumber(channel_name);
133 continue;
134 }
135
136 auto& msg_type = record_reader->GetMessageType(channel_name);
137 msg_types_[channel_name] = msg_type;
138
139 if (!play_param_.is_play_all_channels &&
140 play_param_.channels_to_play.count(channel_name) > 0) {
141 total_msg_num_ += record_reader->GetMessageNumber(channel_name);
142 }
143
144 auto& proto_desc = record_reader->GetProtoDesc(channel_name);
145 pb_factory->RegisterMessage(proto_desc);
146 }
147
148 auto& header = record_reader->GetHeader();
149 if (play_param_.is_play_all_channels) {
150 total_msg_num_ += header.message_number();
151 }
152
153 if (header.begin_time() < earliest_begin_time_) {
154 earliest_begin_time_ = header.begin_time();
155 }
156 if (header.end_time() > latest_end_time_) {
157 latest_end_time_ = header.end_time();
158 }
159
160 auto begin_time_s = static_cast<double>(header.begin_time()) / 1e9;
161 auto end_time_s = static_cast<double>(header.end_time()) / 1e9;
162 auto begin_time_str =
163 common::UnixSecondsToString(static_cast<int>(begin_time_s));
164 auto end_time_str =
165 common::UnixSecondsToString(static_cast<int>(end_time_s));
166
167 std::cout << "file: " << file << ", chunk_number: " << header.chunk_number()
168 << ", begin_time: " << header.begin_time() << " ("
169 << begin_time_str << ")"
170 << ", end_time: " << header.end_time() << " (" << end_time_str
171 << ")"
172 << ", message_number: " << header.message_number() << std::endl;
173 }
174
175 std::cout << "earliest_begin_time: " << earliest_begin_time_
176 << ", latest_end_time: " << latest_end_time_
177 << ", total_msg_num: " << total_msg_num_ << std::endl;
178
179 return true;
180}
181
182bool PlayTaskProducer::UpdatePlayParam() {
183 if (play_param_.begin_time_ns < earliest_begin_time_) {
184 play_param_.begin_time_ns = earliest_begin_time_;
185 play_param_.base_begin_time_ns = earliest_begin_time_;
186 }
187 if (play_param_.start_time_s > 0) {
188 play_param_.begin_time_ns =
189 play_param_.base_begin_time_ns +
190 static_cast<uint64_t>(static_cast<double>(play_param_.start_time_s) *
191 1e9);
192 }
193 if (play_param_.end_time_ns > latest_end_time_) {
194 play_param_.end_time_ns = latest_end_time_;
195 }
196 if (play_param_.begin_time_ns >= play_param_.end_time_ns) {
197 AERROR << "begin time are equal or larger than end time"
198 << ", begin_time_ns=" << play_param_.begin_time_ns
199 << ", end_time_ns=" << play_param_.end_time_ns;
200 return false;
201 }
202 if (play_param_.preload_time_s == 0) {
203 AINFO << "preload time is zero, we will use defalut value: "
204 << kPreloadTimeSec << " seconds.";
205 play_param_.preload_time_s = kPreloadTimeSec;
206 }
207 return true;
208}
209
210void PlayTaskProducer::WriteRecordProgress(const double& curr_time_s,
211 const double& total_time_s) {
212 RecordInfo record_info;
213 record_info.set_total_time_s(total_time_s);
214 record_info.set_curr_time_s(curr_time_s);
215 record_info.set_progress(curr_time_s / total_time_s);
216 record_info.set_record_name(play_param_.record_id);
217 std::string content;
218 record_info.SerializeToString(&content);
219 auto raw_msg = std::make_shared<message::RawMessage>(content);
220 writers_[record_info_channel]->Write(raw_msg);
221}
222
223void PlayTaskProducer::Reset(const double& progress_s) {
224 play_param_.begin_time_ns = play_param_.base_begin_time_ns + progress_s * 1e9;
225 play_param_.start_time_s = progress_s;
226 record_viewer_ptr_ = nullptr;
227 record_viewer_ptr_ = std::make_shared<RecordViewer>(
228 record_readers_, play_param_.begin_time_ns, play_param_.end_time_ns,
229 play_param_.channels_to_play);
230 record_viewer_ptr_->set_curr_itr(record_viewer_ptr_->begin());
231}
232
233bool PlayTaskProducer::CreatePlayTaskWriter(const std::string& channel_name,
234 const std::string& msg_type) {
236 attr.set_channel_name(channel_name);
237 attr.set_message_type(msg_type);
238 auto writer = node_->CreateWriter<message::RawMessage>(attr);
239 if (writer == nullptr) {
240 AERROR << "create writer failed. channel name: " << channel_name
241 << ", message type: " << msg_type;
242 return false;
243 }
244 writers_[channel_name] = writer;
245 return true;
246}
247
248bool PlayTaskProducer::CreateWriters() {
249 if (node_ == nullptr && !preload_fill_buffer_mode_) {
250 std::string node_name = "cyber_recorder_play_" + std::to_string(getpid());
251 node_ = apollo::cyber::CreateNode(node_name);
252 if (node_ == nullptr) {
253 AERROR << "create node failed.";
254 return false;
255 }
256 }
257
258 for (auto& item : msg_types_) {
259 auto& channel_name = item.first;
260 auto& msg_type = item.second;
261
262 if (play_param_.is_play_all_channels ||
263 play_param_.channels_to_play.count(channel_name) > 0) {
264 if (play_param_.black_channels.find(channel_name) !=
265 play_param_.black_channels.end()) {
266 continue;
267 }
268 if (!CreatePlayTaskWriter(channel_name, msg_type)) {
269 return false;
270 }
271 }
272 }
273 return CreatePlayTaskWriter(record_info_channel,
274 "apollo.cyber.proto.RecordInfo");
275}
276
278 task_buffer_->Clear();
279 // use fixed preload buffer size
280 uint32_t preload_size = kMinTaskBufferSize * 2;
281
282 if (!record_viewer_ptr_) {
283 record_viewer_ptr_ = std::make_shared<RecordViewer>(
284 record_readers_, play_param_.begin_time_ns, play_param_.end_time_ns,
285 play_param_.channels_to_play);
286 record_viewer_ptr_->set_curr_itr(record_viewer_ptr_->begin());
287 }
288
289 auto itr = record_viewer_ptr_->curr_itr();
290
291 for (; itr != record_viewer_ptr_->end(); ++itr) {
292 if (task_buffer_->Size() > preload_size) {
293 record_viewer_ptr_->set_curr_itr(itr);
294 break;
295 }
296
297 auto search = writers_.find(itr->channel_name);
298 if (search == writers_.end()) {
299 continue;
300 }
301
302 auto raw_msg = std::make_shared<message::RawMessage>(itr->content);
303 auto task = std::make_shared<PlayTask>(raw_msg, search->second, itr->time,
304 itr->time);
305 task_buffer_->Push(task);
306 }
307}
308
309void PlayTaskProducer::ThreadFuncUnderPreloadMode() {
310 const uint64_t loop_time_ns =
311 play_param_.end_time_ns - play_param_.begin_time_ns;
312 uint64_t avg_interval_time_ns = kSleepIntervalNanoSec;
313 if (total_msg_num_ > 0) {
314 avg_interval_time_ns = loop_time_ns / total_msg_num_;
315 }
316
317 uint32_t preload_size = kMinTaskBufferSize * 2;
318
319 if (preload_fill_buffer_mode_ && !record_viewer_ptr_) {
320 AERROR << "Preload should not nullptr";
321 return;
322 }
323 if (!preload_fill_buffer_mode_ && record_viewer_ptr_) {
324 AERROR << "No preload should nullptr";
325 return;
326 }
327 if (!record_viewer_ptr_) {
328 record_viewer_ptr_ = std::make_shared<RecordViewer>(
329 record_readers_, play_param_.begin_time_ns, play_param_.end_time_ns,
330 play_param_.channels_to_play);
331 record_viewer_ptr_->set_curr_itr(record_viewer_ptr_->begin());
332 }
333
334 while (!is_stopped_.load()) {
335 auto itr = record_viewer_ptr_->curr_itr();
336 auto itr_end = record_viewer_ptr_->end();
337
338 while (itr != itr_end && !is_stopped_.load()) {
339 while (!is_stopped_.load() && task_buffer_->Size() > preload_size) {
340 std::this_thread::sleep_for(
341 std::chrono::nanoseconds(avg_interval_time_ns));
342 }
343 for (; itr != itr_end && !is_stopped_.load(); ++itr) {
344 if (task_buffer_->Size() > preload_size) {
345 break;
346 }
347
348 auto search = writers_.find(itr->channel_name);
349 if (search == writers_.end()) {
350 continue;
351 }
352
353 auto raw_msg = std::make_shared<message::RawMessage>(itr->content);
354 auto task = std::make_shared<PlayTask>(raw_msg, search->second,
355 itr->time, itr->time);
356 task_buffer_->Push(task);
357 }
358 }
359 // not support loop
360 is_stopped_.store(true);
361 break;
362 }
363}
364
365void PlayTaskProducer::ThreadFunc() {
366 const uint64_t loop_time_ns =
367 play_param_.end_time_ns - play_param_.begin_time_ns;
368 uint64_t avg_interval_time_ns = kSleepIntervalNanoSec;
369 if (total_msg_num_ > 0) {
370 avg_interval_time_ns = loop_time_ns / total_msg_num_;
371 }
372
373 double avg_freq_hz = static_cast<double>(total_msg_num_) /
374 (static_cast<double>(loop_time_ns) * 1e-9);
375 uint32_t preload_size = (uint32_t)avg_freq_hz * play_param_.preload_time_s;
376 AINFO << "preload_size: " << preload_size;
377 if (preload_size < kMinTaskBufferSize) {
378 preload_size = kMinTaskBufferSize;
379 }
380
381 record_viewer_ptr_ = std::make_shared<RecordViewer>(
382 record_readers_, play_param_.begin_time_ns, play_param_.end_time_ns,
383 play_param_.channels_to_play);
384 record_viewer_ptr_->set_curr_itr(record_viewer_ptr_->begin());
385
386 uint32_t loop_num = 0;
387 while (!is_stopped_.load()) {
388 uint64_t plus_time_ns = loop_num * loop_time_ns;
389 auto itr = record_viewer_ptr_->begin();
390 auto itr_end = record_viewer_ptr_->end();
391
392 while (itr != itr_end && !is_stopped_.load()) {
393 while (!is_stopped_.load() && task_buffer_->Size() > preload_size) {
394 std::this_thread::sleep_for(
395 std::chrono::nanoseconds(avg_interval_time_ns));
396 }
397 for (; itr != itr_end && !is_stopped_.load(); ++itr) {
398 if (task_buffer_->Size() > preload_size) {
399 break;
400 }
401
402 auto search = writers_.find(itr->channel_name);
403 if (search == writers_.end()) {
404 continue;
405 }
406
407 auto raw_msg = std::make_shared<message::RawMessage>(itr->content);
408 auto task = std::make_shared<PlayTask>(
409 raw_msg, search->second, itr->time, itr->time + plus_time_ns);
410 task_buffer_->Push(task);
411 }
412 }
413
414 if (!play_param_.is_loop_playback) {
415 is_stopped_.store(true);
416 break;
417 }
418 ++loop_num;
419 }
420}
421
422} // namespace record
423} // namespace cyber
424} // namespace apollo
void WriteRecordProgress(const double &curr_time_s, const double &total_time_s)
std::shared_ptr< PlayTaskBuffer > TaskBufferPtr
PlayTaskProducer(const TaskBufferPtr &task_buffer, const PlayParam &play_param, const NodePtr &node=nullptr, const bool preload_fill_buffer_mode=false)
void FillPlayTaskBuffer()
Preload the player,producer fill play_task_buffer before playing.
void Reset(const double &progress_time_s)
Reset player producer for dv will repeatedly use it.
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
std::string UnixSecondsToString(uint64_t unix_seconds, const std::string &format_str="%Y-%m-%d-%H:%M:%S")
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space)
Definition cyber.cc:33
class register implement
Definition arena_queue.h:37
Definition future.h:29
std::set< std::string > black_channels
Definition play_param.h:41
std::set< std::string > files_to_play
Definition play_param.h:39
std::set< std::string > channels_to_play
Definition play_param.h:40