Apollo 10.0
自动驾驶开放平台
play_task_consumer.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include "cyber/common/log.h"
20#include "cyber/time/time.h"
21
22namespace apollo {
23namespace cyber {
24namespace record {
25
26const uint64_t PlayTaskConsumer::kPauseSleepNanoSec = 100000000UL;
27const uint64_t PlayTaskConsumer::kWaitProduceSleepNanoSec = 5000000UL;
28const uint64_t PlayTaskConsumer::MIN_SLEEP_DURATION_NS = 200000000UL;
29
31 double play_rate)
32 : play_rate_(play_rate),
33 consume_th_(nullptr),
34 task_buffer_(task_buffer),
35 is_stopped_(true),
36 is_paused_(false),
37 is_playonce_(false),
38 base_msg_play_time_ns_(0),
39 base_msg_real_time_ns_(0),
40 last_played_msg_real_time_ns_(0) {
41 if (play_rate_ <= 0) {
42 AERROR << "invalid play rate: " << play_rate_
43 << " , we will use default value(1.0).";
44 play_rate_ = 1.0;
45 }
46}
47
49
50void PlayTaskConsumer::Start(uint64_t begin_time_ns) {
51 if (!is_stopped_.exchange(false)) {
52 return;
53 }
54 begin_time_ns_ = begin_time_ns;
55 consume_th_.reset(new std::thread(&PlayTaskConsumer::ThreadFunc, this));
56}
57
59 if (is_stopped_.exchange(true)) {
60 return;
61 }
62 if (consume_th_ != nullptr && consume_th_->joinable()) {
63 consume_th_->join();
64 consume_th_ = nullptr;
65 }
66 // reset value for next use consumer
67 base_msg_play_time_ns_ = 0;
68 base_msg_real_time_ns_ = 0;
69 last_played_msg_real_time_ns_ = 0;
70}
71
72void PlayTaskConsumer::ThreadFunc() {
73 uint64_t base_real_time_ns = 0;
74 uint64_t accumulated_pause_time_ns = 0;
75
76 while (!is_stopped_.load()) {
77 auto task = task_buffer_->Front();
78 if (task == nullptr) {
79 std::this_thread::sleep_for(
80 std::chrono::nanoseconds(kWaitProduceSleepNanoSec));
81 continue;
82 }
83
84 uint64_t sleep_ns = 0;
85
86 if (base_msg_play_time_ns_ == 0) {
87 base_msg_play_time_ns_ = task->msg_play_time_ns();
88 base_msg_real_time_ns_ = task->msg_real_time_ns();
89 if (base_msg_play_time_ns_ > begin_time_ns_) {
90 sleep_ns = static_cast<uint64_t>(
91 static_cast<double>(base_msg_play_time_ns_ - begin_time_ns_) /
92 play_rate_);
93 while (sleep_ns > MIN_SLEEP_DURATION_NS && !is_stopped_.load()) {
94 std::this_thread::sleep_for(
95 std::chrono::nanoseconds(MIN_SLEEP_DURATION_NS));
96 sleep_ns -= MIN_SLEEP_DURATION_NS;
97 }
98
99 if (is_stopped_.load()) {
100 break;
101 }
102
103 std::this_thread::sleep_for(std::chrono::nanoseconds(sleep_ns));
104 }
105 base_real_time_ns = Time::Now().ToNanosecond();
106 ADEBUG << "base_msg_play_time_ns: " << base_msg_play_time_ns_
107 << "base_real_time_ns: " << base_real_time_ns;
108 }
109
110 uint64_t task_interval_ns = static_cast<uint64_t>(
111 static_cast<double>(task->msg_play_time_ns() - base_msg_play_time_ns_) /
112 play_rate_);
113 uint64_t real_time_interval_ns = Time::Now().ToNanosecond() -
114 base_real_time_ns -
115 accumulated_pause_time_ns;
116 if (task_interval_ns > real_time_interval_ns) {
117 sleep_ns = task_interval_ns - real_time_interval_ns;
118 std::this_thread::sleep_for(std::chrono::nanoseconds(sleep_ns));
119 }
120
121 task->Play();
122 is_playonce_.store(false);
123
124 last_played_msg_real_time_ns_ = task->msg_real_time_ns();
125 while (is_paused_.load() && !is_stopped_.load()) {
126 if (is_playonce_.load()) {
127 break;
128 }
129 std::this_thread::sleep_for(std::chrono::nanoseconds(kPauseSleepNanoSec));
130 accumulated_pause_time_ns += kPauseSleepNanoSec;
131 }
132 task_buffer_->PopFront();
133 }
134}
135
136} // namespace record
137} // namespace cyber
138} // namespace apollo
uint64_t ToNanosecond() const
convert time to nanosecond.
Definition time.cc:83
static Time Now()
get the current time.
Definition time.cc:57
std::shared_ptr< PlayTaskBuffer > TaskBufferPtr
PlayTaskConsumer(const TaskBufferPtr &task_buffer, double play_rate=1.0)
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
class register implement
Definition arena_queue.h:37