Apollo 10.0
自动驾驶开放平台
apollo::cyber::record::PlayTaskProducer类 参考

#include <play_task_producer.h>

apollo::cyber::record::PlayTaskProducer 的协作图:

Public 类型

using NodePtr = std::shared_ptr< Node >
 
using ThreadPtr = std::unique_ptr< std::thread >
 
using TaskBufferPtr = std::shared_ptr< PlayTaskBuffer >
 
using RecordReaderPtr = std::shared_ptr< RecordReader >
 
using WriterPtr = std::shared_ptr< Writer< message::RawMessage > >
 
using WriterMap = std::unordered_map< std::string, WriterPtr >
 
using MessageTypeMap = std::unordered_map< std::string, std::string >
 
using RecordViewerPtr = std::shared_ptr< RecordViewer >
 

Public 成员函数

 PlayTaskProducer (const TaskBufferPtr &task_buffer, const PlayParam &play_param, const NodePtr &node=nullptr, const bool preload_fill_buffer_mode=false)
 
virtual ~PlayTaskProducer ()
 
bool Init ()
 
void Start ()
 
void Stop ()
 
const PlayParam & play_param () const
 
bool is_stopped () const
 
bool is_initialized () const
 
void set_stopped ()
 
void WriteRecordProgress (const double &curr_time_s, const double &total_time_s)
 
void FillPlayTaskBuffer ()
 Preload the player: the producer fills play_task_buffer before playback starts.
 
void Reset (const double &progress_time_s)
 Reset the player producer so that dv can use it repeatedly.
 

详细描述

在文件 play_task_producer.h 第 45 行定义.

成员类型定义说明

◆ MessageTypeMap

using apollo::cyber::record::PlayTaskProducer::MessageTypeMap = std::unordered_map<std::string, std::string>

在文件 play_task_producer.h 第 53 行定义.

◆ NodePtr

在文件 play_task_producer.h 第 47 行定义.

◆ RecordReaderPtr

◆ RecordViewerPtr

◆ TaskBufferPtr

◆ ThreadPtr

using apollo::cyber::record::PlayTaskProducer::ThreadPtr = std::unique_ptr<std::thread>

在文件 play_task_producer.h 第 48 行定义.

◆ WriterMap

using apollo::cyber::record::PlayTaskProducer::WriterMap = std::unordered_map<std::string, WriterPtr>

在文件 play_task_producer.h 第 52 行定义.

◆ WriterPtr

构造及析构函数说明

◆ PlayTaskProducer()

apollo::cyber::record::PlayTaskProducer::PlayTaskProducer ( const TaskBufferPtr & task_buffer,
const PlayParam & play_param,
const NodePtr & node = nullptr,
const bool  preload_fill_buffer_mode = false 
)

在文件 play_task_producer.cc 第 36 行定义.

40 : play_param_(play_param),
41 task_buffer_(task_buffer),
42 produce_th_(nullptr),
43 is_initialized_(false),
44 is_stopped_(true),
45 node_(node),
46 record_viewer_ptr_(nullptr),
47 earliest_begin_time_(std::numeric_limits<uint64_t>::max()),
48 latest_end_time_(0),
49 total_msg_num_(0),
50 preload_fill_buffer_mode_(preload_fill_buffer_mode) {}

◆ ~PlayTaskProducer()

apollo::cyber::record::PlayTaskProducer::~PlayTaskProducer ( )
virtual

成员函数说明

◆ FillPlayTaskBuffer()

void apollo::cyber::record::PlayTaskProducer::FillPlayTaskBuffer ( )

Preload the player: the producer fills play_task_buffer before playback starts.

在文件 play_task_producer.cc 第 277 行定义.

277 {
278 task_buffer_->Clear();
279 // use fixed preload buffer size
280 uint32_t preload_size = kMinTaskBufferSize * 2;
281
282 if (!record_viewer_ptr_) {
283 record_viewer_ptr_ = std::make_shared<RecordViewer>(
284 record_readers_, play_param_.begin_time_ns, play_param_.end_time_ns,
285 play_param_.channels_to_play);
286 record_viewer_ptr_->set_curr_itr(record_viewer_ptr_->begin());
287 }
288
289 auto itr = record_viewer_ptr_->curr_itr();
290
291 for (; itr != record_viewer_ptr_->end(); ++itr) {
292 if (task_buffer_->Size() > preload_size) {
293 record_viewer_ptr_->set_curr_itr(itr);
294 break;
295 }
296
297 auto search = writers_.find(itr->channel_name);
298 if (search == writers_.end()) {
299 continue;
300 }
301
302 auto raw_msg = std::make_shared<message::RawMessage>(itr->content);
303 auto task = std::make_shared<PlayTask>(raw_msg, search->second, itr->time,
304 itr->time);
305 task_buffer_->Push(task);
306 }
307 }
std::set< std::string > channels_to_play
Definition play_param.h:40

