26const uint64_t PlayTaskConsumer::kPauseSleepNanoSec = 100000000UL;
27const uint64_t PlayTaskConsumer::kWaitProduceSleepNanoSec = 5000000UL;
28const uint64_t PlayTaskConsumer::MIN_SLEEP_DURATION_NS = 200000000UL;
32 : play_rate_(play_rate),
34 task_buffer_(task_buffer),
38 base_msg_play_time_ns_(0),
39 base_msg_real_time_ns_(0),
40 last_played_msg_real_time_ns_(0) {
41 if (play_rate_ <= 0) {
42 AERROR <<
"invalid play rate: " << play_rate_
43 <<
" , we will use default value(1.0).";
51 if (!is_stopped_.exchange(
false)) {
54 begin_time_ns_ = begin_time_ns;
55 consume_th_.reset(
new std::thread(&PlayTaskConsumer::ThreadFunc,
this));
59 if (is_stopped_.exchange(
true)) {
62 if (consume_th_ !=
nullptr && consume_th_->joinable()) {
64 consume_th_ =
nullptr;
67 base_msg_play_time_ns_ = 0;
68 base_msg_real_time_ns_ = 0;
69 last_played_msg_real_time_ns_ = 0;
72void PlayTaskConsumer::ThreadFunc() {
73 uint64_t base_real_time_ns = 0;
74 uint64_t accumulated_pause_time_ns = 0;
76 while (!is_stopped_.load()) {
77 auto task = task_buffer_->Front();
78 if (task ==
nullptr) {
79 std::this_thread::sleep_for(
80 std::chrono::nanoseconds(kWaitProduceSleepNanoSec));
84 uint64_t sleep_ns = 0;
86 if (base_msg_play_time_ns_ == 0) {
87 base_msg_play_time_ns_ = task->msg_play_time_ns();
88 base_msg_real_time_ns_ = task->msg_real_time_ns();
89 if (base_msg_play_time_ns_ > begin_time_ns_) {
90 sleep_ns =
static_cast<uint64_t
>(
91 static_cast<double>(base_msg_play_time_ns_ - begin_time_ns_) /
93 while (sleep_ns > MIN_SLEEP_DURATION_NS && !is_stopped_.load()) {
94 std::this_thread::sleep_for(
95 std::chrono::nanoseconds(MIN_SLEEP_DURATION_NS));
96 sleep_ns -= MIN_SLEEP_DURATION_NS;
99 if (is_stopped_.load()) {
103 std::this_thread::sleep_for(std::chrono::nanoseconds(sleep_ns));
106 ADEBUG <<
"base_msg_play_time_ns: " << base_msg_play_time_ns_
107 <<
"base_real_time_ns: " << base_real_time_ns;
110 uint64_t task_interval_ns =
static_cast<uint64_t
>(
111 static_cast<double>(task->msg_play_time_ns() - base_msg_play_time_ns_) /
115 accumulated_pause_time_ns;
116 if (task_interval_ns > real_time_interval_ns) {
117 sleep_ns = task_interval_ns - real_time_interval_ns;
118 std::this_thread::sleep_for(std::chrono::nanoseconds(sleep_ns));
122 is_playonce_.store(
false);
124 last_played_msg_real_time_ns_ = task->msg_real_time_ns();
125 while (is_paused_.load() && !is_stopped_.load()) {
126 if (is_playonce_.load()) {
129 std::this_thread::sleep_for(std::chrono::nanoseconds(kPauseSleepNanoSec));
130 accumulated_pause_time_ns += kPauseSleepNanoSec;
132 task_buffer_->PopFront();
uint64_t ToNanosecond() const
convert time to nanosecond.
static Time Now()
get the current time.
std::shared_ptr< PlayTaskBuffer > TaskBufferPtr
void Start(uint64_t begin_time_ns)
PlayTaskConsumer(const TaskBufferPtr &task_buffer, double play_rate=1.0)
virtual ~PlayTaskConsumer()