31const uint32_t PlayTaskProducer::kMinTaskBufferSize = 500;
32const uint32_t PlayTaskProducer::kPreloadTimeSec = 3;
33const uint64_t PlayTaskProducer::kSleepIntervalNanoSec = 1000000;
39 const bool preload_fill_buffer_mode)
40 : play_param_(play_param),
41 task_buffer_(task_buffer),
43 is_initialized_(false),
46 record_viewer_ptr_(nullptr),
47 earliest_begin_time_(
std::numeric_limits<uint64_t>::max()),
50 preload_fill_buffer_mode_(preload_fill_buffer_mode) {}
55 if (is_initialized_.exchange(
true)) {
56 AERROR <<
"producer has been initialized.";
59 if (preload_fill_buffer_mode_ && node_ ==
nullptr) {
60 AERROR <<
"node from same process should pass node param for construct";
63 if (!preload_fill_buffer_mode_ && node_ !=
nullptr) {
64 AERROR <<
"invalid param: nullptr node";
68 if (!ReadRecordInfo() || !UpdatePlayParam() || !CreateWriters()) {
69 is_initialized_.store(
false);
77 if (!is_initialized_.load()) {
78 AERROR <<
"please call Init firstly.";
82 if (!is_stopped_.exchange(
false)) {
83 AERROR <<
"producer has been started.";
87 if (preload_fill_buffer_mode_) {
89 new std::thread(&PlayTaskProducer::ThreadFuncUnderPreloadMode,
this));
91 produce_th_.reset(
new std::thread(&PlayTaskProducer::ThreadFunc,
this));
96 if (!is_stopped_.exchange(
true)) {
99 if (produce_th_ !=
nullptr && produce_th_->joinable()) {
101 produce_th_ =
nullptr;
105bool PlayTaskProducer::ReadRecordInfo() {
107 AINFO <<
"no file to play.";
111 auto pb_factory = message::ProtobufFactory::Instance();
114 for (
auto& file : play_param_.files_to_play) {
115 auto record_reader = std::make_shared<RecordReader>(file);
116 if (!record_reader->IsValid()) {
119 if (!record_reader->GetHeader().is_complete()) {
120 std::cout <<
"file: " << file <<
" is not complete." << std::endl;
124 record_readers_.emplace_back(record_reader);
126 auto channel_list = record_reader->GetChannelList();
128 for (
auto& channel_name : channel_list) {
132 total_msg_num_ -= record_reader->GetMessageNumber(channel_name);
136 auto& msg_type = record_reader->GetMessageType(channel_name);
137 msg_types_[channel_name] = msg_type;
141 total_msg_num_ += record_reader->GetMessageNumber(channel_name);
144 auto& proto_desc = record_reader->GetProtoDesc(channel_name);
145 pb_factory->RegisterMessage(proto_desc);
148 auto& header = record_reader->GetHeader();
150 total_msg_num_ += header.message_number();
153 if (header.begin_time() < earliest_begin_time_) {
154 earliest_begin_time_ = header.begin_time();
156 if (header.end_time() > latest_end_time_) {
157 latest_end_time_ = header.end_time();
160 auto begin_time_s =
static_cast<double>(header.begin_time()) / 1e9;
161 auto end_time_s =
static_cast<double>(header.end_time()) / 1e9;
162 auto begin_time_str =
167 std::cout <<
"file: " << file <<
", chunk_number: " << header.chunk_number()
168 <<
", begin_time: " << header.begin_time() <<
" ("
169 << begin_time_str <<
")"
170 <<
", end_time: " << header.end_time() <<
" (" << end_time_str
172 <<
", message_number: " << header.message_number() << std::endl;
175 std::cout <<
"earliest_begin_time: " << earliest_begin_time_
176 <<
", latest_end_time: " << latest_end_time_
177 <<
", total_msg_num: " << total_msg_num_ << std::endl;
182bool PlayTaskProducer::UpdatePlayParam() {
190 static_cast<uint64_t
>(
static_cast<double>(play_param_.
start_time_s) *
197 AERROR <<
"begin time are equal or larger than end time"
203 AINFO <<
"preload time is zero, we will use defalut value: "
204 << kPreloadTimeSec <<
" seconds.";
211 const double& total_time_s) {
212 RecordInfo record_info;
213 record_info.set_total_time_s(total_time_s);
214 record_info.set_curr_time_s(curr_time_s);
215 record_info.set_progress(curr_time_s / total_time_s);
216 record_info.set_record_name(play_param_.
record_id);
218 record_info.SerializeToString(&content);
219 auto raw_msg = std::make_shared<message::RawMessage>(content);
226 record_viewer_ptr_ =
nullptr;
227 record_viewer_ptr_ = std::make_shared<RecordViewer>(
230 record_viewer_ptr_->set_curr_itr(record_viewer_ptr_->begin());
233bool PlayTaskProducer::CreatePlayTaskWriter(
const std::string& channel_name,
234 const std::string& msg_type) {
236 attr.set_channel_name(channel_name);
237 attr.set_message_type(msg_type);
239 if (writer ==
nullptr) {
240 AERROR <<
"create writer failed. channel name: " << channel_name
241 <<
", message type: " << msg_type;
244 writers_[channel_name] = writer;
248bool PlayTaskProducer::CreateWriters() {
249 if (node_ ==
nullptr && !preload_fill_buffer_mode_) {
250 std::string node_name =
"cyber_recorder_play_" + std::to_string(getpid());
252 if (node_ ==
nullptr) {
253 AERROR <<
"create node failed.";
258 for (
auto& item : msg_types_) {
259 auto& channel_name = item.first;
260 auto& msg_type = item.second;
268 if (!CreatePlayTaskWriter(channel_name, msg_type)) {
274 "apollo.cyber.proto.RecordInfo");
278 task_buffer_->Clear();
280 uint32_t preload_size = kMinTaskBufferSize * 2;
282 if (!record_viewer_ptr_) {
283 record_viewer_ptr_ = std::make_shared<RecordViewer>(
286 record_viewer_ptr_->set_curr_itr(record_viewer_ptr_->begin());
289 auto itr = record_viewer_ptr_->curr_itr();
291 for (; itr != record_viewer_ptr_->end(); ++itr) {
292 if (task_buffer_->Size() > preload_size) {
293 record_viewer_ptr_->set_curr_itr(itr);
297 auto search = writers_.find(itr->channel_name);
298 if (search == writers_.end()) {
302 auto raw_msg = std::make_shared<message::RawMessage>(itr->content);
303 auto task = std::make_shared<PlayTask>(raw_msg, search->second, itr->time,
305 task_buffer_->Push(task);
309void PlayTaskProducer::ThreadFuncUnderPreloadMode() {
310 const uint64_t loop_time_ns =
312 uint64_t avg_interval_time_ns = kSleepIntervalNanoSec;
313 if (total_msg_num_ > 0) {
314 avg_interval_time_ns = loop_time_ns / total_msg_num_;
317 uint32_t preload_size = kMinTaskBufferSize * 2;
319 if (preload_fill_buffer_mode_ && !record_viewer_ptr_) {
320 AERROR <<
"Preload should not nullptr";
323 if (!preload_fill_buffer_mode_ && record_viewer_ptr_) {
324 AERROR <<
"No preload should nullptr";
327 if (!record_viewer_ptr_) {
328 record_viewer_ptr_ = std::make_shared<RecordViewer>(
331 record_viewer_ptr_->set_curr_itr(record_viewer_ptr_->begin());
334 while (!is_stopped_.load()) {
335 auto itr = record_viewer_ptr_->curr_itr();
336 auto itr_end = record_viewer_ptr_->end();
338 while (itr != itr_end && !is_stopped_.load()) {
339 while (!is_stopped_.load() && task_buffer_->Size() > preload_size) {
340 std::this_thread::sleep_for(
341 std::chrono::nanoseconds(avg_interval_time_ns));
343 for (; itr != itr_end && !is_stopped_.load(); ++itr) {
344 if (task_buffer_->Size() > preload_size) {
348 auto search = writers_.find(itr->channel_name);
349 if (search == writers_.end()) {
353 auto raw_msg = std::make_shared<message::RawMessage>(itr->content);
354 auto task = std::make_shared<PlayTask>(raw_msg, search->second,
355 itr->time, itr->time);
356 task_buffer_->Push(task);
360 is_stopped_.store(
true);
365void PlayTaskProducer::ThreadFunc() {
366 const uint64_t loop_time_ns =
368 uint64_t avg_interval_time_ns = kSleepIntervalNanoSec;
369 if (total_msg_num_ > 0) {
370 avg_interval_time_ns = loop_time_ns / total_msg_num_;
373 double avg_freq_hz =
static_cast<double>(total_msg_num_) /
374 (
static_cast<double>(loop_time_ns) * 1e-9);
375 uint32_t preload_size = (uint32_t)avg_freq_hz * play_param_.
preload_time_s;
376 AINFO <<
"preload_size: " << preload_size;
377 if (preload_size < kMinTaskBufferSize) {
378 preload_size = kMinTaskBufferSize;
381 record_viewer_ptr_ = std::make_shared<RecordViewer>(
384 record_viewer_ptr_->set_curr_itr(record_viewer_ptr_->begin());
386 uint32_t loop_num = 0;
387 while (!is_stopped_.load()) {
388 uint64_t plus_time_ns = loop_num * loop_time_ns;
389 auto itr = record_viewer_ptr_->begin();
390 auto itr_end = record_viewer_ptr_->end();
392 while (itr != itr_end && !is_stopped_.load()) {
393 while (!is_stopped_.load() && task_buffer_->Size() > preload_size) {
394 std::this_thread::sleep_for(
395 std::chrono::nanoseconds(avg_interval_time_ns));
397 for (; itr != itr_end && !is_stopped_.load(); ++itr) {
398 if (task_buffer_->Size() > preload_size) {
402 auto search = writers_.find(itr->channel_name);
403 if (search == writers_.end()) {
407 auto raw_msg = std::make_shared<message::RawMessage>(itr->content);
408 auto task = std::make_shared<PlayTask>(
409 raw_msg, search->second, itr->time, itr->time + plus_time_ns);
410 task_buffer_->Push(task);
415 is_stopped_.store(
true);
void WriteRecordProgress(const double &curr_time_s, const double &total_time_s)
std::shared_ptr< PlayTaskBuffer > TaskBufferPtr
std::shared_ptr< Node > NodePtr
virtual ~PlayTaskProducer()
PlayTaskProducer(const TaskBufferPtr &task_buffer, const PlayParam &play_param, const NodePtr &node=nullptr, const bool preload_fill_buffer_mode=false)
void FillPlayTaskBuffer()
Preload the player,producer fill play_task_buffer before playing.
void Reset(const double &progress_time_s)
Reset player producer for dv will repeatedly use it.
std::string UnixSecondsToString(uint64_t unix_seconds, const std::string &format_str="%Y-%m-%d-%H:%M:%S")
const char record_info_channel[]
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space)
std::set< std::string > black_channels
bool is_play_all_channels
std::set< std::string > files_to_play
uint64_t base_begin_time_ns
std::set< std::string > channels_to_play