Apollo 10.0
自动驾驶开放平台
client.cc
浏览该文件的文档.
1
8
9#include <functional>
10#include <memory>
11#include <string>
12#include <utility>
13
14#include "boost/bind.hpp"
15
16#include "cyber/common/log.h"
20
21enum {
26};
27
28Client::Client(Node* node, Clients* clients, boost::asio::ip::tcp::socket s)
29 : node(*node), clients(*clients), socket(std::move(s)) {
30 auto endpoint = socket.remote_endpoint();
31 AINFO << "Client [" << endpoint.address() << ":" << endpoint.port()
32 << "] connected";
33}
34
36
38 socket.async_read_some(
39 boost::asio::buffer(temp, sizeof(temp)),
40 boost::bind(&Client::handle_read, shared_from_this(),
41 boost::asio::placeholders::error,
42 boost::asio::placeholders::bytes_transferred));
43}
44
45void Client::stop() { socket.close(); }
46
47void Client::handle_read(const boost::system::error_code& ec,
48 std::size_t size) {
49 if (!ec) {
50 ADEBUG << "Received " << size << " bytes";
51 buffer.insert(buffer.end(), temp, temp + size);
52
53 size_t size = buffer.size();
54
55 while (size >= sizeof(uint8_t)) {
56 if (buffer[0] == OP_REGISTER_DESC) {
57 handle_register_desc();
58 } else if (buffer[0] == OP_ADD_READER) {
59 handle_add_reader();
60 } else if (buffer[0] == OP_ADD_WRITER) {
61 handle_add_writer();
62 } else if (buffer[0] == OP_PUBLISH) {
63 handle_publish();
64 } else {
65 AERROR << "Unknown operation received from client ("
66 << uint32_t(buffer[0]) << "), disconnecting client";
67 clients.stop(shared_from_this());
68 return;
69 }
70
71 if (size == buffer.size()) {
72 break;
73 }
74 size = buffer.size();
75 }
76
77 start();
78 return;
79 }
80
81 if (ec == boost::asio::error::eof) {
82 // remote side closed connection
83 clients.stop(shared_from_this());
84 node.remove(shared_from_this());
85 return;
86 }
87
88 if (ec != boost::asio::error::operation_aborted) {
89 AERROR << "Client read failed, disconnecting" << ec;
90 clients.stop(shared_from_this());
91 node.remove(shared_from_this());
92 return;
93 }
94}
95
96void Client::handle_write(const boost::system::error_code& ec) {
97 if (ec) {
98 if (ec != boost::asio::error::operation_aborted) {
99 AERROR << "Client write failed, disconnecting" << ec;
100 clients.stop(shared_from_this());
101 node.remove(shared_from_this());
102 }
103 return;
104 }
105
106 std::lock_guard<std::mutex> lock(publish_mutex);
107 writing.clear();
108 if (!pending.empty()) {
109 writing.swap(pending);
110 boost::asio::async_write(
111 socket, boost::asio::buffer(writing.data(), writing.size()),
112 boost::bind(&Client::handle_write, shared_from_this(),
113 boost::asio::placeholders::error));
114 }
115}
116
117// [1] [count] [string] ... [string]
118void Client::handle_register_desc() {
119 if (sizeof(uint8_t) + sizeof(uint32_t) > buffer.size()) {
120 ADEBUG << "handle_register_desc too short";
121 return;
122 }
123
124 uint32_t count = get32le(1);
125
126 std::vector<std::string> desc;
127
128 bool complete = true;
129 size_t offset = sizeof(uint8_t) + sizeof(uint32_t);
130 for (uint32_t i = 0; i < count; i++) {
131 if (offset + sizeof(uint32_t) > buffer.size()) {
132 ADEBUG << "handle_register_desc too short";
133 complete = false;
134 break;
135 }
136
137 uint32_t size = get32le(offset);
138 offset += sizeof(uint32_t);
139
140 if (offset + size > buffer.size()) {
141 ADEBUG << "handle_register_desc too short";
142 complete = false;
143 break;
144 }
145
146 desc.push_back(std::string(reinterpret_cast<char*>(&buffer[offset]), size));
147 offset += size;
148 }
149
150 if (complete) {
151 ADEBUG << "OP_REGISTER_DESC, count = " << count;
152
153 auto factory = apollo::cyber::message::ProtobufFactory::Instance();
154 for (const auto& s : desc) {
155 factory->RegisterPythonMessage(s);
156 }
157
158 buffer.erase(buffer.begin(), buffer.begin() + offset);
159 }
160}
161
162// [2] [channel] [type]
163void Client::handle_add_reader() {
164 if (sizeof(uint8_t) + 2 * sizeof(uint32_t) > buffer.size()) {
165 ADEBUG << "handle_add_reader too short header";
166 return;
167 }
168
169 size_t offset = sizeof(uint8_t);
170
171 uint32_t channel_length = get32le(offset);
172 offset += sizeof(uint32_t);
173 if (offset + channel_length > buffer.size()) {
174 ADEBUG << "handle_add_reader short1 " << offset + channel_length << " "
175 << buffer.size();
176 return;
177 }
178
179 std::string channel(reinterpret_cast<char*>(&buffer[offset]), channel_length);
180 offset += channel_length;
181
182 uint32_t type_length = get32le(offset);
183 offset += sizeof(uint32_t);
184 if (offset + type_length > buffer.size()) {
185 ADEBUG << "handle_add_reader short2 " << offset + type_length << " "
186 << buffer.size();
187 return;
188 }
189
190 std::string type(reinterpret_cast<char*>(&buffer[offset]), type_length);
191 offset += type_length;
192
193 ADEBUG << "OP_NEW_READER, channel = " << channel << ", type = " << type;
194
195 node.add_reader(channel, type, shared_from_this());
196
197 buffer.erase(buffer.begin(), buffer.begin() + offset);
198}
199
200// [3] [channel] [type]
201void Client::handle_add_writer() {
202 if (sizeof(uint8_t) + 2 * sizeof(uint32_t) > buffer.size()) {
203 ADEBUG << "handle_new_writer too short header";
204 return;
205 }
206
207 size_t offset = sizeof(uint8_t);
208
209 uint32_t channel_length = get32le(offset);
210 offset += sizeof(uint32_t);
211 if (offset + channel_length > buffer.size()) {
212 ADEBUG << "handle_new_writer short1 " << offset + channel_length << " "
213 << buffer.size();
214 return;
215 }
216
217 std::string channel(reinterpret_cast<char*>(&buffer[offset]), channel_length);
218 offset += channel_length;
219
220 uint32_t type_length = get32le(offset);
221 offset += sizeof(uint32_t);
222 if (offset + type_length > buffer.size()) {
223 ADEBUG << "handle_new_writer short2 " << offset + type_length << " "
224 << buffer.size();
225 return;
226 }
227
228 std::string type(reinterpret_cast<char*>(&buffer[offset]), type_length);
229 offset += type_length;
230
231 ADEBUG << "OP_NEW_WRITER, channel = " << channel << ", type = " << type;
232
233 node.add_writer(channel, type, shared_from_this());
234
235 buffer.erase(buffer.begin(), buffer.begin() + offset);
236}
237
238// [4] [channel] [message]
239void Client::handle_publish() {
240 if (sizeof(uint8_t) + 2 * sizeof(uint32_t) > buffer.size()) {
241 return;
242 }
243
244 size_t offset = sizeof(uint8_t);
245
246 uint32_t channel_length = get32le(offset);
247 offset += sizeof(uint32_t);
248 if (offset + channel_length > buffer.size()) {
249 return;
250 }
251
252 std::string channel(reinterpret_cast<char*>(&buffer[offset]), channel_length);
253 offset += channel_length;
254
255 uint32_t message_length = get32le(offset);
256 offset += sizeof(uint32_t);
257 if (offset + message_length > buffer.size()) {
258 return;
259 }
260
261 std::string message(reinterpret_cast<char*>(&buffer[offset]), message_length);
262 offset += message_length;
263
264 ADEBUG << "OP_PUBLISH, channel = " << channel;
265
266 node.publish(channel, message);
267
268 buffer.erase(buffer.begin(), buffer.begin() + offset);
269}
270
271void fill_data(std::vector<uint8_t>* data, const std::string& channel,
272 const std::string& msg) {
273 data->reserve(data->size() + sizeof(uint8_t) + sizeof(uint32_t) +
274 channel.size() + sizeof(uint32_t) + msg.size());
275
276 data->push_back(OP_PUBLISH);
277
278 data->push_back(uint8_t(channel.size() >> 0));
279 data->push_back(uint8_t(channel.size() >> 8));
280 data->push_back(uint8_t(channel.size() >> 16));
281 data->push_back(uint8_t(channel.size() >> 24));
282 const uint8_t* channel_data =
283 reinterpret_cast<const uint8_t*>(channel.data());
284 data->insert(data->end(), channel_data, channel_data + channel.size());
285
286 data->push_back(uint8_t(msg.size() >> 0));
287 data->push_back(uint8_t(msg.size() >> 8));
288 data->push_back(uint8_t(msg.size() >> 16));
289 data->push_back(uint8_t(msg.size() >> 24));
290 const uint8_t* msg_data = reinterpret_cast<const uint8_t*>(msg.data());
291 data->insert(data->end(), msg_data, msg_data + msg.size());
292}
293
294void Client::publish(const std::string& channel, const std::string& msg) {
295 std::lock_guard<std::mutex> lock(publish_mutex);
296 if (writing.empty()) {
297 fill_data(&writing, channel, msg);
298 boost::asio::async_write(
299 socket, boost::asio::buffer(writing.data(), writing.size()),
300 boost::bind(&Client::handle_write, shared_from_this(),
301 boost::asio::placeholders::error));
302 } else if (pending.size() < MAX_PENDING_SIZE) {
303 fill_data(&pending, channel, msg);
304 } else {
305 // If pending size is larger than MAX_PENDING_SIZE, discard the message.
306 AERROR << "Pending size too large. Discard message.";
307 }
308}
309
310uint32_t Client::get32le(size_t offset) const {
311 return buffer[offset + 0] | (buffer[offset + 1] << 8) |
312 (buffer[offset + 2] << 16) | (buffer[offset + 3] << 24);
313}
Client(Node *node, Clients *clients, boost::asio::ip::tcp::socket socket)
Definition client.cc:28
void publish(const std::string &channel, const std::string &msg)
Definition client.cc:294
void start()
Definition client.cc:37
~Client()
Definition client.cc:35
void stop()
Definition client.cc:45
void stop(std::shared_ptr< Client > client)
Definition clients.cc:22
Definition node.h:31
void add_writer(const std::string &channel, const std::string &type, std::shared_ptr< Client > client)
Definition node.cc:93
void remove(std::shared_ptr< Client > client)
Definition node.cc:26
void add_reader(const std::string &channel, const std::string &type, std::shared_ptr< Client > client)
Definition node.cc:60
void publish(const std::string &channel, const std::string &data)
Definition node.cc:124
@ OP_ADD_WRITER
Definition client.cc:24
@ OP_PUBLISH
Definition client.cc:25
@ OP_REGISTER_DESC
Definition client.cc:22
@ OP_ADD_READER
Definition client.cc:23
void fill_data(std::vector< uint8_t > *data, const std::string &channel, const std::string &msg)
Definition client.cc:271
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
Definition future.h:29