Apollo 10.0
自动驾驶开放平台
apollo::dreamview::WebSocketHandler类 参考

The WebSocketHandler, built on top of CivetWebSocketHandler, is a websocket handler that handles different types of websocket related events. 更多...

#include <websocket_handler.h>

类 apollo::dreamview::WebSocketHandler 继承关系图:
apollo::dreamview::WebSocketHandler 的协作图:

Public 类型

using Json = nlohmann::json
 
using Connection = mg_connection
 
using MessageHandler = std::function< void(const Json &, Connection *)>
 
using ConnectionReadyHandler = std::function< void(Connection *)>
 

Public 成员函数

 WebSocketHandler (const std::string &name)
 
bool handleConnection (CivetServer *server, const Connection *conn) override
 Callback method for when the client intends to establish a websocket connection, before websocket handshake.
 
void handleReadyState (CivetServer *server, Connection *conn) override
 Callback method for when websocket handshake is successfully completed, and connection is ready for data exchange.
 
bool handleData (CivetServer *server, Connection *conn, int bits, char *data, size_t data_len) override
 Callback method for when a data frame has been received from the client.
 
bool handleJsonData (Connection *conn, const std::string &data)
 
bool handleBinaryData (Connection *conn, const std::string &data)
 
void handleClose (CivetServer *server, const Connection *conn) override
 Callback method for when the connection is closed.
 
bool BroadcastData (const std::string &data, bool skippable=false)
 Sends the provided data to all the connected clients.
 
bool BroadcastBinaryData (const std::string &data, bool skippable=false)
 Sends the provided binary data to all the connected clients.
 
bool SendData (Connection *conn, const std::string &data, bool skippable=false, int op_code=MG_WEBSOCKET_OPCODE_TEXT)
 Sends the provided data to a specific connected client.
 
bool SendBinaryData (Connection *conn, const std::string &data, bool skippable=false)
 
void RegisterMessageHandler (std::string type, MessageHandler handler)
 Add a new message handler for a message type.
 
void RegisterConnectionReadyHandler (ConnectionReadyHandler handler)
 Add a new handler for new connections.
 

详细描述

The WebSocketHandler, built on top of CivetWebSocketHandler, is a websocket handler that handles different types of websocket related events.

在文件 websocket_handler.h46 行定义.

成员类型定义说明

◆ Connection

在文件 websocket_handler.h54 行定义.

◆ ConnectionReadyHandler

在文件 websocket_handler.h56 行定义.

◆ Json

在文件 websocket_handler.h53 行定义.

◆ MessageHandler

using apollo::dreamview::WebSocketHandler::MessageHandler = std::function<void(const Json &, Connection *)>

在文件 websocket_handler.h55 行定义.

构造及析构函数说明

◆ WebSocketHandler()

apollo::dreamview::WebSocketHandler::WebSocketHandler ( const std::string &  name)
inlineexplicit

在文件 websocket_handler.h58 行定义.

58: name_(name) {}

成员函数说明

◆ BroadcastBinaryData()

bool apollo::dreamview::WebSocketHandler::BroadcastBinaryData ( const std::string &  data,
bool  skippable = false 
)

Sends the provided binary data to all the connected clients.

参数
dataThe message string to be sent.

在文件 websocket_handler.cc98 行定义.

99 {
100 std::vector<Connection *> connections_to_send;
101 {
102 std::unique_lock<std::mutex> lock(mutex_);
103 if (connections_.empty()) {
104 return true;
105 }
106 for (auto &kv : connections_) {
107 Connection *conn = kv.first;
108 connections_to_send.push_back(conn);
109 }
110 }
111
112 bool all_success = true;
113 for (Connection *conn : connections_to_send) {
114 if (!SendData(conn, data, skippable, MG_WEBSOCKET_OPCODE_BINARY)) {
115 all_success = false;
116 }
117 }
118
119 return all_success;
120}
bool SendData(Connection *conn, const std::string &data, bool skippable=false, int op_code=MG_WEBSOCKET_OPCODE_TEXT)
Sends the provided data to a specific connected client.

◆ BroadcastData()

bool apollo::dreamview::WebSocketHandler::BroadcastData ( const std::string &  data,
bool  skippable = false 
)

Sends the provided data to all the connected clients.

参数
dataThe message string to be sent.

在文件 websocket_handler.cc75 行定义.

75 {
76 std::vector<Connection *> connections_to_send;
77 {
78 std::unique_lock<std::mutex> lock(mutex_);
79 if (connections_.empty()) {
80 return true;
81 }
82 for (auto &kv : connections_) {
83 Connection *conn = kv.first;
84 connections_to_send.push_back(conn);
85 }
86 }
87
88 bool all_success = true;
89 for (Connection *conn : connections_to_send) {
90 if (!SendData(conn, data, skippable)) {
91 all_success = false;
92 }
93 }
94
95 return all_success;
96}

