Apollo 10.0
自动驾驶开放平台
teleop.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2019 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include "cyber/common/log.h"
20#include "cyber/time/time.h"
21#include "google/protobuf/util/json_util.h"
24
25#include <iomanip>
26#include <sstream>
27
28namespace apollo {
29namespace dreamview {
30
31using Json = nlohmann::json;
34using apollo::planning::DrivingAction;
35using apollo::planning::ScenarioConfig;
36using ::google::protobuf::util::MessageToJsonString;
42
43// modem ids
44const std::string modem0_id = "0";
45const std::string modem1_id = "1";
46const std::string modem2_id = "2";
47
48// number of simultaneous video encoders
49static const unsigned int kEncoderCount = 3;
50// delay between to consecutive msg writes
51static const unsigned int kWriteWaitMs = 50;
52
53const std::string start_cmd = "start";
54const std::string stop_cmd = "kill";
55
56// channels
57const std::string modem0_channel = "/apollo/teleop/modem/modem0";
58const std::string modem1_channel = "/apollo/teleop/modem/modem1";
59const std::string modem2_channel = "/apollo/teleop/modem/modem2";
60const std::string remote_daemon_cmd_channel =
61 "/apollo/teleop/daemon/remote/cmd";
62const std::string remote_daemon_rpt_channel =
63 "/apollo/teleop/daemon/remote/rpt";
64const std::string local_daemon_cmd_channel = "/apollo/teleop/daemon/local/cmd";
65const std::string local_daemon_rpt_channel = "/apollo/teleop/daemon/local/rpt";
66const std::string planning_channel = "/apollo/planning";
67const std::string planning_pad_channel = "/apollo/planning/pad";
68
70 : node_(cyber::CreateNode("teleop")), websocket_(websocket) {
71 RegisterMessageHandlers();
72
73 teleop_status_["audio"] = false;
74 teleop_status_["audio_starting"] = false;
75 teleop_status_["audio_stopping"] = false;
76 teleop_status_["mic"] = false;
77 teleop_status_["mic_starting"] = false;
78 teleop_status_["mic_stopping"] = false;
79 teleop_status_["video"] = false;
80 teleop_status_["video_starting"] = false;
81 teleop_status_["video_stopping"] = false;
82 teleop_status_["pulling_over"] = false;
83 teleop_status_["e_stopping"] = false;
84 teleop_status_["resuming_autonomy"] = false;
85}
86
88 // TODO get topic names from proto
89 // TODO update proto to get all modems' info combined with rank
90
91 modem0_info_reader_ = node_->CreateReader<ModemInfo>(
92 modem0_channel, [this](const std::shared_ptr<ModemInfo> &msg) {
93 UpdateModem(modem0_id, msg);
94 });
95
96 modem1_info_reader_ = node_->CreateReader<ModemInfo>(
97 modem1_channel, [this](const std::shared_ptr<ModemInfo> &msg) {
98 UpdateModem(modem1_id, msg);
99 });
100
101 modem2_info_reader_ = node_->CreateReader<ModemInfo>(
102 modem2_channel, [this](const std::shared_ptr<ModemInfo> &msg) {
103 UpdateModem(modem2_id, msg);
104 });
105
106 planning_reader_ = node_->CreateReader<ADCTrajectory>(
107 planning_channel, [this](const std::shared_ptr<ADCTrajectory> &msg) {
108 UpdatePlanning(msg);
109 });
110
111 remote_daemon_cmd_writer_ =
112 node_->CreateWriter<DaemonCmd>(remote_daemon_cmd_channel);
113
114 local_daemon_cmd_writer_ =
115 node_->CreateWriter<DaemonCmd>(local_daemon_cmd_channel);
116
117 remote_daemon_rpt_reader_ = node_->CreateReader<DaemonRpt>(
119 [this](const std::shared_ptr<DaemonRpt> &msg) {
120 UpdateCarDaemonRpt(msg);
121 });
122
123 local_daemon_rpt_reader_ = node_->CreateReader<DaemonRpt>(
125 [this](const std::shared_ptr<DaemonRpt> &msg) {
126 UpdateOperatorDaemonRpt(msg);
127 });
128
129 action_command_client_ =
130 node_->CreateClient<apollo::external_command::ActionCommand,
131 CommandStatus>(FLAGS_action_command_topic);
132}
133
134void TeleopService::RegisterMessageHandlers() {
135 // Send current teleop status to the new client.
137 [this](WebSocketHandler::Connection *conn) { SendStatus(conn); });
138 // Start/Stop local and remote audio
139 websocket_->RegisterMessageHandler(
140 "ToggleAudio",
141 [this](const Json &json, WebSocketHandler::Connection *conn) {
142 {
143 bool start = false; // false means stop
144 // create a scope for the mutex lock
145 {
146 boost::unique_lock<boost::shared_mutex> writer_lock(mutex_);
147 // toggle depending on current state change
148 if (teleop_status_["audio_starting"]) {
149 teleop_status_["audio_starting"] = false;
150 teleop_status_["audio_stopping"] = true;
151 } else if (teleop_status_["audio_stopping"]) {
152 teleop_status_["audio_starting"] = true;
153 teleop_status_["audio_stopping"] = false;
154 }
155 // not currently starting or stopping video
156 else {
157 // toggle depending on current state
158 if (teleop_status_["audio"]) {
159 teleop_status_["audio_stopping"] = true;
160 teleop_status_["audio_starting"] = false;
161 } else {
162 teleop_status_["audio_stopping"] = false;
163 teleop_status_["audio_starting"] = true;
164 }
165 }
166 start = teleop_status_["audio_starting"];
167 }
168 AINFO << "ToggleAudio: " << start;
169 SendAudioStreamCmd(start);
170 }
171 });
172 // Mute/Unmute local microphone
173 websocket_->RegisterMessageHandler(
174 "ToggleMic",
175 [this](const Json &json, WebSocketHandler::Connection *conn) {
176 bool start = false;
177 {
178 boost::unique_lock<boost::shared_mutex> writer_lock(mutex_);
179 // toggle depending on current state change
180 if (teleop_status_["mic_starting"]) {
181 teleop_status_["mic_starting"] = false;
182 teleop_status_["mic_stopping"] = true;
183 } else if (teleop_status_["mic_stopping"]) {
184 teleop_status_["mic_starting"] = true;
185 teleop_status_["mic_stopping"] = false;
186 }
187 // not currently starting or stopping video
188 else {
189 // toggle depending on current state
190 if (teleop_status_["mic"]) {
191 teleop_status_["mic_stopping"] = true;
192 teleop_status_["mic_starting"] = false;
193 } else {
194 teleop_status_["mic_stopping"] = false;
195 teleop_status_["mic_starting"] = true;
196 }
197 }
198 start = teleop_status_["mic_starting"];
199 }
200 AINFO << "ToggleMic: " << start;
201 SendMicStreamCmd(start);
202 });
203 // Start/stop local decoder and viewer, start/stop remote encoder and
204 // compositor
205 websocket_->RegisterMessageHandler(
206 "ToggleVideo",
207 [this](const Json &json, WebSocketHandler::Connection *conn) {
208 bool start = false; // false means stop
209 // create a scope for the mutex lock
210 {
211 boost::shared_lock<boost::shared_mutex> writer_lock(mutex_);
212 // toggle depending on current state change
213 if (teleop_status_["video_starting"]) {
214 teleop_status_["video_starting"] = false;
215 teleop_status_["video_stopping"] = true;
216 } else if (teleop_status_["video_stopping"]) {
217 teleop_status_["video_starting"] = true;
218 teleop_status_["video_stopping"] = false;
219 }
220 // not currently starting or stopping video
221 else {
222 // toggle depending on current state
223 if (teleop_status_["video"]) {
224 teleop_status_["video_stopping"] = true;
225 teleop_status_["video_starting"] = false;
226 } else {
227 teleop_status_["video_stopping"] = false;
228 teleop_status_["video_starting"] = true;
229 }
230 }
231 start = teleop_status_["video_starting"];
232 }
233 // send a start or stop message to the video encoders
234 AINFO << "ToggleVideo: " << start;
235 SendVideoStreamCmd(start);
236 });
237 // Issue pull-over command to remote
238 websocket_->RegisterMessageHandler(
239 "PullOver", [this](const Json &json, WebSocketHandler::Connection *conn) {
240 boost::shared_lock<boost::shared_mutex> reader_lock(mutex_);
241 SendPullOverCmd();
242 });
243 // Issue emergency-stop command to remote
244 websocket_->RegisterMessageHandler(
245 "EStop", [this](const Json &json, WebSocketHandler::Connection *conn) {
246 boost::shared_lock<boost::shared_mutex> reader_lock(mutex_);
247 SendEstopCmd();
248 });
249 // Issue resume-cruise command to remote
250 websocket_->RegisterMessageHandler(
251 "ResumeCruise",
252 [this](const Json &json, WebSocketHandler::Connection *conn) {
253 boost::shared_lock<boost::shared_mutex> reader_lock(mutex_);
254 SendResumeCruiseCmd();
255 });
256 // Request to get updated modem info for client display
257 websocket_->RegisterMessageHandler(
258 "RequestTeleopStatus",
259 [this](const Json &json, WebSocketHandler::Connection *conn) {
260 SendStatus(conn);
261 });
262}
263
264void TeleopService::SendStatus(WebSocketHandler::Connection *conn) {
265 std::string to_send;
266 {
267 boost::shared_lock<boost::shared_mutex> reader_lock(mutex_);
268 to_send = teleop_status_.dump();
269 }
270 websocket_->SendData(conn, to_send);
271}
272
273void TeleopService::UpdateModem(const std::string &modem_id,
274 const std::shared_ptr<ModemInfo> &modem_info) {
275 // TODO simplify data and only send necessary info for display
276 // update modem_info_
277 if (modem_info->has_technology()) {
278 // teleop_status_["modems"][modem_info->provider()] =
279 // modem_info->technology();
280 boost::unique_lock<boost::shared_mutex> writer_lock(mutex_);
281 std::string str;
282 std::stringstream ss(str);
283 double rx = 1.0 * modem_info->rx() / (1024 * 1024);
284 double tx = 1.0 * modem_info->tx() / (1024 * 1024);
285
286 ss << modem_info->technology();
287 ss << std::fixed << std::setw(6) << std::setprecision(2)
288 << std::setfill('0');
289 ss << " rank: " << modem_info->rank();
290 ss << " sig: " << modem_info->signal();
291 ss << " q: " << modem_info->quality();
292 ss << " rx: " << rx << " MB";
293 ss << " tx: " << tx << " MB";
294 teleop_status_["modems"][modem_id] = ss.str();
295 }
296}
297
298// callback for messages that originate from the remote computer
299void TeleopService::UpdateCarDaemonRpt(
300 const std::shared_ptr<DaemonRpt> &daemon_rpt) {
301 {
302 bool videoIsRunning = false;
303 bool voipIsRunning = false;
304 unsigned int runningEncoders = 0;
305 for (int i = 0; i < daemon_rpt->services_size(); i++) {
306 // look for voip_encoder or encoder0..1.2
307 // check 'voip_encoder' first because it contains 'encoder'
308 std::string service = daemon_rpt->services(i);
309 if (service.find("voip_encoder") != std::string::npos) {
310 voipIsRunning = true;
311 } else if (service.find("encoder") != std::string::npos) {
312 runningEncoders++;
313 }
314 }
315
316 // all video encoders are running.
317 videoIsRunning = runningEncoders >= kEncoderCount;
318
319 // we may need to write commands to start/stop the video stream
320 bool sendStartVideo = false;
321 bool sendStopVideo = false;
322
323 bool sendStartAudio = false;
324 bool sendStopAudio = false;
325 // scope for the lock
326 {
327 boost::unique_lock<boost::shared_mutex> writer_lock(mutex_);
328 teleop_status_["video"] = videoIsRunning;
329 teleop_status_["audio"] = voipIsRunning;
330
331 // video currently running
332 if (teleop_status_["video"]) {
333 if (teleop_status_["video_starting"]) {
334 // video has started
335 teleop_status_["video_starting"] = false;
336 } else if (teleop_status_["video_stopping"]) {
337 // not stopped yet
338 sendStopVideo = true;
339 }
340 }
341 // video not running
342 else {
343 if (teleop_status_["video_starting"]) {
344 // not started yet
345 sendStartVideo = true;
346 } else if (teleop_status_["video_stopping"]) {
347 // video is stopped
348 teleop_status_["video_stopping"] = false;
349 }
350 }
351 // audio currently running
352 if (teleop_status_["audio"]) {
353 if (teleop_status_["audio_starting"]) {
354 // audio has started
355 teleop_status_["audio_starting"] = false;
356 } else if (teleop_status_["audio_stopping"]) {
357 sendStopAudio = true;
358 }
359 }
360 // audio not running
361 else {
362 if (teleop_status_["audio_starting"]) {
363 // not started yet
364 sendStartAudio = true;
365 } else if (teleop_status_["audio_stopping"]) {
366 // video is stopped
367 teleop_status_["audio_stopping"] = false;
368 }
369 }
370 }
371 if (sendStartVideo || sendStopVideo) {
372 SendVideoStreamCmd(sendStartVideo);
373 }
374 if (sendStartAudio || sendStopAudio) {
375 SendAudioStreamCmd(sendStartAudio);
376 }
377 }
378}
379
380// callback for messages that originate from this computer
381void TeleopService::UpdateOperatorDaemonRpt(
382 const std::shared_ptr<DaemonRpt> &daemon_rpt) {
383 {
384 bool voipIsRunning = false;
385 for (int i = 0; i < daemon_rpt->services_size(); i++) {
386 std::string service = daemon_rpt->services(i);
387 if (service.find("voip_encoder") != std::string::npos) {
388 voipIsRunning = true;
389 break;
390 }
391 }
392 bool sendStartMic = false;
393 bool sendStopMic = false;
394 // scope for the lock
395 {
396 boost::unique_lock<boost::shared_mutex> writer_lock(mutex_);
397 teleop_status_["mic"] = voipIsRunning;
398 // mic currently running
399 if (teleop_status_["mic"]) {
400 if (teleop_status_["mic_starting"]) {
401 // mic has started
402 teleop_status_["mic_starting"] = false;
403 } else if (teleop_status_["mic_stopping"]) {
404 sendStopMic = true;
405 }
406 }
407 // mic not running
408 else {
409 if (teleop_status_["mic_starting"]) {
410 // not started yet
411 sendStartMic = true;
412 } else if (teleop_status_["mic_stopping"]) {
413 // video is stopped
414 teleop_status_["mic_stopping"] = false;
415 }
416 }
417 }
418 if (sendStartMic || sendStopMic) {
419 SendMicStreamCmd(sendStartMic);
420 }
421 }
422}
423
424void TeleopService::SendVideoStreamCmd(bool start_stop) {
425 DaemonCmd msg;
426 if (start_stop) {
427 msg.set_cmd(start_cmd);
428 } else {
429 msg.set_cmd(stop_cmd);
430 }
431 // we send a message to each encoder.
432 for (unsigned int i = 0; i < kEncoderCount; i++) {
433 if (i > 0) {
434 // delay between sending 2 messages to ensure they are received
435 std::this_thread::sleep_for(std::chrono::milliseconds(kWriteWaitMs));
436 }
437 char encoderName[20];
438 snprintf(encoderName, 20, "encoder%u", i);
439 msg.set_service(encoderName);
440 common::util::FillHeader("dreamview", &msg);
441 remote_daemon_cmd_writer_->Write(msg);
442 AINFO << encoderName << " " << msg.cmd();
443 }
444}
445
446void TeleopService::SendAudioStreamCmd(bool start_stop) {
447 DaemonCmd msg;
448 if (start_stop) {
449 msg.set_cmd(start_cmd);
450 } else {
451 msg.set_cmd(stop_cmd);
452 }
453 msg.set_service("voip_encoder");
454 common::util::FillHeader("dreamview", &msg);
455 remote_daemon_cmd_writer_->Write(msg);
456 AINFO << "audio " << msg.cmd();
457 // audio start / stop implies mic start/stop
458 SendMicStreamCmd(start_stop);
459}
460
461void TeleopService::SendMicStreamCmd(bool start_stop) {
462 // by switching on or off the voip_encoder in the local console
463 // we are controlling the mic
464 DaemonCmd msg;
465 if (start_stop) {
466 msg.set_cmd(start_cmd);
467 } else {
468 msg.set_cmd(stop_cmd);
469 }
470 msg.set_service("voip_encoder");
471 common::util::FillHeader("dreamview", &msg);
472 local_daemon_cmd_writer_->Write(msg);
473 AINFO << "mic " << msg.cmd();
474}
475
476void TeleopService::SendResumeCruiseCmd() {
477 AINFO << "Resume cruise";
478 auto command = std::make_shared<ActionCommand>();
480 action_command_client_->SendRequest(command);
481}
482
483void TeleopService::SendPullOverCmd() {
484 AINFO << "Pull over";
485 auto command = std::make_shared<ActionCommand>();
487 action_command_client_->SendRequest(command);
488}
489
490void TeleopService::SendEstopCmd() {
491 AINFO << "EStop";
492 auto command = std::make_shared<ActionCommand>();
494 action_command_client_->SendRequest(command);
495}
496
497void TeleopService::UpdatePlanning(const std::shared_ptr<ADCTrajectory> &msg) {
498 static int count = 0;
499 ++count;
500
501 if (count % 10 == 0) {
502 AINFO << "Update Planning";
503 }
504 auto scenario_type = msg->debug().planning_data().scenario().scenario_type();
505
506 bool pulled_over = scenario_type == "PULL_OVER";
507 bool autonomy_resumed = scenario_type == "PARK_AND_GO";
508 bool e_stopped = scenario_type == "EMERGENCY_PULL_OVER";
509
510 bool sendPullOver = false;
511 bool sendStop = false;
512 bool sendResume = false;
513 {
514 boost::unique_lock<boost::shared_mutex> writer_lock(mutex_);
515 if (pulled_over) {
516 if (teleop_status_["pulling_over"]) {
517 // pulled over confirmed
518 teleop_status_["pulling_over"] = false;
519 }
520 }
521 // not pulled over
522 else {
523 if (teleop_status_["pulling_over"]) {
524 sendPullOver = true;
525 }
526 }
527
528 if (e_stopped) {
529 if (teleop_status_["e_stopping"]) {
530 // e stop over confirmed
531 teleop_status_["e_stopping"] = false;
532 }
533 }
534 // not e stopped
535 else {
536 if (teleop_status_["e_stopping"]) {
537 sendStop = true;
538 }
539 }
540
541 if (autonomy_resumed) {
542 if (teleop_status_["resuming_autonomy"]) {
543 teleop_status_["resuming_autonomy"] = false;
544 }
545 } else {
546 if (teleop_status_["resuming_autonomy"]) {
547 sendResume = true;
548 }
549 }
550 } // writer lock scope
551
552 if (sendResume) {
553 SendResumeCruiseCmd();
554 }
555 if (sendStop) {
556 SendEstopCmd();
557 }
558 if (sendPullOver) {
559 SendPullOverCmd();
560 }
561}
562
563} // namespace dreamview
564} // namespace apollo
Cyber has builtin time type Time.
Definition time.h:31
TeleopService(WebSocketHandler *websocket)
Definition teleop.cc:69
The WebSocketHandler, built on top of CivetWebSocketHandler, is a websocket handler that handles diff...
void RegisterMessageHandler(std::string type, MessageHandler handler)
Add a new message handler for a message type.
bool SendData(Connection *conn, const std::string &data, bool skippable=false, int op_code=MG_WEBSOCKET_OPCODE_TEXT)
Sends the provided data to a specific connected client.
void RegisterConnectionReadyHandler(ConnectionReadyHandler handler)
Add a new handler for new connections.
#define AINFO
Definition log.h:42
Some string util functions.
nlohmann::json Json
const std::string local_daemon_cmd_channel
Definition teleop.cc:64
const std::string modem2_channel
Definition teleop.cc:59
const std::string modem1_id
Definition teleop.cc:45
const std::string modem0_channel
Definition teleop.cc:57
const std::string modem1_channel
Definition teleop.cc:58
const std::string planning_pad_channel
Definition teleop.cc:67
const std::string planning_channel
Definition teleop.cc:66
const std::string modem2_id
Definition teleop.cc:46
const std::string local_daemon_rpt_channel
Definition teleop.cc:65
const std::string remote_daemon_rpt_channel
Definition teleop.cc:62
const std::string start_cmd
Definition teleop.cc:53
const std::string modem0_id
Definition teleop.cc:44
const std::string remote_daemon_cmd_channel
Definition teleop.cc:60
const std::string stop_cmd
Definition teleop.cc:54
class register implement
Definition arena_queue.h:37