25using apollo::common::util::ContainsKey;
29 std::unique_lock<std::mutex> lock(mutex_);
30 connections_.emplace(conn, std::make_shared<std::mutex>());
33 <<
": Accepted connection. Total connections: " << connections_.size();
36 for (
const auto handler : connection_ready_handlers_) {
47 std::shared_ptr<std::mutex> connection_lock;
49 std::unique_lock<std::mutex> lock(mutex_);
50 connection_lock = connections_[connection];
55 std::unique_lock<std::mutex> lock_connection(*connection_lock);
56 std::unique_lock<std::mutex> lock(mutex_);
57 connections_.erase(connection);
60 int ret = mg_websocket_write(connection, 0x8,
"", 0);
62 AWARN << name_ <<
": Failed to send clase frame. Reason";
64 AWARN <<
"Connection closed";
66 AWARN <<
"Send error: " << std::strerror(errno);
68 AWARN <<
"Bytes to send: expected 2, actual: " << ret;
72 <<
": Connection closed. Total connections: " << connections_.size();
76 std::vector<Connection *> connections_to_send;
78 std::unique_lock<std::mutex> lock(mutex_);
79 if (connections_.empty()) {
82 for (
auto &kv : connections_) {
84 connections_to_send.push_back(conn);
88 bool all_success =
true;
90 if (!
SendData(conn, data, skippable)) {
100 std::vector<Connection *> connections_to_send;
102 std::unique_lock<std::mutex> lock(mutex_);
103 if (connections_.empty()) {
106 for (
auto &kv : connections_) {
108 connections_to_send.push_back(conn);
112 bool all_success =
true;
113 for (
Connection *conn : connections_to_send) {
114 if (!
SendData(conn, data, skippable, MG_WEBSOCKET_OPCODE_BINARY)) {
124 return SendData(conn, data, skippable, MG_WEBSOCKET_OPCODE_BINARY);
128 bool skippable,
int op_code) {
129 std::shared_ptr<std::mutex> connection_lock;
131 std::unique_lock<std::mutex> lock(mutex_);
132 if (!ContainsKey(connections_, conn)) {
134 <<
": Trying to send to an uncached connection, skipping.";
139 connection_lock = connections_[conn];
143 if (!connection_lock->try_lock()) {
149 AWARN <<
"Skip sending a droppable message!";
153 connection_lock->lock();
154 std::unique_lock<std::mutex> lock(mutex_);
155 if (!ContainsKey(connections_, conn)) {
162 int ret = mg_websocket_write(conn, op_code, data.c_str(), data.size());
163 connection_lock->unlock();
165 if (ret !=
static_cast<int>(data.size())) {
167 if (data.empty() && ret == 2) {
172 AWARN << name_ <<
": Failed to send data via websocket connection. Reason";
174 AWARN <<
"Connection closed";
175 }
else if (ret < 0) {
176 AWARN <<
"Send error: " << std::strerror(errno);
178 AWARN <<
"Bytes to send: expected " << data.size() <<
", actual: " << ret;
// Per-thread receive state for handleData (both members are declared
// thread_local, so each thread gets its own copy).
//
// current_opcode_: opcode recorded by handleData while it is still 0x00
// (taken from the low 7 bits of `bits`); set back to 0x00 by handleData.
// NOTE(review): the reset appears to happen once the final fragment has been
// dispatched — confirm against the full function body.
186thread_local unsigned char WebSocketHandler::current_opcode_ = 0x00;
// data_: buffer that handleData appends each received payload to via write();
// emptied again with data_.str(std::string()).
187thread_local std::stringstream WebSocketHandler::data_;
190 int bits,
char *data,
size_t data_len) {
192 if ((bits & 0x0F) == MG_WEBSOCKET_OPCODE_CONNECTION_CLOSE) {
196 data_.write(data, data_len);
197 if (current_opcode_ == 0x00) {
198 current_opcode_ = bits & 0x7f;
206 bool is_final_fragment = bits & 0x80;
207 if (is_final_fragment) {
208 switch (current_opcode_) {
209 case MG_WEBSOCKET_OPCODE_TEXT:
212 case MG_WEBSOCKET_OPCODE_BINARY:
216 AERROR << name_ <<
": Unknown WebSocket bits flag: " << bits;
221 current_opcode_ = 0x00;
223 data_.str(std::string());
230 const std::string &data) {
233 json = Json::parse(data.begin(), data.end());
234 }
catch (
const std::exception &e) {
235 AERROR <<
"Failed to parse JSON data: " << e.what();
239 if (!ContainsKey(json,
"type")) {
240 AERROR <<
"Received JSON data without type field: " << json;
244 auto type = json[
"type"];
245 if (!ContainsKey(message_handlers_, type)) {
246 AERROR <<
"No message handler found for message type " << type
247 <<
". The message will be discarded!";
250 message_handlers_[type](json, conn);
255 const std::string &data) {
256 auto type =
"Binary";
257 message_handlers_[type](data, conn);
bool BroadcastData(const std::string &data, bool skippable=false)
Sends the provided data to all the connected clients.
bool handleData(CivetServer *server, Connection *conn, int bits, char *data, size_t data_len) override
Callback method for when a data frame has been received from the client.
bool BroadcastBinaryData(const std::string &data, bool skippable=false)
Sends the provided binary data to all the connected clients.
bool SendData(Connection *conn, const std::string &data, bool skippable=false, int op_code=MG_WEBSOCKET_OPCODE_TEXT)
Sends the provided data to a specific connected client.
bool handleBinaryData(Connection *conn, const std::string &data)
void handleClose(CivetServer *server, const Connection *conn) override
Callback method for when the connection is closed.
bool handleJsonData(Connection *conn, const std::string &data)
void handleReadyState(CivetServer *server, Connection *conn) override
Callback method for when the websocket handshake is successfully completed and the connection is ready for data traffic.
bool SendBinaryData(Connection *conn, const std::string &data, bool skippable=false)