◆ Init()

bool apollo::cyber::record::PlayTaskProducer::Init ( )

在文件 play_task_producer.cc 第 54 行定义.

54 {
55 if (is_initialized_.exchange(true)) {
56 AERROR << "producer has been initialized.";
57 return false;
58 }
59 if (preload_fill_buffer_mode_ && node_ == nullptr) {
60 AERROR << "node from same process should pass node param for construct";
61 return false;
62 }
63 if (!preload_fill_buffer_mode_ && node_ != nullptr) {
64 AERROR << "invalid param: nullptr node";
65 node_ = nullptr;
66 }
67
68 if (!ReadRecordInfo() || !UpdatePlayParam() || !CreateWriters()) {
69 is_initialized_.store(false);
70 return false;
71 }
72
73 return true;
74 }
#define AERROR
Definition log.h:44

◆ is_initialized()

bool apollo::cyber::record::PlayTaskProducer::is_initialized ( ) const
inline

在文件 play_task_producer.h 第 68 行定义.

68 { return is_initialized_.load(); }

◆ is_stopped()

bool apollo::cyber::record::PlayTaskProducer::is_stopped ( ) const
inline

在文件 play_task_producer.h 第 67 行定义.

67 { return is_stopped_.load(); }

◆ play_param()

const PlayParam & apollo::cyber::record::PlayTaskProducer::play_param ( ) const
inline

在文件 play_task_producer.h 第 66 行定义.

66 { return play_param_; }

◆ Reset()

void apollo::cyber::record::PlayTaskProducer::Reset ( const double &  progress_time_s)

Reset the player producer so that dv can use it repeatedly.

Resets the start time when dv resets the record playback progress.

在文件 play_task_producer.cc 第 223 行定义.

223 {
224 play_param_.begin_time_ns = play_param_.base_begin_time_ns + progress_s * 1e9;
225 play_param_.start_time_s = progress_s;
226 record_viewer_ptr_ = nullptr;
227 record_viewer_ptr_ = std::make_shared<RecordViewer>(
228 record_readers_, play_param_.begin_time_ns, play_param_.end_time_ns,
229 play_param_.channels_to_play);
230 record_viewer_ptr_->set_curr_itr(record_viewer_ptr_->begin());
231 }

◆ set_stopped()

void apollo::cyber::record::PlayTaskProducer::set_stopped ( )
inline

在文件 play_task_producer.h 第 69 行定义.

69 { is_stopped_.exchange(true); }

◆ Start()

void apollo::cyber::record::PlayTaskProducer::Start ( )

在文件 play_task_producer.cc 第 76 行定义.

76 {
77 if (!is_initialized_.load()) {
78 AERROR << "please call Init firstly.";
79 return;
80 }
81
82 if (!is_stopped_.exchange(false)) {
83 AERROR << "producer has been started.";
84 return;
85 }
86
87 if (preload_fill_buffer_mode_) {
88 produce_th_.reset(
89 new std::thread(&PlayTaskProducer::ThreadFuncUnderPreloadMode, this));
90 } else {
91 produce_th_.reset(new std::thread(&PlayTaskProducer::ThreadFunc, this));
92 }
93 }

◆ Stop()

void apollo::cyber::record::PlayTaskProducer::Stop ( )

在文件 play_task_producer.cc 第 95 行定义.

95 {
96 if (!is_stopped_.exchange(true)) {
97 return;
98 }
99 if (produce_th_ != nullptr && produce_th_->joinable()) {
100 produce_th_->join();
101 produce_th_ = nullptr;
102 }
103 }

◆ WriteRecordProgress()

void apollo::cyber::record::PlayTaskProducer::WriteRecordProgress ( const double &  curr_time_s,
const double &  total_time_s 
)

在文件 play_task_producer.cc 第 210 行定义.

211 {
212 RecordInfo record_info;
213 record_info.set_total_time_s(total_time_s);
214 record_info.set_curr_time_s(curr_time_s);
215 record_info.set_progress(curr_time_s / total_time_s);
216 record_info.set_record_name(play_param_.record_id);
217 std::string content;
218 record_info.SerializeToString(&content);
219 auto raw_msg = std::make_shared<message::RawMessage>(content);
220 writers_[record_info_channel]->Write(raw_msg);
221 }

该类的文档由以下文件生成: