Apollo 10.0
自动驾驶开放平台
ntrip_stream.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2017 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#include <unistd.h>
18#include <iostream>
19#include <mutex>
20
21#include "cyber/cyber.h"
22
27
28namespace {
29
30template <typename T>
31constexpr bool is_zero(T value) {
32 return value == static_cast<T>(0);
33}
34} // namespace
35
36namespace apollo {
37namespace drivers {
38namespace gnss {
39
40class NtripStream : public Stream {
41 public:
42 NtripStream(const std::string& address, uint16_t port,
43 const std::string& mountpoint, const std::string& user,
44 const std::string& passwd, uint32_t timeout_s);
46
47 virtual size_t read(uint8_t* buffer, size_t max_length);
48 virtual size_t write(const uint8_t* data, size_t length);
49 virtual bool Connect();
50 virtual bool Disconnect();
51
52 private:
53 void Reconnect();
54 bool is_login_ = false;
55 const std::string mountpoint_;
56 const std::string write_data_prefix_;
57 const std::string login_data_;
58 double timeout_s_ = 60.0;
59 double data_active_s_ = 0.0;
60 std::unique_ptr<TcpStream> tcp_stream_;
61 std::mutex internal_mutex_;
62};
63
64NtripStream::NtripStream(const std::string& address, uint16_t port,
65 const std::string& mountpoint, const std::string& user,
66 const std::string& passwd, uint32_t timeout_s)
67 : mountpoint_(mountpoint),
68 write_data_prefix_("GET /" + mountpoint +
69 " HTTP/1.0\r\n"
70 "User-Agent: NTRIP gnss_driver/0.0\r\n"
71 "accept: */* \r\n\r\n"),
72
73 login_data_("GET /" + mountpoint +
74 " HTTP/1.0\r\n"
75 "User-Agent: NTRIP gnss_driver/0.0\r\n"
76 "accept: */* \r\n"
77 "Authorization: Basic " +
78 common::util::EncodeBase64(user + ":" + passwd) + "\r\n\r\n"),
79 timeout_s_(timeout_s),
80 tcp_stream_(new TcpStream(address.c_str(), port, 0, false)) {}
81
83
85 if (is_login_) {
86 return true;
87 }
88 if (!tcp_stream_) {
89 AERROR << "New tcp stream failed.";
90 return true;
91 }
92
93 if (!tcp_stream_->Connect()) {
95 AERROR << "Tcp connect failed.";
96 return false;
97 }
98
99 uint8_t buffer[2048];
100 size_t size = 0;
101 size_t try_times = 0;
102
103 size = tcp_stream_->write(
104 reinterpret_cast<const uint8_t*>(login_data_.data()), login_data_.size());
105 if (size != login_data_.size()) {
106 tcp_stream_->Disconnect();
108 AERROR << "Send ntrip request failed.";
109 return false;
110 }
111
112 bzero(buffer, sizeof(buffer));
113 AINFO << "Read ntrip response.";
114 size = tcp_stream_->read(buffer, sizeof(buffer) - 1);
115 while ((size == 0) && (try_times < 3)) {
116 sleep(1);
117 size = tcp_stream_->read(buffer, sizeof(buffer) - 1);
118 ++try_times;
119 }
120
121 if (!size) {
122 tcp_stream_->Disconnect();
124 AERROR << "No response from ntripcaster.";
125 return false;
126 }
127
128 if (std::strstr(reinterpret_cast<char*>(buffer), "ICY 200 OK\r\n")) {
130 is_login_ = true;
131 AINFO << "Ntrip login successfully.";
132 return true;
133 }
134
135 if (std::strstr(reinterpret_cast<char*>(buffer), "SOURCETABLE 200 OK\r\n")) {
136 AERROR << "Mountpoint " << mountpoint_ << " not exist.";
137 }
138
139 if (std::strstr(reinterpret_cast<char*>(buffer), "HTTP/")) {
140 AERROR << "Authentication failed.";
141 }
142
143 AINFO << "No expect data.";
144 AINFO << "Recv data length: " << size;
145 // AINFO << "Data from server: " << reinterpret_cast<char*>(buffer);
146
147 tcp_stream_->Disconnect();
149 return false;
150}
151
153 if (is_login_) {
154 bool ret = tcp_stream_->Disconnect();
155 if (!ret) {
156 return false;
157 }
159 is_login_ = false;
160 }
161
162 return true;
163}
164
165void NtripStream::Reconnect() {
166 AINFO << "Reconnect ntrip caster.";
167 std::unique_lock<std::mutex> lock(internal_mutex_);
168 Disconnect();
169 Connect();
171 AINFO << "Reconnect ntrip caster failed.";
172 return;
173 }
174
175 data_active_s_ = cyber::Time::Now().ToSecond();
176 AINFO << "Reconnect ntrip caster success.";
177}
178
179size_t NtripStream::read(uint8_t* buffer, size_t max_length) {
180 if (!tcp_stream_) {
181 return 0;
182 }
183
184 size_t ret = 0;
185
186 if (tcp_stream_->get_status() != Stream::Status::CONNECTED) {
187 Reconnect();
189 return 0;
190 }
191 }
192
193 if (is_zero(data_active_s_)) {
194 data_active_s_ = cyber::Time::Now().ToSecond();
195 }
196
197 ret = tcp_stream_->read(buffer, max_length);
198 if (ret) {
199 data_active_s_ = cyber::Time::Now().ToSecond();
200 }
201
202 // timeout detect
203 if ((cyber::Time::Now().ToSecond() - data_active_s_) > timeout_s_) {
204 AINFO << "Ntrip timeout.";
205 Reconnect();
206 }
207
208 return ret;
209}
210
211size_t NtripStream::write(const uint8_t* buffer, size_t length) {
212 if (!tcp_stream_) {
213 return 0;
214 }
215 std::unique_lock<std::mutex> lock(internal_mutex_, std::defer_lock);
216 if (!lock.try_lock()) {
217 AINFO << "Try lock failed.";
218 return 0;
219 }
220
221 if (tcp_stream_->get_status() != Stream::Status::CONNECTED) {
222 return 0;
223 }
224
225 std::string data(reinterpret_cast<const char*>(buffer), length);
226 data = write_data_prefix_ + data;
227 size_t ret = tcp_stream_->write(reinterpret_cast<const uint8_t*>(data.data()),
228 data.size());
229 if (ret != data.size()) {
230 AERROR << "Send ntrip data size " << data.size() << ", return " << ret;
232 return 0;
233 }
234
235 return length;
236}
237
238Stream* Stream::create_ntrip(const std::string& address, uint16_t port,
239 const std::string& mountpoint,
240 const std::string& user, const std::string& passwd,
241 uint32_t timeout_s) {
242 return new NtripStream(address, port, mountpoint, user, passwd, timeout_s);
243}
244
245} // namespace gnss
246} // namespace drivers
247} // namespace apollo
static Time Now()
get the current time.
Definition time.cc:57
double ToSecond() const
convert time to second.
Definition time.cc:77
NtripStream(const std::string &address, uint16_t port, const std::string &mountpoint, const std::string &user, const std::string &passwd, uint32_t timeout_s)
virtual size_t read(uint8_t *buffer, size_t max_length)
virtual size_t write(const uint8_t *data, size_t length)
static Stream * create_ntrip(const std::string &address, uint16_t port, const std::string &mountpoint, const std::string &user, const std::string &passwd, uint32_t timeout_s=30)
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
Some util functions.
class register implement
Definition arena_queue.h:37
Some string util functions.