Apollo 10.0
自动驾驶开放平台
raw_stream.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2017 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <cmath>
20#include <ctime>
21#include <memory>
22#include <thread>
23#include <vector>
24
25#include "absl/strings/str_cat.h"
26
27#include "modules/drivers/gnss/proto/config.pb.h"
28
29#include "cyber/cyber.h"
33
34namespace apollo {
35namespace drivers {
36namespace gnss {
37
39
41 StreamStatus_Type *report_status_type) {
42 switch (status) {
44 *report_status_type = StreamStatus::CONNECTED;
45 break;
46
48 *report_status_type = StreamStatus::DISCONNECTED;
49 break;
50
52 default:
53 *report_status_type = StreamStatus::DISCONNECTED;
54 break;
55 }
56}
57std::string getLocalTimeFileStr(const std::string &gpsbin_folder) {
58 time_t it = std::time(0);
59 char local_time_char[64];
60 std::tm time_tm;
61 localtime_r(&it, &time_tm);
62
63 std::strftime(local_time_char, sizeof(local_time_char), "%Y%m%d_%H%M%S",
64 &time_tm);
65 std::string local_time_str = local_time_char;
67 << "gbsbin folder : " << gpsbin_folder << " create fail";
68 std::string local_time_file_str =
69 gpsbin_folder + "/" + local_time_str + ".bin";
70 return local_time_file_str;
71}
72
74 switch (sd.type_case()) {
75 case config::Stream::kSerial:
76 if (!sd.serial().has_device()) {
77 AERROR << "Serial def has no device field.";
78 return nullptr;
79 }
80 if (!sd.serial().has_baud_rate()) {
81 AERROR << "Serial def has no baud_rate field. Use default baud rate "
82 << sd.serial().baud_rate();
83 return nullptr;
84 }
85 return Stream::create_serial(sd.serial().device().c_str(),
86 sd.serial().baud_rate());
87
88 case config::Stream::kTcp:
89 if (!sd.tcp().has_address()) {
90 AERROR << "tcp def has no address field.";
91 return nullptr;
92 }
93 if (!sd.tcp().has_port()) {
94 AERROR << "tcp def has no port field.";
95 return nullptr;
96 }
97 return Stream::create_tcp(sd.tcp().address().c_str(),
98 static_cast<uint16_t>(sd.tcp().port()));
99
100 case config::Stream::kUdp:
101 if (!sd.udp().has_address()) {
102 AERROR << "tcp def has no address field.";
103 return nullptr;
104 }
105 if (!sd.udp().has_port()) {
106 AERROR << "tcp def has no port field.";
107 return nullptr;
108 }
109 return Stream::create_udp(sd.udp().address().c_str(),
110 static_cast<uint16_t>(sd.udp().port()));
111
112 case config::Stream::kNtrip:
113 if (!sd.ntrip().has_address()) {
114 AERROR << "ntrip def has no address field.";
115 return nullptr;
116 }
117 if (!sd.ntrip().has_port()) {
118 AERROR << "ntrip def has no port field.";
119 return nullptr;
120 }
121 if (!sd.ntrip().has_mount_point()) {
122 AERROR << "ntrip def has no mount point field.";
123 return nullptr;
124 }
125 if (!sd.ntrip().has_user()) {
126 AERROR << "ntrip def has no user field.";
127 return nullptr;
128 }
129 if (!sd.ntrip().has_password()) {
130 AERROR << "ntrip def has no passwd field.";
131 return nullptr;
132 }
134 sd.ntrip().address(), static_cast<uint16_t>(sd.ntrip().port()),
135 sd.ntrip().mount_point(), sd.ntrip().user(), sd.ntrip().password(),
136 sd.ntrip().timeout_s());
137 case config::Stream::kCanCardParameter:
138 if (!sd.can_card_parameter().has_brand()) {
139 AERROR << "can_card_parameter def has no brand field.";
140 return nullptr;
141 }
142 if (!sd.can_card_parameter().has_type()) {
143 AERROR << "can_card_parameter def has no type field.";
144 return nullptr;
145 }
146 if (!sd.can_card_parameter().has_channel_id()) {
147 AERROR << "can_card_parameter def has no channel_id field.";
148 return nullptr;
149 }
150 if (!sd.can_card_parameter().has_channel_id()) {
151 AERROR << "can_card_parameter def has no channel_id field.";
152 return nullptr;
153 }
154 return Stream::create_can(sd.can_card_parameter());
155 default:
156 return nullptr;
157 }
158}
159
161 const std::shared_ptr<apollo::cyber::Node> &node)
162 : config_(config), node_(node) {
163 data_parser_ptr_.reset(new DataParser(config_, node_));
164 rtcm_parser_ptr_.reset(new RtcmParser(config_, node_));
165}
166
168 this->Logout();
169 this->Disconnect();
170 if (gpsbin_stream_ != nullptr) {
171 gpsbin_stream_->close();
172 }
173 if (data_thread_ptr_ != nullptr && data_thread_ptr_->joinable()) {
174 data_thread_ptr_->join();
175 }
176 if (rtk_thread_ptr_ != nullptr && rtk_thread_ptr_->joinable()) {
177 rtk_thread_ptr_->join();
178 }
179}
180
182 CHECK_NOTNULL(data_parser_ptr_);
183 CHECK_NOTNULL(rtcm_parser_ptr_);
184 if (!data_parser_ptr_->Init()) {
185 AERROR << "Init data parser failed.";
186 return false;
187 }
188 if (!rtcm_parser_ptr_->Init()) {
189 AERROR << "Init rtcm parser failed.";
190 return false;
191 }
192 stream_status_.set_ins_stream_type(StreamStatus::DISCONNECTED);
193 stream_status_.set_rtk_stream_in_type(StreamStatus::DISCONNECTED);
194 stream_status_.set_rtk_stream_out_type(StreamStatus::DISCONNECTED);
195 stream_writer_ = node_->CreateWriter<StreamStatus>(FLAGS_stream_status_topic);
196
197 common::util::FillHeader("gnss", &stream_status_);
198 stream_writer_->Write(stream_status_);
199
200 // Creates streams.
201 Stream *s = nullptr;
202 if (!config_.has_data()) {
203 AINFO << "Error: Config file must provide the data stream.";
204 return false;
205 }
206 s = create_stream(config_.data());
207 if (s == nullptr) {
208 AERROR << "Failed to create data stream.";
209 return false;
210 }
211 data_stream_.reset(s);
212
213 Status *status = new Status();
214 if (!status) {
215 AERROR << "Failed to create data stream status.";
216 return false;
217 }
218 data_stream_status_.reset(status);
219
220 if (config_.has_command()) {
221 s = create_stream(config_.command());
222 if (s == nullptr) {
223 AERROR << "Failed to create command stream.";
224 return false;
225 }
226 command_stream_.reset(s);
227
228 status = new Status();
229 if (!status) {
230 AERROR << "Failed to create command stream status.";
231 return false;
232 }
233 command_stream_status_.reset(status);
234 } else {
235 command_stream_ = data_stream_;
236 command_stream_status_ = data_stream_status_;
237 }
238
239 if (config_.has_rtk_from()) {
240 s = create_stream(config_.rtk_from());
241 if (s == nullptr) {
242 AERROR << "Failed to create rtk_from stream.";
243 return false;
244 }
245 in_rtk_stream_.reset(s);
246
247 if (config_.rtk_from().has_push_location()) {
248 push_location_ = config_.rtk_from().push_location();
249 }
250
251 status = new Status();
252 if (!status) {
253 AERROR << "Failed to create rtk_from stream status.";
254 return false;
255 }
256 in_rtk_stream_status_.reset(status);
257
258 if (config_.has_rtk_to()) {
259 s = create_stream(config_.rtk_to());
260 if (s == nullptr) {
261 AERROR << "Failed to create rtk_to stream.";
262 return false;
263 }
264 out_rtk_stream_.reset(s);
265
266 status = new Status();
267 if (!status) {
268 AERROR << "Failed to create rtk_to stream status.";
269 return false;
270 }
271 out_rtk_stream_status_.reset(status);
272 } else {
273 out_rtk_stream_ = data_stream_;
274 out_rtk_stream_status_ = data_stream_status_;
275 }
276
277 if (config_.has_rtk_solution_type()) {
278 if (config_.rtk_solution_type() ==
280 rtk_software_solution_ = true;
281 }
282 }
283 }
284
285 if (config_.login_commands().empty()) {
286 AWARN << "No login_commands in config file.";
287 }
288
289 if (config_.logout_commands().empty()) {
290 AWARN << "No logout_commands in config file.";
291 }
292
293 // connect and login
294 if (!Connect()) {
295 AERROR << "gnss driver connect failed.";
296 return false;
297 }
298
299 if (!Login()) {
300 AERROR << "gnss driver login failed.";
301 return false;
302 }
303
304 const std::string gpsbin_file = getLocalTimeFileStr(config_.gpsbin_folder());
305 gpsbin_stream_.reset(new std::ofstream(
306 gpsbin_file, std::ios::app | std::ios::out | std::ios::binary));
307 stream_writer_ = node_->CreateWriter<StreamStatus>(FLAGS_stream_status_topic);
308 raw_writer_ = node_->CreateWriter<RawData>(FLAGS_gnss_raw_data_topic);
309 rtcm_writer_ = node_->CreateWriter<RawData>(FLAGS_rtcm_data_topic);
310 cyber::ReaderConfig reader_config;
311 reader_config.channel_name = FLAGS_gnss_raw_data_topic;
312 reader_config.pending_queue_size = 100;
313 gpsbin_reader_ = node_->CreateReader<RawData>(
314 reader_config, [&](const std::shared_ptr<RawData> &raw_data) {
315 GpsbinCallback(raw_data);
316 });
317 chassis_reader_ = node_->CreateReader<Chassis>(
318 FLAGS_chassis_topic,
319 [&](const std::shared_ptr<Chassis> &chassis) { chassis_ptr_ = chassis; });
320
321 return true;
322}
323
325 data_thread_ptr_.reset(new std::thread(&RawStream::DataSpin, this));
326 rtk_thread_ptr_.reset(new std::thread(&RawStream::RtkSpin, this));
327 if (config_.has_wheel_parameters()) {
328 wheel_velocity_timer_.reset(new cyber::Timer(
329 1000, [this]() { this->OnWheelVelocityTimer(); }, false));
330 wheel_velocity_timer_->Start();
331 }
332}
333
334void RawStream::OnWheelVelocityTimer() {
335 if (chassis_ptr_ == nullptr) {
336 AINFO << "No chassis message received";
337 return;
338 }
339 auto latency_sec =
340 cyber::Time::Now().ToSecond() - chassis_ptr_->header().timestamp_sec();
341 auto latency_ms = std::lround(latency_sec * 1000);
342 auto speed_cmps = std::lround(chassis_ptr_->speed_mps() * 100);
343 auto cmd_wheelvelocity = absl::StrCat("WHEELVELOCITY ", latency_ms,
344 " 100 0 0 0 0 0 ", speed_cmps, "\r\n");
345 AINFO << "Write command: " << cmd_wheelvelocity;
346 command_stream_->write(cmd_wheelvelocity);
347}
348
349bool RawStream::Connect() {
350 if (data_stream_) {
351 if (data_stream_->get_status() != Stream::Status::CONNECTED) {
352 if (!data_stream_->Connect()) {
353 AERROR << "data stream connect failed.";
354 return false;
355 }
356 data_stream_status_->status = Stream::Status::CONNECTED;
357 stream_status_.set_ins_stream_type(StreamStatus::CONNECTED);
358 }
359 }
360
361 if (command_stream_) {
362 if (command_stream_->get_status() != Stream::Status::CONNECTED) {
363 if (!command_stream_->Connect()) {
364 AERROR << "command stream connect failed.";
365 return false;
366 }
367 command_stream_status_->status = Stream::Status::CONNECTED;
368 }
369 }
370
371 if (in_rtk_stream_) {
372 if (in_rtk_stream_->get_status() != Stream::Status::CONNECTED) {
373 if (!in_rtk_stream_->Connect()) {
374 AERROR << "in rtk stream connect failed.";
375 } else {
376 in_rtk_stream_status_->status = Stream::Status::CONNECTED;
377 stream_status_.set_rtk_stream_in_type(StreamStatus::CONNECTED);
378 }
379 }
380 } else {
381 stream_status_.set_rtk_stream_in_type(StreamStatus::CONNECTED);
382 }
383
384 if (out_rtk_stream_) {
385 if (out_rtk_stream_->get_status() != Stream::Status::CONNECTED) {
386 if (!out_rtk_stream_->Connect()) {
387 AERROR << "out rtk stream connect failed.";
388 } else {
389 out_rtk_stream_status_->status = Stream::Status::CONNECTED;
390 stream_status_.set_rtk_stream_out_type(StreamStatus::CONNECTED);
391 }
392 }
393 } else {
394 stream_status_.set_rtk_stream_out_type(StreamStatus::CONNECTED);
395 }
396 return true;
397}
398
399bool RawStream::Disconnect() {
400 if (data_stream_) {
401 if (data_stream_->get_status() == Stream::Status::CONNECTED) {
402 if (!data_stream_->Disconnect()) {
403 AERROR << "data stream disconnect failed.";
404 return false;
405 }
406 }
407 }
408
409 if (command_stream_) {
410 if (command_stream_->get_status() == Stream::Status::CONNECTED) {
411 if (!command_stream_->Disconnect()) {
412 AERROR << "command stream disconnect failed.";
413 return false;
414 }
415 }
416 }
417 if (in_rtk_stream_) {
418 if (in_rtk_stream_->get_status() == Stream::Status::CONNECTED) {
419 if (!in_rtk_stream_->Disconnect()) {
420 AERROR << "in rtk stream disconnect failed.";
421 return false;
422 }
423 }
424 }
425 if (out_rtk_stream_) {
426 if (out_rtk_stream_->get_status() == Stream::Status::CONNECTED) {
427 if (!out_rtk_stream_->Disconnect()) {
428 AERROR << "out rtk stream disconnect failed.";
429 return false;
430 }
431 }
432 }
433
434 return true;
435}
436
437bool RawStream::Login() {
438 std::vector<std::string> login_data;
439 for (const auto &login_command : config_.login_commands()) {
440 data_stream_->write(login_command);
441 login_data.emplace_back(login_command);
442 AINFO << "Login command: " << login_command;
443 // sleep a little to avoid overrun of the slow serial interface.
444 cyber::Duration(0.5).Sleep();
445 }
446 data_stream_->RegisterLoginData(login_data);
447
448 if (config_.has_wheel_parameters()) {
449 AINFO << "Write command: " << config_.wheel_parameters();
450 command_stream_->write(config_.wheel_parameters());
451 }
452
453 return true;
454}
455
456bool RawStream::Logout() {
457 for (const auto &logout_command : config_.logout_commands()) {
458 data_stream_->write(logout_command);
459 AINFO << "Logout command: " << logout_command;
460 }
461 return true;
462}
463
464void RawStream::StreamStatusCheck() {
465 bool status_report = false;
466 StreamStatus_Type report_stream_status;
467
468 if (data_stream_ &&
469 (data_stream_->get_status() != data_stream_status_->status)) {
470 data_stream_status_->status = data_stream_->get_status();
471 status_report = true;
472 switch_stream_status(data_stream_status_->status, &report_stream_status);
473 stream_status_.set_ins_stream_type(report_stream_status);
474 }
475
476 if (in_rtk_stream_ &&
477 (in_rtk_stream_->get_status() != in_rtk_stream_status_->status)) {
478 in_rtk_stream_status_->status = in_rtk_stream_->get_status();
479 status_report = true;
480 switch_stream_status(in_rtk_stream_status_->status, &report_stream_status);
481 stream_status_.set_rtk_stream_in_type(report_stream_status);
482 }
483
484 if (out_rtk_stream_ &&
485 (out_rtk_stream_->get_status() != out_rtk_stream_status_->status)) {
486 out_rtk_stream_status_->status = out_rtk_stream_->get_status();
487 status_report = true;
488 switch_stream_status(out_rtk_stream_status_->status, &report_stream_status);
489 stream_status_.set_rtk_stream_out_type(report_stream_status);
490 }
491
492 if (status_report) {
493 common::util::FillHeader("gnss", &stream_status_);
494 stream_writer_->Write(stream_status_);
495 }
496}
497
498void RawStream::DataSpin() {
499 common::util::FillHeader("gnss", &stream_status_);
500 stream_writer_->Write(stream_status_);
501 while (cyber::OK()) {
502 size_t length = data_stream_->read(buffer_, BUFFER_SIZE);
503 if (length > 0) {
504 std::shared_ptr<RawData> msg_pub = std::make_shared<RawData>();
505 if (!msg_pub) {
506 AERROR << "New data sting msg failed.";
507 continue;
508 }
509 msg_pub->set_data(reinterpret_cast<const char *>(buffer_), length);
510 raw_writer_->Write(msg_pub);
511 data_parser_ptr_->ParseRawData(msg_pub->data());
512 if (push_location_) {
513 PushGpgga(length);
514 }
515 }
516 StreamStatusCheck();
517 }
518}
519
520void RawStream::RtkSpin() {
521 if (in_rtk_stream_ == nullptr) {
522 return;
523 }
524 while (cyber::OK()) {
525 size_t length = in_rtk_stream_->read(buffer_rtk_, BUFFER_SIZE);
526 if (length > 0) {
527 if (rtk_software_solution_) {
528 PublishRtkData(length);
529 } else {
530 PublishRtkData(length);
531 if (out_rtk_stream_ == nullptr) {
532 continue;
533 }
534 size_t ret = out_rtk_stream_->write(buffer_rtk_, length);
535 if (ret != length) {
536 AERROR << "Expect write out rtk stream bytes " << length
537 << " but got " << ret;
538 }
539 }
540 }
541 }
542}
543
544void RawStream::PublishRtkData(const size_t length) {
545 std::shared_ptr<RawData> rtk_msg = std::make_shared<RawData>();
546 CHECK_NOTNULL(rtk_msg);
547 rtk_msg->set_data(reinterpret_cast<const char *>(buffer_rtk_), length);
548 rtcm_writer_->Write(rtk_msg);
549 rtcm_parser_ptr_->ParseRtcmData(rtk_msg->data());
550}
551
552void RawStream::PushGpgga(const size_t length) {
553 if (!in_rtk_stream_) {
554 return;
555 }
556
557 char *gpgga = strstr(reinterpret_cast<char *>(buffer_), "$GPGGA");
558 if (gpgga) {
559 char *p = strchr(gpgga, '*');
560 if (p) {
561 p += 5;
562 if (size_t(p - reinterpret_cast<char *>(buffer_)) <= length) {
563 AINFO_EVERY(5) << "Push gpgga.";
564 in_rtk_stream_->write(reinterpret_cast<uint8_t *>(gpgga),
565 reinterpret_cast<uint8_t *>(p) - buffer_);
566 }
567 }
568 }
569}
570
571void RawStream::GpsbinCallback(const std::shared_ptr<RawData const> &raw_data) {
572 if (gpsbin_stream_ == nullptr) {
573 return;
574 }
575 gpsbin_stream_->write(raw_data->data().c_str(), raw_data->data().size());
576}
577
578} // namespace gnss
579} // namespace drivers
580} // namespace apollo
static Time Now()
get the current time.
Definition time.cc:57
double ToSecond() const
convert time to second.
Definition time.cc:77
Used to perform oneshot or periodic timing tasks
Definition timer.h:71
RawStream(const config::Config &config, const std::shared_ptr< apollo::cyber::Node > &node)
static Stream * create_serial(const char *device_name, uint32_t baud_rate, uint32_t timeout_usec=0)
static Stream * create_udp(const char *address, uint16_t port, uint32_t timeout_usec=1000000)
Definition udp_stream.cc:57
static Stream * create_can(const apollo::drivers::canbus::CANCardParameter &parameter)
Definition can_stream.cc:74
static Stream * create_tcp(const char *address, uint16_t port, uint32_t timeout_usec=1000000)
static Stream * create_ntrip(const std::string &address, uint16_t port, const std::string &mountpoint, const std::string &user, const std::string &passwd, uint32_t timeout_s=30)
#define ACHECK(cond)
Definition log.h:80
#define AINFO_EVERY(freq)
Definition log.h:82
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
#define AWARN
Definition log.h:43
Some string util functions.
bool EnsureDirectory(const std::string &directory_path)
Check if a specified directory specified by directory_path exists.
Definition file.cc:299
bool OK()
Definition state.h:44
Stream * create_stream(const config::Stream &sd)
Definition raw_stream.cc:73
void switch_stream_status(const apollo::drivers::gnss::Stream::Status &status, StreamStatus_Type *report_status_type)
Definition raw_stream.cc:40
std::string getLocalTimeFileStr(const std::string &gpsbin_folder)
Definition raw_stream.cc:57
class register implement
Definition arena_queue.h:37
optional RtkSolutionType rtk_solution_type
Definition config.proto:133