Apollo 10.0
自动驾驶开放平台
websocket_handler.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2017 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include "cyber/common/log.h"
21
22namespace apollo {
23namespace dreamview {
24
25using apollo::common::util::ContainsKey;
26
27void WebSocketHandler::handleReadyState(CivetServer *server, Connection *conn) {
28 {
29 std::unique_lock<std::mutex> lock(mutex_);
30 connections_.emplace(conn, std::make_shared<std::mutex>());
31 }
32 AINFO << name_
33 << ": Accepted connection. Total connections: " << connections_.size();
34
35 // Trigger registered new connection handlers.
36 for (const auto handler : connection_ready_handlers_) {
37 handler(conn);
38 }
39}
40
41void WebSocketHandler::handleClose(CivetServer *server,
42 const Connection *conn) {
43 // Remove from the store of currently open connections. Copy the mutex out
44 // so that it won't be reclaimed during map.erase().
45 Connection *connection = const_cast<Connection *>(conn);
46
47 std::shared_ptr<std::mutex> connection_lock;
48 {
49 std::unique_lock<std::mutex> lock(mutex_);
50 connection_lock = connections_[connection];
51 }
52
53 {
54 // Make sure there's no data being sent via the connection
55 std::unique_lock<std::mutex> lock_connection(*connection_lock);
56 std::unique_lock<std::mutex> lock(mutex_);
57 connections_.erase(connection);
58 }
59 // send close frame
60 int ret = mg_websocket_write(connection, 0x8, "", 0);
61 // Determine error message based on return value.
62 AWARN << name_ << ": Failed to send clase frame. Reason";
63 if (ret == 0) {
64 AWARN << "Connection closed";
65 } else if (ret < 0) {
66 AWARN << "Send error: " << std::strerror(errno);
67 } else {
68 AWARN << "Bytes to send: expected 2, actual: " << ret;
69 }
70
71 AINFO << name_
72 << ": Connection closed. Total connections: " << connections_.size();
73}
74
75bool WebSocketHandler::BroadcastData(const std::string &data, bool skippable) {
76 std::vector<Connection *> connections_to_send;
77 {
78 std::unique_lock<std::mutex> lock(mutex_);
79 if (connections_.empty()) {
80 return true;
81 }
82 for (auto &kv : connections_) {
83 Connection *conn = kv.first;
84 connections_to_send.push_back(conn);
85 }
86 }
87
88 bool all_success = true;
89 for (Connection *conn : connections_to_send) {
90 if (!SendData(conn, data, skippable)) {
91 all_success = false;
92 }
93 }
94
95 return all_success;
96}
97
98bool WebSocketHandler::BroadcastBinaryData(const std::string &data,
99 bool skippable) {
100 std::vector<Connection *> connections_to_send;
101 {
102 std::unique_lock<std::mutex> lock(mutex_);
103 if (connections_.empty()) {
104 return true;
105 }
106 for (auto &kv : connections_) {
107 Connection *conn = kv.first;
108 connections_to_send.push_back(conn);
109 }
110 }
111
112 bool all_success = true;
113 for (Connection *conn : connections_to_send) {
114 if (!SendData(conn, data, skippable, MG_WEBSOCKET_OPCODE_BINARY)) {
115 all_success = false;
116 }
117 }
118
119 return all_success;
120}
121
122bool WebSocketHandler::SendBinaryData(Connection *conn, const std::string &data,
123 bool skippable) {
124 return SendData(conn, data, skippable, MG_WEBSOCKET_OPCODE_BINARY);
125}
126
127bool WebSocketHandler::SendData(Connection *conn, const std::string &data,
128 bool skippable, int op_code) {
129 std::shared_ptr<std::mutex> connection_lock;
130 {
131 std::unique_lock<std::mutex> lock(mutex_);
132 if (!ContainsKey(connections_, conn)) {
133 AERROR << name_
134 << ": Trying to send to an uncached connection, skipping.";
135 return false;
136 }
137 // Copy the lock so that it still exists if the connection is closed after
138 // this block.
139 connection_lock = connections_[conn];
140 }
141
142 // Lock the connection while sending.
143 if (!connection_lock->try_lock()) {
144 // Skip sending data if:
145 // 1. Data is skippable according to sender and there's higher priority data
146 // being sent.
147 // 2. The connection has been closed.
148 if (skippable) {
149 AWARN << "Skip sending a droppable message!";
150 return false;
151 }
152 // Block to acquire the lock.
153 connection_lock->lock();
154 std::unique_lock<std::mutex> lock(mutex_);
155 if (!ContainsKey(connections_, conn)) {
156 return false;
157 }
158 }
159
160 // Note that while we are holding the connection lock, the connection won't be
161 // closed and removed.
162 int ret = mg_websocket_write(conn, op_code, data.c_str(), data.size());
163 connection_lock->unlock();
164
165 if (ret != static_cast<int>(data.size())) {
166 // When data is empty, the header length (2) is returned.
167 if (data.empty() && ret == 2) {
168 return true;
169 }
170
171 // Determine error message based on return value.
172 AWARN << name_ << ": Failed to send data via websocket connection. Reason";
173 if (ret == 0) {
174 AWARN << "Connection closed";
175 } else if (ret < 0) {
176 AWARN << "Send error: " << std::strerror(errno);
177 } else {
178 AWARN << "Bytes to send: expected " << data.size() << ", actual: " << ret;
179 }
180 return false;
181 }
182
183 return true;
184}
185
186thread_local unsigned char WebSocketHandler::current_opcode_ = 0x00;
187thread_local std::stringstream WebSocketHandler::data_;
188
189bool WebSocketHandler::handleData(CivetServer *server, Connection *conn,
190 int bits, char *data, size_t data_len) {
191 // Ignore connection close request.
192 if ((bits & 0x0F) == MG_WEBSOCKET_OPCODE_CONNECTION_CLOSE) {
193 return false;
194 }
195
196 data_.write(data, data_len);
197 if (current_opcode_ == 0x00) {
198 current_opcode_ = bits & 0x7f;
199 }
200
201 bool result = true;
202
203 // The FIN bit (the left most significant bit) is used to indicates
204 // the final fragment in a message. Note, the first fragment MAY
205 // also be the final fragment.
206 bool is_final_fragment = bits & 0x80;
207 if (is_final_fragment) {
208 switch (current_opcode_) {
209 case MG_WEBSOCKET_OPCODE_TEXT:
210 result = handleJsonData(conn, data_.str());
211 break;
212 case MG_WEBSOCKET_OPCODE_BINARY:
213 result = handleBinaryData(conn, data_.str());
214 break;
215 default:
216 AERROR << name_ << ": Unknown WebSocket bits flag: " << bits;
217 break;
218 }
219
220 // reset opcode and data
221 current_opcode_ = 0x00;
222 data_.clear();
223 data_.str(std::string());
224 }
225
226 return result;
227}
228
230 const std::string &data) {
231 Json json;
232 try {
233 json = Json::parse(data.begin(), data.end());
234 } catch (const std::exception &e) {
235 AERROR << "Failed to parse JSON data: " << e.what();
236 return false;
237 }
238
239 if (!ContainsKey(json, "type")) {
240 AERROR << "Received JSON data without type field: " << json;
241 return true;
242 }
243
244 auto type = json["type"];
245 if (!ContainsKey(message_handlers_, type)) {
246 AERROR << "No message handler found for message type " << type
247 << ". The message will be discarded!";
248 return true;
249 }
250 message_handlers_[type](json, conn);
251 return true;
252}
253
255 const std::string &data) {
256 auto type = "Binary";
257 message_handlers_[type](data, conn);
258 return true;
259}
260
261} // namespace dreamview
262} // namespace apollo
bool BroadcastData(const std::string &data, bool skippable=false)
Sends the provided data to all the connected clients.
bool handleData(CivetServer *server, Connection *conn, int bits, char *data, size_t data_len) override
Callback method for when a data frame has been received from the client.
bool BroadcastBinaryData(const std::string &data, bool skippable=false)
Sends the provided binary data to all the connected clients.
bool SendData(Connection *conn, const std::string &data, bool skippable=false, int op_code=MG_WEBSOCKET_OPCODE_TEXT)
Sends the provided data to a specific connected client.
bool handleBinaryData(Connection *conn, const std::string &data)
void handleClose(CivetServer *server, const Connection *conn) override
Callback method for when the connection is closed.
bool handleJsonData(Connection *conn, const std::string &data)
void handleReadyState(CivetServer *server, Connection *conn) override
Callback method for when the websocket handshake is successfully completed and the connection is ready for data exchange.
bool SendBinaryData(Connection *conn, const std::string &data, bool skippable=false)
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
#define AWARN
Definition log.h:43
Some map util functions.
class register implement
Definition arena_queue.h:37