Apollo 10.0
自动驾驶开放平台
player.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <termios.h>
20
21#include "cyber/init.h"
22
23namespace apollo {
24namespace cyber {
25namespace record {
26
27const uint64_t Player::kSleepIntervalMiliSec = 100;
28
29Player::Player(const PlayParam& play_param, const NodePtr& node,
30 const bool preload_fill_buffer_mode)
31 : is_initialized_(false),
32 is_stopped_(true),
33 consumer_(nullptr),
34 producer_(nullptr),
35 task_buffer_(nullptr) {
36 task_buffer_ = std::make_shared<PlayTaskBuffer>();
37 consumer_.reset(new PlayTaskConsumer(task_buffer_, play_param.play_rate));
38 producer_.reset(new PlayTaskProducer(task_buffer_, play_param, node,
39 preload_fill_buffer_mode));
40}
41
43
45 if (is_initialized_.exchange(true)) {
46 AERROR << "player has been initialized.";
47 return false;
48 }
49
50 if (producer_->Init()) {
51 return true;
52 }
53
54 is_initialized_.store(false);
55 return false;
56}
57
58static char Getch() {
59 char buf = 0;
60 struct termios old = {0};
61 fflush(stdout);
62 if (tcgetattr(0, &old) < 0) {
63 perror("tcsetattr()");
64 }
65 old.c_lflag &= ~ICANON;
66 old.c_lflag &= ~ECHO;
67 old.c_cc[VMIN] = 0;
68 old.c_cc[VTIME] = 1;
69 if (tcsetattr(0, TCSANOW, &old) < 0) {
70 perror("tcsetattr ICANON");
71 }
72 if (read(0, &buf, 1) < 0) {
73 perror("read()");
74 }
75 old.c_lflag |= ICANON;
76 old.c_lflag |= ECHO;
77 if (tcsetattr(0, TCSADRAIN, &old) < 0) {
78 perror("tcsetattr ~ICANON");
79 }
80 return buf;
81}
82
83bool Player::ThreadFunc_Play_Nohup() {
84 if (!is_initialized_.load()) {
85 AERROR << "please call Init firstly.";
86 return false;
87 }
88
89 if (!is_stopped_.exchange(false)) {
90 AERROR << "player has been stopped.";
91 return false;
92 }
93 auto& play_param = producer_->play_param();
94 producer_->Start();
95 consumer_->Start(play_param.begin_time_ns);
96 const double total_progress_time_s =
97 static_cast<double>(play_param.end_time_ns - play_param.begin_time_ns) /
98 1e9 +
99 static_cast<double>(play_param.start_time_s);
100 while (!is_stopped_.load() && apollo::cyber::OK()) {
101 if (is_paused_) {
102 consumer_->Pause();
103 } else {
104 consumer_->Continue();
105 }
106
107 double progress_time_s =
108 static_cast<double>(producer_->play_param().start_time_s);
109 if (consumer_->last_played_msg_real_time_ns() > 0) {
110 progress_time_s +=
111 static_cast<double>(consumer_->last_played_msg_real_time_ns() -
112 consumer_->base_msg_play_time_ns() +
113 consumer_->base_msg_real_time_ns() -
114 producer_->play_param().begin_time_ns) /
115 1e9;
116 }
117
118 producer_->WriteRecordProgress(progress_time_s, total_progress_time_s);
119
120 if (producer_->is_stopped() && task_buffer_->Empty()) {
121 consumer_->Stop();
122 break;
123 }
124 std::this_thread::sleep_for(
125 std::chrono::milliseconds(kSleepIntervalMiliSec));
126 }
127 return true;
128}
129
130void Player::HandleNohupThreadStatus() { is_paused_ = !is_paused_; }
131
132void Player::ThreadFunc_Term() {
133 while (!is_stopped_.load()) {
134 char ch = Getch();
135 switch (ch) {
136 case 's':
137 is_playonce_ = true;
138 break;
139 case ' ':
140 is_paused_ = !is_paused_;
141 break;
142 default:
143 break;
144 }
145 }
146}
147
149 nohup_play_th_.reset(new std::thread(&Player::ThreadFunc_Play_Nohup, this));
150}
151
152bool Player::PreloadPlayRecord(const double& progress_s, bool paused_status) {
153 if (!producer_->is_initialized()) {
154 return false;
155 }
156 if (is_preloaded_.load()) {
157 producer_->Reset(progress_s);
158 is_preloaded_.store(false);
159 }
160 if (progress_s == 0) {
161 // When the progress is 0, it is completely reloaded. At this
162 // time, the is_paused_ of the player should change to the initial
163 // state to avoid being disturbed by the last state.On the contrary,
164 // reset the progress needs to preserve the past state to ensure the
165 // unity of the state before and after.
166 is_paused_.exchange(false);
167 } else {
168 is_paused_.exchange(paused_status);
169 }
170 producer_->FillPlayTaskBuffer();
171 is_preloaded_.store(true);
172 return true;
173}
174
176 if (!is_initialized_.load()) {
177 AERROR << "please call Init firstly.";
178 return false;
179 }
180
181 if (!is_stopped_.exchange(false)) {
182 AERROR << "player has been stopped.";
183 return false;
184 }
185
186 auto& play_param = producer_->play_param();
187 std::cout << "\nPlease wait " << play_param.preload_time_s
188 << " second(s) for loading...\n"
189 << "Hit Ctrl+C to stop, Space to pause, or 's' to step.\n"
190 << std::endl;
191 producer_->Start();
192
193 auto preload_sec = play_param.preload_time_s;
194 while (preload_sec > 0 && !is_stopped_.load() && apollo::cyber::OK()) {
195 std::this_thread::sleep_for(std::chrono::seconds(1));
196 --preload_sec;
197 }
198
199 auto delay_sec = play_param.delay_time_s;
200 while (delay_sec > 0 && !is_stopped_.load() && apollo::cyber::OK()) {
201 std::this_thread::sleep_for(std::chrono::seconds(1));
202 --delay_sec;
203 }
204
205 consumer_->Start(play_param.begin_time_ns);
206
207 std::ios::fmtflags before(std::cout.flags());
208 std::cout << std::fixed;
209 const double total_progress_time_s =
210 static_cast<double>(play_param.end_time_ns - play_param.begin_time_ns) /
211 1e9 +
212 static_cast<double>(play_param.start_time_s);
213
214 term_thread_.reset(new std::thread(&Player::ThreadFunc_Term, this));
215 while (!is_stopped_.load() && apollo::cyber::OK()) {
216 if (is_playonce_) {
217 consumer_->PlayOnce();
218 is_playonce_ = false;
219 }
220 if (is_paused_) {
221 consumer_->Pause();
222 std::cout << "\r[PAUSED ] Record Time: ";
223 } else {
224 consumer_->Continue();
225 std::cout << "\r[RUNNING] Record Time: ";
226 }
227
228 double last_played_msg_real_time_s =
229 static_cast<double>(consumer_->last_played_msg_real_time_ns()) / 1e9;
230
231 double progress_time_s =
232 static_cast<double>(producer_->play_param().start_time_s);
233 if (consumer_->last_played_msg_real_time_ns() > 0) {
234 progress_time_s +=
235 static_cast<double>(consumer_->last_played_msg_real_time_ns() -
236 consumer_->base_msg_play_time_ns() +
237 consumer_->base_msg_real_time_ns() -
238 producer_->play_param().begin_time_ns) /
239 1e9;
240 }
241
242 std::cout << std::setprecision(3) << last_played_msg_real_time_s
243 << " Progress: " << progress_time_s << " / "
244 << total_progress_time_s;
245 std::cout.flush();
246
247 if (producer_->is_stopped() && task_buffer_->Empty()) {
248 consumer_->Stop();
249 break;
250 }
251 std::this_thread::sleep_for(
252 std::chrono::milliseconds(kSleepIntervalMiliSec));
253 }
254
255 std::cout << "\nplay finished." << std::endl;
256 std::cout.flags(before);
257 return true;
258}
259
261 if (is_stopped_.exchange(true)) {
262 return false;
263 }
264 producer_->Stop();
265 consumer_->Stop();
266 if (term_thread_ != nullptr && term_thread_->joinable()) {
267 term_thread_->join();
268 term_thread_ = nullptr;
269 }
270 return true;
271}
272
274 if (is_stopped_.exchange(true)) {
275 return false;
276 }
277 // produer may not be stopped under reset progress
278 // reset is_stopped_ to true to ensure logical unity
279 producer_->set_stopped();
280 producer_->Stop();
281 consumer_->Stop();
282 // clear task buffer for refill it
283 task_buffer_->Clear();
284 if (nohup_play_th_ != nullptr && nohup_play_th_->joinable()) {
285 nohup_play_th_->join();
286 nohup_play_th_ = nullptr;
287 }
288 return true;
289}
290
291} // namespace record
292} // namespace cyber
293} // namespace apollo
Player(const PlayParam &play_param, const NodePtr &node=nullptr, const bool preload_fill_buffer_mode=false)
Definition player.cc:29
void HandleNohupThreadStatus()
Pause or continue to play record by change the nohup process status.
Definition player.cc:130
bool Reset()
Reset player for dv will repeatedly use
Definition player.cc:273
void NohupPlayRecord()
set nohup process to play record.
Definition player.cc:148
bool PreloadPlayRecord(const double &progress_s=0, bool paused_status=false)
Preload the player,fill play_task_buffer ahead ofr time to ensure fast playback and avoid consumer wa...
Definition player.cc:152
std::shared_ptr< apollo::cyber::Node > NodePtr
Definition player.h:39
#define AERROR
Definition log.h:44
bool OK()
Definition state.h:44
class register implement
Definition arena_queue.h:37