// Returns true when `value` compares equal (operator==) to zero converted to
// the argument's own type T. The comparison is exact: no tolerance is applied,
// so a floating-point argument must be exactly zero to match.
// NOTE(review): the declaration of T and the closing brace are not visible in
// this excerpt.
31constexpr bool is_zero(T value) {
32 return value ==
static_cast<T
>(0);
// Constructor declaration.
//   address, port - endpoint handed to the underlying TcpStream.
//   mountpoint    - name placed after "GET /" in the requests that are built.
//   user, passwd  - credentials joined as "user:passwd" and Base64-encoded
//                   into the "Authorization: Basic" header of the login data.
//   timeout_s     - timeout in seconds, stored in timeout_s_.
42 NtripStream(
const std::string& address, uint16_t port,
43 const std::string& mountpoint,
const std::string& user,
44 const std::string& passwd, uint32_t timeout_s);
// Reads into `buffer`, passing `max_length` through to the underlying
// TcpStream read as the size limit.
// NOTE(review): the return value is presumably the number of bytes read (0
// when nothing is available) - confirm against the full definition.
47 virtual size_t read(uint8_t* buffer,
size_t max_length);
// Sends `length` bytes starting at `data`. The visible implementation prepends
// write_data_prefix_ to the payload before writing it to the TCP stream.
// NOTE(review): the return value is presumably the number of payload bytes
// accepted - confirm against the full definition.
48 virtual size_t write(
const uint8_t* data,
size_t length);
// Login state flag, false by default.
// NOTE(review): presumably set once the caster accepts the login - the
// assignment is not visible in this excerpt.
54 bool is_login_ =
false;
// Mountpoint name given to the constructor; also used in log messages.
55 const std::string mountpoint_;
// Request header text ("GET /<mountpoint>" ...) prepended to every payload
// sent through the write path.
56 const std::string write_data_prefix_;
// Complete login request, including the "Authorization: Basic" header built
// from the Base64-encoded "user:passwd" pair.
57 const std::string login_data_;
// Timeout in seconds; the 60.0 default is replaced by the constructor
// argument.
58 double timeout_s_ = 60.0;
// Checked with is_zero() in the read path.
// NOTE(review): presumably the time (in seconds) data was last received.
59 double data_active_s_ = 0.0;
// Underlying TCP transport used for all reads and writes.
60 std::unique_ptr<TcpStream> tcp_stream_;
// Locked by Reconnect() and try-locked in the write path.
61 std::mutex internal_mutex_;
// Constructor definition (the first line of the signature and some pieces of
// the request strings are not visible in this excerpt).
// Builds two request strings up front, both starting with "GET /<mountpoint>":
//   - write_data_prefix_: the header text put in front of written payloads;
//   - login_data_: the login request, which additionally carries an
//     "Authorization: Basic" header with Base64("user:passwd").
// It also stores the timeout and creates the TcpStream for address/port.
// NOTE(review): timeout_s (uint32_t) is converted to the double timeout_s_.
// NOTE(review): tcp_stream_ is filled from a raw `new`; std::make_unique would
// be the idiomatic form if the toolchain allows it.
65 const std::string& mountpoint,
const std::string& user,
66 const std::string& passwd, uint32_t timeout_s)
67 : mountpoint_(mountpoint),
68 write_data_prefix_(
"GET /" + mountpoint +
70 "User-Agent: NTRIP gnss_driver/0.0\r\n"
71 "accept: */* \r\n\r\n"),
73 login_data_(
"GET /" + mountpoint +
75 "User-Agent: NTRIP gnss_driver/0.0\r\n"
77 "Authorization: Basic " +
78 common::util::EncodeBase64(user +
":" + passwd) +
"\r\n\r\n"),
79 timeout_s_(timeout_s),
80 tcp_stream_(new
TcpStream(address.c_str(), port, 0, false)) {}
// Fragment of the connection/login sequence. Several original lines are not
// visible here, so only the statements shown are described.
89 AERROR <<
"New tcp stream failed.";
// Open the TCP connection; a false result is logged as a connect failure.
93 if (!tcp_stream_->Connect()) {
95 AERROR <<
"Tcp connect failed.";
// Bounds the read retries below.
// NOTE(review): presumably incremented inside the retry loop on a line that
// is not visible here.
101 size_t try_times = 0;
// Send the prepared login request (login_data_) with one write call.
103 size = tcp_stream_->write(
104 reinterpret_cast<const uint8_t*
>(login_data_.data()), login_data_.size());
// A short write means the request did not go out completely: drop the link.
105 if (size != login_data_.size()) {
106 tcp_stream_->Disconnect();
108 AERROR <<
"Send ntrip request failed.";
// Zero the buffer and read at most sizeof(buffer) - 1 bytes, so the final
// byte stays 0 and the response can be searched with std::strstr below.
112 bzero(buffer,
sizeof(buffer));
113 AINFO <<
"Read ntrip response.";
114 size = tcp_stream_->read(buffer,
sizeof(buffer) - 1);
// Keep reading while nothing has arrived, as long as try_times < 3.
115 while ((size == 0) && (try_times < 3)) {
117 size = tcp_stream_->read(buffer,
sizeof(buffer) - 1);
122 tcp_stream_->Disconnect();
124 AERROR <<
"No response from ntripcaster.";
// Classify the response by the status text it contains.
// "ICY 200 OK" is logged as a successful login.
128 if (std::strstr(
reinterpret_cast<char*
>(buffer),
"ICY 200 OK\r\n")) {
131 AINFO <<
"Ntrip login successfully.";
// "SOURCETABLE 200 OK" is logged as an unknown mountpoint.
135 if (std::strstr(
reinterpret_cast<char*
>(buffer),
"SOURCETABLE 200 OK\r\n")) {
136 AERROR <<
"Mountpoint " << mountpoint_ <<
" not exist.";
// NOTE(review): any response containing "HTTP/" is logged as an
// authentication failure, whatever its status code is.
139 if (std::strstr(
reinterpret_cast<char*
>(buffer),
"HTTP/")) {
140 AERROR <<
"Authentication failed.";
// None of the expected markers matched: log the length of what was received.
143 AINFO <<
"No expect data.";
144 AINFO <<
"Recv data length: " << size;
147 tcp_stream_->Disconnect();
// Close the underlying TCP stream and keep its result in `ret`.
154 bool ret = tcp_stream_->Disconnect();
// Re-establishes the session with the caster (the statements between the log
// lines are not visible in this excerpt).
165void NtripStream::Reconnect() {
166 AINFO <<
"Reconnect ntrip caster.";
// Blocking acquisition of internal_mutex_; the write path only try-locks the
// same mutex.
167 std::unique_lock<std::mutex> lock(internal_mutex_);
// NOTE(review): the failure is logged at AINFO level, unlike the AERROR used
// for the connect/login failures.
171 AINFO <<
"Reconnect ntrip caster failed.";
176 AINFO <<
"Reconnect ntrip caster success.";
// Fragment of the read path (surrounding lines are not visible here).
// data_active_s_ still being exactly 0 is handled as a special case.
// NOTE(review): presumably this initializes the activity timestamp - confirm.
193 if (is_zero(data_active_s_)) {
// Forward the read to the TCP stream using the caller's buffer and limit.
197 ret = tcp_stream_->read(buffer, max_length);
// NOTE(review): presumably logged when no data arrived for longer than
// timeout_s_; the condition itself is not visible here.
204 AINFO <<
"Ntrip timeout.";
// Fragment of the write path (function header and some lines are not visible).
// The mutex is only try-locked, so this path does not block while another
// holder (e.g. Reconnect()) owns internal_mutex_.
215 std::unique_lock<std::mutex> lock(internal_mutex_, std::defer_lock);
216 if (!lock.try_lock()) {
217 AINFO <<
"Try lock failed.";
// Copy the caller's bytes into a string and put the request prefix in front
// of them, so prefix and payload go out in a single write call.
225 std::string data(
reinterpret_cast<const char*
>(buffer), length);
226 data = write_data_prefix_ + data;
// NOTE(review): the length argument of this call is on a line that is not
// visible here.
227 size_t ret = tcp_stream_->write(
reinterpret_cast<const uint8_t*
>(data.data()),
// The expected count includes the prefix, not just the caller's payload.
229 if (ret != data.size()) {
230 AERROR <<
"Send ntrip data size " << data.size() <<
", return " << ret;
// Factory fragment (the first line of the signature is not visible here).
// Forwards every argument to the NtripStream constructor and returns the new
// object as a raw pointer.
// NOTE(review): the object comes from a raw `new`, so the caller presumably
// owns it and must delete it - confirm against the callers.
239 const std::string& mountpoint,
240 const std::string& user,
const std::string& passwd,
241 uint32_t timeout_s) {
242 return new NtripStream(address, port, mountpoint, user, passwd, timeout_s);
static Time Now()
Get the current time.
double ToSecond() const
Convert the time to seconds.
virtual bool Disconnect()
NtripStream(const std::string &address, uint16_t port, const std::string &mountpoint, const std::string &user, const std::string &passwd, uint32_t timeout_s)
virtual size_t read(uint8_t *buffer, size_t max_length)
virtual size_t write(const uint8_t *data, size_t length)
static Stream * create_ntrip(const std::string &address, uint16_t port, const std::string &mountpoint, const std::string &user, const std::string &passwd, uint32_t timeout_s=30)
Miscellaneous string utility functions.