25#include "absl/strings/str_cat.h"
27#include "modules/drivers/gnss/proto/config.pb.h"
41 StreamStatus_Type *report_status_type) {
58 time_t it = std::time(0);
59 char local_time_char[64];
61 localtime_r(&it, &time_tm);
63 std::strftime(local_time_char,
sizeof(local_time_char),
"%Y%m%d_%H%M%S",
65 std::string local_time_str = local_time_char;
67 <<
"gbsbin folder : " << gpsbin_folder <<
" create fail";
68 std::string local_time_file_str =
69 gpsbin_folder +
"/" + local_time_str +
".bin";
70 return local_time_file_str;
74 switch (sd.type_case()) {
75 case config::Stream::kSerial:
76 if (!sd.serial().has_device()) {
77 AERROR <<
"Serial def has no device field.";
80 if (!sd.serial().has_baud_rate()) {
81 AERROR <<
"Serial def has no baud_rate field. Use default baud rate "
82 << sd.serial().baud_rate();
86 sd.serial().baud_rate());
88 case config::Stream::kTcp:
89 if (!sd.tcp().has_address()) {
90 AERROR <<
"tcp def has no address field.";
93 if (!sd.tcp().has_port()) {
94 AERROR <<
"tcp def has no port field.";
98 static_cast<uint16_t
>(sd.tcp().port()));
100 case config::Stream::kUdp:
101 if (!sd.udp().has_address()) {
102 AERROR <<
"tcp def has no address field.";
105 if (!sd.udp().has_port()) {
106 AERROR <<
"tcp def has no port field.";
110 static_cast<uint16_t
>(sd.udp().port()));
112 case config::Stream::kNtrip:
113 if (!sd.ntrip().has_address()) {
114 AERROR <<
"ntrip def has no address field.";
117 if (!sd.ntrip().has_port()) {
118 AERROR <<
"ntrip def has no port field.";
121 if (!sd.ntrip().has_mount_point()) {
122 AERROR <<
"ntrip def has no mount point field.";
125 if (!sd.ntrip().has_user()) {
126 AERROR <<
"ntrip def has no user field.";
129 if (!sd.ntrip().has_password()) {
130 AERROR <<
"ntrip def has no passwd field.";
134 sd.ntrip().address(),
static_cast<uint16_t
>(sd.ntrip().port()),
135 sd.ntrip().mount_point(), sd.ntrip().user(), sd.ntrip().password(),
136 sd.ntrip().timeout_s());
137 case config::Stream::kCanCardParameter:
138 if (!sd.can_card_parameter().has_brand()) {
139 AERROR <<
"can_card_parameter def has no brand field.";
142 if (!sd.can_card_parameter().has_type()) {
143 AERROR <<
"can_card_parameter def has no type field.";
146 if (!sd.can_card_parameter().has_channel_id()) {
147 AERROR <<
"can_card_parameter def has no channel_id field.";
150 if (!sd.can_card_parameter().has_channel_id()) {
151 AERROR <<
"can_card_parameter def has no channel_id field.";
161 const std::shared_ptr<apollo::cyber::Node> &node)
162 : config_(config), node_(node) {
163 data_parser_ptr_.reset(
new DataParser(config_, node_));
164 rtcm_parser_ptr_.reset(
new RtcmParser(config_, node_));
170 if (gpsbin_stream_ !=
nullptr) {
171 gpsbin_stream_->close();
173 if (data_thread_ptr_ !=
nullptr && data_thread_ptr_->joinable()) {
174 data_thread_ptr_->join();
176 if (rtk_thread_ptr_ !=
nullptr && rtk_thread_ptr_->joinable()) {
177 rtk_thread_ptr_->join();
182 CHECK_NOTNULL(data_parser_ptr_);
183 CHECK_NOTNULL(rtcm_parser_ptr_);
184 if (!data_parser_ptr_->Init()) {
185 AERROR <<
"Init data parser failed.";
188 if (!rtcm_parser_ptr_->Init()) {
189 AERROR <<
"Init rtcm parser failed.";
195 stream_writer_ = node_->CreateWriter<
StreamStatus>(FLAGS_stream_status_topic);
197 common::util::FillHeader(
"gnss", &stream_status_);
198 stream_writer_->Write(stream_status_);
202 if (!config_.has_data()) {
203 AINFO <<
"Error: Config file must provide the data stream.";
208 AERROR <<
"Failed to create data stream.";
211 data_stream_.reset(s);
215 AERROR <<
"Failed to create data stream status.";
218 data_stream_status_.reset(status);
220 if (config_.has_command()) {
223 AERROR <<
"Failed to create command stream.";
226 command_stream_.reset(s);
230 AERROR <<
"Failed to create command stream status.";
233 command_stream_status_.reset(status);
235 command_stream_ = data_stream_;
236 command_stream_status_ = data_stream_status_;
239 if (config_.has_rtk_from()) {
242 AERROR <<
"Failed to create rtk_from stream.";
245 in_rtk_stream_.reset(s);
247 if (config_.
rtk_from().has_push_location()) {
253 AERROR <<
"Failed to create rtk_from stream status.";
256 in_rtk_stream_status_.reset(status);
258 if (config_.has_rtk_to()) {
261 AERROR <<
"Failed to create rtk_to stream.";
264 out_rtk_stream_.reset(s);
268 AERROR <<
"Failed to create rtk_to stream status.";
271 out_rtk_stream_status_.reset(status);
273 out_rtk_stream_ = data_stream_;
274 out_rtk_stream_status_ = data_stream_status_;
277 if (config_.has_rtk_solution_type()) {
280 rtk_software_solution_ =
true;
286 AWARN <<
"No login_commands in config file.";
290 AWARN <<
"No logout_commands in config file.";
295 AERROR <<
"gnss driver connect failed.";
300 AERROR <<
"gnss driver login failed.";
305 gpsbin_stream_.reset(
new std::ofstream(
306 gpsbin_file, std::ios::app | std::ios::out | std::ios::binary));
307 stream_writer_ = node_->CreateWriter<
StreamStatus>(FLAGS_stream_status_topic);
308 raw_writer_ = node_->CreateWriter<
RawData>(FLAGS_gnss_raw_data_topic);
309 rtcm_writer_ = node_->CreateWriter<
RawData>(FLAGS_rtcm_data_topic);
312 reader_config.pending_queue_size = 100;
313 gpsbin_reader_ = node_->CreateReader<
RawData>(
314 reader_config, [&](
const std::shared_ptr<RawData> &raw_data) {
315 GpsbinCallback(raw_data);
317 chassis_reader_ = node_->CreateReader<
Chassis>(
319 [&](
const std::shared_ptr<Chassis> &chassis) { chassis_ptr_ = chassis; });
325 data_thread_ptr_.reset(
new std::thread(&RawStream::DataSpin,
this));
326 rtk_thread_ptr_.reset(
new std::thread(&RawStream::RtkSpin,
this));
327 if (config_.has_wheel_parameters()) {
329 1000, [
this]() { this->OnWheelVelocityTimer(); },
false));
330 wheel_velocity_timer_->Start();
334void RawStream::OnWheelVelocityTimer() {
335 if (chassis_ptr_ ==
nullptr) {
336 AINFO <<
"No chassis message received";
341 auto latency_ms = std::lround(latency_sec * 1000);
342 auto speed_cmps = std::lround(chassis_ptr_->speed_mps() * 100);
343 auto cmd_wheelvelocity = absl::StrCat(
"WHEELVELOCITY ", latency_ms,
344 " 100 0 0 0 0 0 ", speed_cmps,
"\r\n");
345 AINFO <<
"Write command: " << cmd_wheelvelocity;
346 command_stream_->write(cmd_wheelvelocity);
349bool RawStream::Connect() {
352 if (!data_stream_->Connect()) {
353 AERROR <<
"data stream connect failed.";
361 if (command_stream_) {
363 if (!command_stream_->Connect()) {
364 AERROR <<
"command stream connect failed.";
371 if (in_rtk_stream_) {
373 if (!in_rtk_stream_->Connect()) {
374 AERROR <<
"in rtk stream connect failed.";
384 if (out_rtk_stream_) {
386 if (!out_rtk_stream_->Connect()) {
387 AERROR <<
"out rtk stream connect failed.";
399bool RawStream::Disconnect() {
402 if (!data_stream_->Disconnect()) {
403 AERROR <<
"data stream disconnect failed.";
409 if (command_stream_) {
411 if (!command_stream_->Disconnect()) {
412 AERROR <<
"command stream disconnect failed.";
417 if (in_rtk_stream_) {
419 if (!in_rtk_stream_->Disconnect()) {
420 AERROR <<
"in rtk stream disconnect failed.";
425 if (out_rtk_stream_) {
427 if (!out_rtk_stream_->Disconnect()) {
428 AERROR <<
"out rtk stream disconnect failed.";
437bool RawStream::Login() {
438 std::vector<std::string> login_data;
439 for (
const auto &login_command : config_.login_commands()) {
440 data_stream_->write(login_command);
441 login_data.emplace_back(login_command);
442 AINFO <<
"Login command: " << login_command;
444 cyber::Duration(0.5).Sleep();
446 data_stream_->RegisterLoginData(login_data);
448 if (config_.has_wheel_parameters()) {
456bool RawStream::Logout() {
457 for (
const auto &logout_command : config_.logout_commands()) {
458 data_stream_->write(logout_command);
459 AINFO <<
"Logout command: " << logout_command;
464void RawStream::StreamStatusCheck() {
465 bool status_report =
false;
466 StreamStatus_Type report_stream_status;
469 (data_stream_->get_status() != data_stream_status_->status)) {
470 data_stream_status_->status = data_stream_->get_status();
471 status_report =
true;
473 stream_status_.set_ins_stream_type(report_stream_status);
476 if (in_rtk_stream_ &&
477 (in_rtk_stream_->get_status() != in_rtk_stream_status_->status)) {
478 in_rtk_stream_status_->status = in_rtk_stream_->get_status();
479 status_report =
true;
481 stream_status_.set_rtk_stream_in_type(report_stream_status);
484 if (out_rtk_stream_ &&
485 (out_rtk_stream_->get_status() != out_rtk_stream_status_->status)) {
486 out_rtk_stream_status_->status = out_rtk_stream_->get_status();
487 status_report =
true;
489 stream_status_.set_rtk_stream_out_type(report_stream_status);
493 common::util::FillHeader(
"gnss", &stream_status_);
494 stream_writer_->Write(stream_status_);
498void RawStream::DataSpin() {
499 common::util::FillHeader(
"gnss", &stream_status_);
500 stream_writer_->Write(stream_status_);
502 size_t length = data_stream_->read(buffer_, BUFFER_SIZE);
504 std::shared_ptr<RawData> msg_pub = std::make_shared<RawData>();
506 AERROR <<
"New data sting msg failed.";
509 msg_pub->set_data(
reinterpret_cast<const char *
>(buffer_), length);
510 raw_writer_->Write(msg_pub);
511 data_parser_ptr_->ParseRawData(msg_pub->data());
512 if (push_location_) {
520void RawStream::RtkSpin() {
521 if (in_rtk_stream_ ==
nullptr) {
525 size_t length = in_rtk_stream_->read(buffer_rtk_, BUFFER_SIZE);
527 if (rtk_software_solution_) {
528 PublishRtkData(length);
530 PublishRtkData(length);
531 if (out_rtk_stream_ ==
nullptr) {
534 size_t ret = out_rtk_stream_->write(buffer_rtk_, length);
536 AERROR <<
"Expect write out rtk stream bytes " << length
537 <<
" but got " << ret;
544void RawStream::PublishRtkData(
const size_t length) {
545 std::shared_ptr<RawData> rtk_msg = std::make_shared<RawData>();
546 CHECK_NOTNULL(rtk_msg);
547 rtk_msg->set_data(
reinterpret_cast<const char *
>(buffer_rtk_), length);
548 rtcm_writer_->Write(rtk_msg);
549 rtcm_parser_ptr_->ParseRtcmData(rtk_msg->data());
552void RawStream::PushGpgga(
const size_t length) {
553 if (!in_rtk_stream_) {
557 char *gpgga = strstr(
reinterpret_cast<char *
>(buffer_),
"$GPGGA");
559 char *p = strchr(gpgga,
'*');
562 if (
size_t(p -
reinterpret_cast<char *
>(buffer_)) <= length) {
564 in_rtk_stream_->write(
reinterpret_cast<uint8_t *
>(gpgga),
565 reinterpret_cast<uint8_t *
>(p) - buffer_);
571void RawStream::GpsbinCallback(
const std::shared_ptr<RawData const> &raw_data) {
572 if (gpsbin_stream_ ==
nullptr) {
575 gpsbin_stream_->write(raw_data->data().c_str(), raw_data->data().size());
static Time Now()
get the current time.
double ToSecond() const
convert time to second.
Used to perform oneshot or periodic timing tasks
RawStream(const config::Config &config, const std::shared_ptr< apollo::cyber::Node > &node)
static Stream * create_serial(const char *device_name, uint32_t baud_rate, uint32_t timeout_usec=0)
static Stream * create_udp(const char *address, uint16_t port, uint32_t timeout_usec=1000000)
static Stream * create_can(const apollo::drivers::canbus::CANCardParameter ¶meter)
static Stream * create_tcp(const char *address, uint16_t port, uint32_t timeout_usec=1000000)
static Stream * create_ntrip(const std::string &address, uint16_t port, const std::string &mountpoint, const std::string &user, const std::string &passwd, uint32_t timeout_s=30)
#define AINFO_EVERY(freq)
Some string util functions.
bool EnsureDirectory(const std::string &directory_path)
Check if a specified directory specified by directory_path exists.
Stream * create_stream(const config::Stream &sd)
void switch_stream_status(const apollo::drivers::gnss::Stream::Status &status, StreamStatus_Type *report_status_type)
std::string getLocalTimeFileStr(const std::string &gpsbin_folder)
repeated bytes logout_commands
optional RtkSolutionType rtk_solution_type
repeated bytes login_commands
optional string gpsbin_folder
optional string wheel_parameters
optional bool push_location