14#include "boost/bind.hpp"
29 : node(*node), clients(*clients), socket(
std::move(s)) {
30 auto endpoint = socket.remote_endpoint();
31 AINFO <<
"Client [" << endpoint.address() <<
":" << endpoint.port()
38 socket.async_read_some(
39 boost::asio::buffer(temp,
sizeof(temp)),
40 boost::bind(&Client::handle_read, shared_from_this(),
41 boost::asio::placeholders::error,
42 boost::asio::placeholders::bytes_transferred));
47void Client::handle_read(
const boost::system::error_code& ec,
50 ADEBUG <<
"Received " << size <<
" bytes";
51 buffer.insert(buffer.end(), temp, temp + size);
53 size_t size = buffer.size();
55 while (size >=
sizeof(uint8_t)) {
57 handle_register_desc();
65 AERROR <<
"Unknown operation received from client ("
66 << uint32_t(buffer[0]) <<
"), disconnecting client";
67 clients.
stop(shared_from_this());
71 if (size == buffer.size()) {
81 if (ec == boost::asio::error::eof) {
83 clients.
stop(shared_from_this());
84 node.
remove(shared_from_this());
88 if (ec != boost::asio::error::operation_aborted) {
89 AERROR <<
"Client read failed, disconnecting" << ec;
90 clients.
stop(shared_from_this());
91 node.
remove(shared_from_this());
96void Client::handle_write(
const boost::system::error_code& ec) {
98 if (ec != boost::asio::error::operation_aborted) {
99 AERROR <<
"Client write failed, disconnecting" << ec;
100 clients.
stop(shared_from_this());
101 node.
remove(shared_from_this());
106 std::lock_guard<std::mutex> lock(publish_mutex);
108 if (!pending.empty()) {
109 writing.swap(pending);
110 boost::asio::async_write(
111 socket, boost::asio::buffer(writing.data(), writing.size()),
112 boost::bind(&Client::handle_write, shared_from_this(),
113 boost::asio::placeholders::error));
118void Client::handle_register_desc() {
119 if (
sizeof(uint8_t) +
sizeof(uint32_t) > buffer.size()) {
120 ADEBUG <<
"handle_register_desc too short";
124 uint32_t count = get32le(1);
126 std::vector<std::string> desc;
128 bool complete =
true;
129 size_t offset =
sizeof(uint8_t) +
sizeof(uint32_t);
130 for (uint32_t i = 0; i < count; i++) {
131 if (offset +
sizeof(uint32_t) > buffer.size()) {
132 ADEBUG <<
"handle_register_desc too short";
137 uint32_t size = get32le(offset);
138 offset +=
sizeof(uint32_t);
140 if (offset + size > buffer.size()) {
141 ADEBUG <<
"handle_register_desc too short";
146 desc.push_back(std::string(
reinterpret_cast<char*
>(&buffer[offset]), size));
151 ADEBUG <<
"OP_REGISTER_DESC, count = " << count;
153 auto factory = apollo::cyber::message::ProtobufFactory::Instance();
154 for (
const auto& s : desc) {
155 factory->RegisterPythonMessage(s);
158 buffer.erase(buffer.begin(), buffer.begin() + offset);
163void Client::handle_add_reader() {
164 if (
sizeof(uint8_t) + 2 *
sizeof(uint32_t) > buffer.size()) {
165 ADEBUG <<
"handle_add_reader too short header";
169 size_t offset =
sizeof(uint8_t);
171 uint32_t channel_length = get32le(offset);
172 offset +=
sizeof(uint32_t);
173 if (offset + channel_length > buffer.size()) {
174 ADEBUG <<
"handle_add_reader short1 " << offset + channel_length <<
" "
179 std::string channel(
reinterpret_cast<char*
>(&buffer[offset]), channel_length);
180 offset += channel_length;
182 uint32_t type_length = get32le(offset);
183 offset +=
sizeof(uint32_t);
184 if (offset + type_length > buffer.size()) {
185 ADEBUG <<
"handle_add_reader short2 " << offset + type_length <<
" "
190 std::string type(
reinterpret_cast<char*
>(&buffer[offset]), type_length);
191 offset += type_length;
193 ADEBUG <<
"OP_NEW_READER, channel = " << channel <<
", type = " << type;
195 node.
add_reader(channel, type, shared_from_this());
197 buffer.erase(buffer.begin(), buffer.begin() + offset);
201void Client::handle_add_writer() {
202 if (
sizeof(uint8_t) + 2 *
sizeof(uint32_t) > buffer.size()) {
203 ADEBUG <<
"handle_new_writer too short header";
207 size_t offset =
sizeof(uint8_t);
209 uint32_t channel_length = get32le(offset);
210 offset +=
sizeof(uint32_t);
211 if (offset + channel_length > buffer.size()) {
212 ADEBUG <<
"handle_new_writer short1 " << offset + channel_length <<
" "
217 std::string channel(
reinterpret_cast<char*
>(&buffer[offset]), channel_length);
218 offset += channel_length;
220 uint32_t type_length = get32le(offset);
221 offset +=
sizeof(uint32_t);
222 if (offset + type_length > buffer.size()) {
223 ADEBUG <<
"handle_new_writer short2 " << offset + type_length <<
" "
228 std::string type(
reinterpret_cast<char*
>(&buffer[offset]), type_length);
229 offset += type_length;
231 ADEBUG <<
"OP_NEW_WRITER, channel = " << channel <<
", type = " << type;
233 node.
add_writer(channel, type, shared_from_this());
235 buffer.erase(buffer.begin(), buffer.begin() + offset);
239void Client::handle_publish() {
240 if (
sizeof(uint8_t) + 2 *
sizeof(uint32_t) > buffer.size()) {
244 size_t offset =
sizeof(uint8_t);
246 uint32_t channel_length = get32le(offset);
247 offset +=
sizeof(uint32_t);
248 if (offset + channel_length > buffer.size()) {
252 std::string channel(
reinterpret_cast<char*
>(&buffer[offset]), channel_length);
253 offset += channel_length;
255 uint32_t message_length = get32le(offset);
256 offset +=
sizeof(uint32_t);
257 if (offset + message_length > buffer.size()) {
261 std::string message(
reinterpret_cast<char*
>(&buffer[offset]), message_length);
262 offset += message_length;
264 ADEBUG <<
"OP_PUBLISH, channel = " << channel;
266 node.
publish(channel, message);
268 buffer.erase(buffer.begin(), buffer.begin() + offset);
271void fill_data(std::vector<uint8_t>* data,
const std::string& channel,
272 const std::string& msg) {
273 data->reserve(data->size() +
sizeof(uint8_t) +
sizeof(uint32_t) +
274 channel.size() +
sizeof(uint32_t) + msg.size());
278 data->push_back(uint8_t(channel.size() >> 0));
279 data->push_back(uint8_t(channel.size() >> 8));
280 data->push_back(uint8_t(channel.size() >> 16));
281 data->push_back(uint8_t(channel.size() >> 24));
282 const uint8_t* channel_data =
283 reinterpret_cast<const uint8_t*
>(channel.data());
284 data->insert(data->end(), channel_data, channel_data + channel.size());
286 data->push_back(uint8_t(msg.size() >> 0));
287 data->push_back(uint8_t(msg.size() >> 8));
288 data->push_back(uint8_t(msg.size() >> 16));
289 data->push_back(uint8_t(msg.size() >> 24));
290 const uint8_t* msg_data =
reinterpret_cast<const uint8_t*
>(msg.data());
291 data->insert(data->end(), msg_data, msg_data + msg.size());
295 std::lock_guard<std::mutex> lock(publish_mutex);
296 if (writing.empty()) {
298 boost::asio::async_write(
299 socket, boost::asio::buffer(writing.data(), writing.size()),
300 boost::bind(&Client::handle_write, shared_from_this(),
301 boost::asio::placeholders::error));
302 }
else if (pending.size() < MAX_PENDING_SIZE) {
306 AERROR <<
"Pending size too large. Discard message.";
310uint32_t Client::get32le(
size_t offset)
const {
311 return buffer[offset + 0] | (buffer[offset + 1] << 8) |
312 (buffer[offset + 2] << 16) | (buffer[offset + 3] << 24);
Client(Node *node, Clients *clients, boost::asio::ip::tcp::socket socket)
void publish(const std::string &channel, const std::string &msg)
void stop(std::shared_ptr< Client > client)
void add_writer(const std::string &channel, const std::string &type, std::shared_ptr< Client > client)
void remove(std::shared_ptr< Client > client)
void add_reader(const std::string &channel, const std::string &type, std::shared_ptr< Client > client)
void publish(const std::string &channel, const std::string &data)
void fill_data(std::vector< uint8_t > *data, const std::string &channel, const std::string &msg)