21#include "google/protobuf/util/json_util.h"
31using Json = nlohmann::json;
34using apollo::planning::DrivingAction;
35using apollo::planning::ScenarioConfig;
36using ::google::protobuf::util::MessageToJsonString;
// Number of video encoder services. SendVideoStreamCmd issues one command per
// "encoder<i>" for i in [0, kEncoderCount), and UpdateCarDaemonRpt treats video
// as running once at least this many encoders are counted.
49static const unsigned int kEncoderCount = 3;
// Delay, in milliseconds, slept before each per-encoder write in
// SendVideoStreamCmd.
51static const unsigned int kWriteWaitMs = 50;
61 "/apollo/teleop/daemon/remote/cmd";
63 "/apollo/teleop/daemon/remote/rpt";
70 : node_(cyber::CreateNode(
"teleop")), websocket_(websocket) {
71 RegisterMessageHandlers();
73 teleop_status_[
"audio"] =
false;
74 teleop_status_[
"audio_starting"] =
false;
75 teleop_status_[
"audio_stopping"] =
false;
76 teleop_status_[
"mic"] =
false;
77 teleop_status_[
"mic_starting"] =
false;
78 teleop_status_[
"mic_stopping"] =
false;
79 teleop_status_[
"video"] =
false;
80 teleop_status_[
"video_starting"] =
false;
81 teleop_status_[
"video_stopping"] =
false;
82 teleop_status_[
"pulling_over"] =
false;
83 teleop_status_[
"e_stopping"] =
false;
84 teleop_status_[
"resuming_autonomy"] =
false;
91 modem0_info_reader_ = node_->CreateReader<
ModemInfo>(
96 modem1_info_reader_ = node_->CreateReader<
ModemInfo>(
101 modem2_info_reader_ = node_->CreateReader<
ModemInfo>(
111 remote_daemon_cmd_writer_ =
114 local_daemon_cmd_writer_ =
117 remote_daemon_rpt_reader_ = node_->CreateReader<
DaemonRpt>(
119 [
this](
const std::shared_ptr<DaemonRpt> &msg) {
120 UpdateCarDaemonRpt(msg);
123 local_daemon_rpt_reader_ = node_->CreateReader<
DaemonRpt>(
125 [
this](
const std::shared_ptr<DaemonRpt> &msg) {
126 UpdateOperatorDaemonRpt(msg);
129 action_command_client_ =
134void TeleopService::RegisterMessageHandlers() {
146 boost::unique_lock<boost::shared_mutex> writer_lock(mutex_);
148 if (teleop_status_[
"audio_starting"]) {
149 teleop_status_[
"audio_starting"] =
false;
150 teleop_status_[
"audio_stopping"] =
true;
151 }
else if (teleop_status_[
"audio_stopping"]) {
152 teleop_status_[
"audio_starting"] =
true;
153 teleop_status_[
"audio_stopping"] =
false;
158 if (teleop_status_[
"audio"]) {
159 teleop_status_[
"audio_stopping"] =
true;
160 teleop_status_[
"audio_starting"] =
false;
162 teleop_status_[
"audio_stopping"] =
false;
163 teleop_status_[
"audio_starting"] =
true;
166 start = teleop_status_[
"audio_starting"];
168 AINFO <<
"ToggleAudio: " << start;
169 SendAudioStreamCmd(start);
178 boost::unique_lock<boost::shared_mutex> writer_lock(mutex_);
180 if (teleop_status_[
"mic_starting"]) {
181 teleop_status_[
"mic_starting"] =
false;
182 teleop_status_[
"mic_stopping"] =
true;
183 }
else if (teleop_status_[
"mic_stopping"]) {
184 teleop_status_[
"mic_starting"] =
true;
185 teleop_status_[
"mic_stopping"] =
false;
190 if (teleop_status_[
"mic"]) {
191 teleop_status_[
"mic_stopping"] =
true;
192 teleop_status_[
"mic_starting"] =
false;
194 teleop_status_[
"mic_stopping"] =
false;
195 teleop_status_[
"mic_starting"] =
true;
198 start = teleop_status_[
"mic_starting"];
200 AINFO <<
"ToggleMic: " << start;
201 SendMicStreamCmd(start);
// Exclusive lock: the statements below write teleop_status_, so a shared (reader) lock is not sufficient.
211 boost::unique_lock<boost::shared_mutex> writer_lock(mutex_);
213 if (teleop_status_[
"video_starting"]) {
214 teleop_status_[
"video_starting"] =
false;
215 teleop_status_[
"video_stopping"] =
true;
216 }
else if (teleop_status_[
"video_stopping"]) {
217 teleop_status_[
"video_starting"] =
true;
218 teleop_status_[
"video_stopping"] =
false;
223 if (teleop_status_[
"video"]) {
224 teleop_status_[
"video_stopping"] =
true;
225 teleop_status_[
"video_starting"] =
false;
227 teleop_status_[
"video_stopping"] =
false;
228 teleop_status_[
"video_starting"] =
true;
231 start = teleop_status_[
"video_starting"];
234 AINFO <<
"ToggleVideo: " << start;
235 SendVideoStreamCmd(start);
240 boost::shared_lock<boost::shared_mutex> reader_lock(mutex_);
246 boost::shared_lock<boost::shared_mutex> reader_lock(mutex_);
253 boost::shared_lock<boost::shared_mutex> reader_lock(mutex_);
254 SendResumeCruiseCmd();
258 "RequestTeleopStatus",
267 boost::shared_lock<boost::shared_mutex> reader_lock(mutex_);
268 to_send = teleop_status_.dump();
270 websocket_->
SendData(conn, to_send);
// Builds a one-line text summary of a modem report and stores it in
// teleop_status_["modems"][modem_id].
//
// modem_id:   key under teleop_status_["modems"] that receives the summary.
// modem_info: modem report; the summary is only built when it has a
//             technology field.
273void TeleopService::UpdateModem(
const std::string &modem_id,
274 const std::shared_ptr<ModemInfo> &modem_info) {
277 if (modem_info->has_technology()) {
// Exclusive lock on mutex_ is taken before teleop_status_ is written below.
280 boost::unique_lock<boost::shared_mutex> writer_lock(mutex_);
282 std::stringstream ss(str);
// Scale the rx/tx counters by 1024 * 1024; they are labelled "MB" in the
// output (presumably the raw counters are bytes -- TODO confirm).
283 double rx = 1.0 * modem_info->rx() / (1024 * 1024);
284 double tx = 1.0 * modem_info->tx() / (1024 * 1024);
286 ss << modem_info->technology();
// Stream formatting flags for the values appended below: fixed notation,
// precision 2, zero fill, and a field width of 6 requested for the next field.
287 ss << std::fixed << std::setw(6) << std::setprecision(2)
288 << std::setfill(
'0');
289 ss <<
" rank: " << modem_info->rank();
290 ss <<
" sig: " << modem_info->signal();
291 ss <<
" q: " << modem_info->quality();
292 ss <<
" rx: " << rx <<
" MB";
293 ss <<
" tx: " << tx <<
" MB";
// Publish the formatted summary for this modem.
294 teleop_status_[
"modems"][modem_id] = ss.str();
299void TeleopService::UpdateCarDaemonRpt(
300 const std::shared_ptr<DaemonRpt> &daemon_rpt) {
302 bool videoIsRunning =
false;
303 bool voipIsRunning =
false;
304 unsigned int runningEncoders = 0;
305 for (
int i = 0; i < daemon_rpt->services_size(); i++) {
308 std::string service = daemon_rpt->services(i);
309 if (service.find(
"voip_encoder") != std::string::npos) {
310 voipIsRunning =
true;
311 }
else if (service.find(
"encoder") != std::string::npos) {
317 videoIsRunning = runningEncoders >= kEncoderCount;
320 bool sendStartVideo =
false;
321 bool sendStopVideo =
false;
323 bool sendStartAudio =
false;
324 bool sendStopAudio =
false;
327 boost::unique_lock<boost::shared_mutex> writer_lock(mutex_);
328 teleop_status_[
"video"] = videoIsRunning;
329 teleop_status_[
"audio"] = voipIsRunning;
332 if (teleop_status_[
"video"]) {
333 if (teleop_status_[
"video_starting"]) {
335 teleop_status_[
"video_starting"] =
false;
336 }
else if (teleop_status_[
"video_stopping"]) {
338 sendStopVideo =
true;
343 if (teleop_status_[
"video_starting"]) {
345 sendStartVideo =
true;
346 }
else if (teleop_status_[
"video_stopping"]) {
348 teleop_status_[
"video_stopping"] =
false;
352 if (teleop_status_[
"audio"]) {
353 if (teleop_status_[
"audio_starting"]) {
355 teleop_status_[
"audio_starting"] =
false;
356 }
else if (teleop_status_[
"audio_stopping"]) {
357 sendStopAudio =
true;
362 if (teleop_status_[
"audio_starting"]) {
364 sendStartAudio =
true;
365 }
else if (teleop_status_[
"audio_stopping"]) {
367 teleop_status_[
"audio_stopping"] =
false;
371 if (sendStartVideo || sendStopVideo) {
372 SendVideoStreamCmd(sendStartVideo);
374 if (sendStartAudio || sendStopAudio) {
375 SendAudioStreamCmd(sendStartAudio);
381void TeleopService::UpdateOperatorDaemonRpt(
382 const std::shared_ptr<DaemonRpt> &daemon_rpt) {
384 bool voipIsRunning =
false;
385 for (
int i = 0; i < daemon_rpt->services_size(); i++) {
386 std::string service = daemon_rpt->services(i);
387 if (service.find(
"voip_encoder") != std::string::npos) {
388 voipIsRunning =
true;
392 bool sendStartMic =
false;
393 bool sendStopMic =
false;
396 boost::unique_lock<boost::shared_mutex> writer_lock(mutex_);
397 teleop_status_[
"mic"] = voipIsRunning;
399 if (teleop_status_[
"mic"]) {
400 if (teleop_status_[
"mic_starting"]) {
402 teleop_status_[
"mic_starting"] =
false;
403 }
else if (teleop_status_[
"mic_stopping"]) {
409 if (teleop_status_[
"mic_starting"]) {
412 }
else if (teleop_status_[
"mic_stopping"]) {
414 teleop_status_[
"mic_stopping"] =
false;
418 if (sendStartMic || sendStopMic) {
419 SendMicStreamCmd(sendStartMic);
// Writes one daemon command per video encoder to remote_daemon_cmd_writer_.
//
// start_stop: requested stream state. NOTE(review): the statements that turn
//             this flag into msg's cmd are not visible in this excerpt --
//             confirm against the full file.
424void TeleopService::SendVideoStreamCmd(
bool start_stop) {
// One command per encoder index in [0, kEncoderCount).
432 for (
unsigned int i = 0; i < kEncoderCount; i++) {
// Wait kWriteWaitMs milliseconds before each write.
435 std::this_thread::sleep_for(std::chrono::milliseconds(kWriteWaitMs));
// Service name is "encoder<i>"; the 20-byte buffer bounds the snprintf.
437 char encoderName[20];
438 snprintf(encoderName, 20,
"encoder%u", i);
439 msg.set_service(encoderName);
440 common::util::FillHeader(
"dreamview", &msg);
441 remote_daemon_cmd_writer_->Write(msg);
// Log the encoder name together with the command that was written.
442 AINFO << encoderName <<
" " << msg.cmd();
// Writes a daemon command for the "voip_encoder" service to
// remote_daemon_cmd_writer_, then forwards the same flag to SendMicStreamCmd.
//
// start_stop: requested stream state. NOTE(review): how it is mapped to msg's
//             cmd is not visible in this excerpt -- confirm against the full
//             file.
446void TeleopService::SendAudioStreamCmd(
bool start_stop) {
453 msg.set_service(
"voip_encoder");
454 common::util::FillHeader(
"dreamview", &msg);
455 remote_daemon_cmd_writer_->Write(msg);
456 AINFO <<
"audio " << msg.cmd();
// The mic stream is commanded with the same flag as the audio stream.
458 SendMicStreamCmd(start_stop);
// Writes a daemon command for the "voip_encoder" service to
// local_daemon_cmd_writer_ (the local daemon, unlike SendAudioStreamCmd which
// writes to the remote one).
//
// start_stop: requested stream state. NOTE(review): how it is mapped to msg's
//             cmd is not visible in this excerpt -- confirm against the full
//             file.
461void TeleopService::SendMicStreamCmd(
bool start_stop) {
470 msg.set_service(
"voip_encoder");
471 common::util::FillHeader(
"dreamview", &msg);
472 local_daemon_cmd_writer_->Write(msg);
473 AINFO <<
"mic " << msg.cmd();
476void TeleopService::SendResumeCruiseCmd() {
477 AINFO <<
"Resume cruise";
478 auto command = std::make_shared<ActionCommand>();
480 action_command_client_->SendRequest(command);
483void TeleopService::SendPullOverCmd() {
484 AINFO <<
"Pull over";
485 auto command = std::make_shared<ActionCommand>();
487 action_command_client_->SendRequest(command);
490void TeleopService::SendEstopCmd() {
492 auto command = std::make_shared<ActionCommand>();
494 action_command_client_->SendRequest(command);
497void TeleopService::UpdatePlanning(
const std::shared_ptr<ADCTrajectory> &msg) {
498 static int count = 0;
501 if (count % 10 == 0) {
502 AINFO <<
"Update Planning";
504 auto scenario_type = msg->debug().planning_data().scenario().scenario_type();
506 bool pulled_over = scenario_type ==
"PULL_OVER";
507 bool autonomy_resumed = scenario_type ==
"PARK_AND_GO";
508 bool e_stopped = scenario_type ==
"EMERGENCY_PULL_OVER";
510 bool sendPullOver =
false;
511 bool sendStop =
false;
512 bool sendResume =
false;
514 boost::unique_lock<boost::shared_mutex> writer_lock(mutex_);
516 if (teleop_status_[
"pulling_over"]) {
518 teleop_status_[
"pulling_over"] =
false;
523 if (teleop_status_[
"pulling_over"]) {
529 if (teleop_status_[
"e_stopping"]) {
531 teleop_status_[
"e_stopping"] =
false;
536 if (teleop_status_[
"e_stopping"]) {
541 if (autonomy_resumed) {
542 if (teleop_status_[
"resuming_autonomy"]) {
543 teleop_status_[
"resuming_autonomy"] =
false;
546 if (teleop_status_[
"resuming_autonomy"]) {
553 SendResumeCruiseCmd();
Cyber has builtin time type Time.
TeleopService(WebSocketHandler *websocket)
The WebSocketHandler, built on top of CivetWebSocketHandler, is a websocket handler that handles different types of websocket related events.
void RegisterMessageHandler(std::string type, MessageHandler handler)
Add a new message handler for a message type.
bool SendData(Connection *conn, const std::string &data, bool skippable=false, int op_code=MG_WEBSOCKET_OPCODE_TEXT)
Sends the provided data to a specific connected client.
void RegisterConnectionReadyHandler(ConnectionReadyHandler handler)
Add a new handler for new connections.
Some string util functions.
const std::string local_daemon_cmd_channel
const std::string modem2_channel
const std::string modem1_id
const std::string modem0_channel
const std::string modem1_channel
const std::string planning_pad_channel
const std::string planning_channel
const std::string modem2_id
const std::string local_daemon_rpt_channel
const std::string remote_daemon_rpt_channel
const std::string start_cmd
const std::string modem0_id
const std::string remote_daemon_cmd_channel
const std::string stop_cmd