◆ handleBinaryData()

bool apollo::dreamview::WebSocketHandler::handleBinaryData ( Connection conn,
const std::string &  data 
)

在文件 websocket_handler.cc254 行定义.

255 {
256 auto type = "Binary";
257 message_handlers_[type](data, conn);
258 return true;
259}

◆ handleClose()

void apollo::dreamview::WebSocketHandler::handleClose ( CivetServer *  server,
const Connection conn 
)
override

Callback method for when the connection is closed.

参数
serverthe calling server
connthe connection information

在文件 websocket_handler.cc41 行定义.

42 {
43 // Remove from the store of currently open connections. Copy the mutex out
44 // so that it won't be reclaimed during map.erase().
45 Connection *connection = const_cast<Connection *>(conn);
46
47 std::shared_ptr<std::mutex> connection_lock;
48 {
49 std::unique_lock<std::mutex> lock(mutex_);
50 connection_lock = connections_[connection];
51 }
52
53 {
54 // Make sure there's no data being sent via the connection
55 std::unique_lock<std::mutex> lock_connection(*connection_lock);
56 std::unique_lock<std::mutex> lock(mutex_);
57 connections_.erase(connection);
58 }
59 // send close frame
60 int ret = mg_websocket_write(connection, 0x8, "", 0);
61 // Determine error message based on return value.
62 AWARN << name_ << ": Failed to send clase frame. Reason";
63 if (ret == 0) {
64 AWARN << "Connection closed";
65 } else if (ret < 0) {
66 AWARN << "Send error: " << std::strerror(errno);
67 } else {
68 AWARN << "Bytes to send: expected 2, actual: " << ret;
69 }
70
71 AINFO << name_
72 << ": Connection closed. Total connections: " << connections_.size();
73}
#define AINFO
Definition log.h:42
#define AWARN
Definition log.h:43

◆ handleConnection()

bool apollo::dreamview::WebSocketHandler::handleConnection ( CivetServer *  server,
const Connection conn 
)
inlineoverride

Callback method for when the client intends to establish a websocket connection, before websocket handshake.

参数
serverthe calling server
connthe connection information
返回
true to keep socket open, false to close it

在文件 websocket_handler.h68 行定义.

68 {
69 return true;
70 }

◆ handleData()

bool apollo::dreamview::WebSocketHandler::handleData ( CivetServer *  server,
Connection conn,
int  bits,
char *  data,
size_t  data_len 
)
override

Callback method for when a data frame has been received from the client.

In the websocket protocol, data is transmitted using a sequence of frames, and each frame received invokes this callback method. Since the type of opcode (text, binary, etc) is given in the first frame, this method stores the opcode in a thread_local variable named current_opcode_. And data from each frame is accumulated to data_ until the final fragment is detected. See websocket RFC at http://tools.ietf.org/html/rfc6455, section 5.4 for more protocol and fragmentation details.

参数
serverthe calling server
connthe connection information
bitsfirst byte of the websocket frame, see websocket RFC at http://tools.ietf.org/html/rfc6455, section 5.2
datapayload, with mask (if any) already applied.
data_lenlength of data
返回
true to keep socket open, false to close it

在文件 websocket_handler.cc189 行定义.

190 {
191 // Ignore connection close request.
192 if ((bits & 0x0F) == MG_WEBSOCKET_OPCODE_CONNECTION_CLOSE) {
193 return false;
194 }
195
196 data_.write(data, data_len);
197 if (current_opcode_ == 0x00) {
198 current_opcode_ = bits & 0x7f;
199 }
200
201 bool result = true;
202
203 // The FIN bit (the left most significant bit) is used to indicates
204 // the final fragment in a message. Note, the first fragment MAY
205 // also be the final fragment.
206 bool is_final_fragment = bits & 0x80;
207 if (is_final_fragment) {
208 switch (current_opcode_) {
209 case MG_WEBSOCKET_OPCODE_TEXT:
210 result = handleJsonData(conn, data_.str());
211 break;
212 case MG_WEBSOCKET_OPCODE_BINARY:
213 result = handleBinaryData(conn, data_.str());
214 break;
215 default:
216 AERROR << name_ << ": Unknown WebSocket bits flag: " << bits;
217 break;
218 }
219
220 // reset opcode and data
221 current_opcode_ = 0x00;
222 data_.clear();
223 data_.str(std::string());
224 }
225
226 return result;
227}
bool handleBinaryData(Connection *conn, const std::string &data)
bool handleJsonData(Connection *conn, const std::string &data)
#define AERROR
Definition log.h:44

◆ handleJsonData()

bool apollo::dreamview::WebSocketHandler::handleJsonData ( Connection conn,
const std::string &  data 
)

在文件 websocket_handler.cc229 行定义.

230 {
231 Json json;
232 try {
233 json = Json::parse(data.begin(), data.end());
234 } catch (const std::exception &e) {
235 AERROR << "Failed to parse JSON data: " << e.what();
236 return false;
237 }
238
239 if (!ContainsKey(json, "type")) {
240 AERROR << "Received JSON data without type field: " << json;
241 return true;
242 }
243
244 auto type = json["type"];
245 if (!ContainsKey(message_handlers_, type)) {
246 AERROR << "No message handler found for message type " << type
247 << ". The message will be discarded!";
248 return true;
249 }
250 message_handlers_[type](json, conn);
251 return true;
252}
nlohmann::json Json

◆ handleReadyState()

void apollo::dreamview::WebSocketHandler::handleReadyState ( CivetServer *  server,
Connection conn 
)
override

Callback method for when websocket handshake is successfully completed, and connection is ready for data exchange.

参数
serverthe calling server
connthe connection information

在文件 websocket_handler.cc27 行定义.

27 {
28 {
29 std::unique_lock<std::mutex> lock(mutex_);
30 connections_.emplace(conn, std::make_shared<std::mutex>());
31 }
32 AINFO << name_
33 << ": Accepted connection. Total connections: " << connections_.size();
34
35 // Trigger registered new connection handlers.
36 for (const auto handler : connection_ready_handlers_) {
37 handler(conn);
38 }
39}

◆ RegisterConnectionReadyHandler()

void apollo::dreamview::WebSocketHandler::RegisterConnectionReadyHandler ( ConnectionReadyHandler  handler)
inline

Add a new handler for new connections.

参数
handlerThe function to handle the new connection in ReadyState.

在文件 websocket_handler.h154 行定义.

154 {
155 connection_ready_handlers_.emplace_back(handler);
156 }

◆ RegisterMessageHandler()

void apollo::dreamview::WebSocketHandler::RegisterMessageHandler ( std::string  type,
MessageHandler  handler 
)
inline

Add a new message handler for a message type.

参数
typeThe name/key to identify the message type.
handlerThe function to handle the received message.

在文件 websocket_handler.h146 行定义.

146 {
147 message_handlers_[type] = handler;
148 }

◆ SendBinaryData()

bool apollo::dreamview::WebSocketHandler::SendBinaryData ( Connection conn,
const std::string &  data,
bool  skippable = false 
)

在文件 websocket_handler.cc122 行定义.

123 {
124 return SendData(conn, data, skippable, MG_WEBSOCKET_OPCODE_BINARY);
125}

◆ SendData()

bool apollo::dreamview::WebSocketHandler::SendData ( Connection conn,
const std::string &  data,
bool  skippable = false,
int  op_code = MG_WEBSOCKET_OPCODE_TEXT 
)

Sends the provided data to a specific connected client.

参数
connThe connection to send to.
dataThe message string to be sent.
skippablewhether the data is allowed to be skipped if some other is being sent to this connection.

在文件 websocket_handler.cc127 行定义.

128 {
129 std::shared_ptr<std::mutex> connection_lock;
130 {
131 std::unique_lock<std::mutex> lock(mutex_);
132 if (!ContainsKey(connections_, conn)) {
133 AERROR << name_
134 << ": Trying to send to an uncached connection, skipping.";
135 return false;
136 }
137 // Copy the lock so that it still exists if the connection is closed after
138 // this block.
139 connection_lock = connections_[conn];
140 }
141
142 // Lock the connection while sending.
143 if (!connection_lock->try_lock()) {
144 // Skip sending data if:
145 // 1. Data is skippable according to sender and there's higher priority data
146 // being sent.
147 // 2. The connection has been closed.
148 if (skippable) {
149 AWARN << "Skip sending a droppable message!";
150 return false;
151 }
152 // Block to acquire the lock.
153 connection_lock->lock();
154 std::unique_lock<std::mutex> lock(mutex_);
155 if (!ContainsKey(connections_, conn)) {
156 return false;
157 }
158 }
159
160 // Note that while we are holding the connection lock, the connection won't be
161 // closed and removed.
162 int ret = mg_websocket_write(conn, op_code, data.c_str(), data.size());
163 connection_lock->unlock();
164
165 if (ret != static_cast<int>(data.size())) {
166 // When data is empty, the header length (2) is returned.
167 if (data.empty() && ret == 2) {
168 return true;
169 }
170
171 // Determine error message based on return value.
172 AWARN << name_ << ": Failed to send data via websocket connection. Reason";
173 if (ret == 0) {
174 AWARN << "Connection closed";
175 } else if (ret < 0) {
176 AWARN << "Send error: " << std::strerror(errno);
177 } else {
178 AWARN << "Bytes to send: expected " << data.size() << ", actual: " << ret;
179 }
180 return false;
181 }
182
183 return true;
184}

该类的文档由以下文件生成: