17#ifndef CYBER_TRANSPORT_MESSAGE_LISTENER_HANDLER_H_
18#define CYBER_TRANSPORT_MESSAGE_LISTENER_HANDLER_H_
23#include <unordered_map>
41class ListenerHandlerBase;
50 virtual void Disconnect(uint64_t self_id, uint64_t oppo_id) = 0;
59template <
typename MessageT>
62 using Message = std::shared_ptr<MessageT>;
74 void Connect(uint64_t self_id, uint64_t oppo_id,
const Listener& listener);
77 void Disconnect(uint64_t self_id, uint64_t oppo_id)
override;
84 using SignalPtr = std::shared_ptr<MessageSignal>;
85 using MessageSignalMap = std::unordered_map<uint64_t, SignalPtr>;
91 MessageSignalMap signals_;
93 std::unordered_map<uint64_t, ConnectionMap> signals_conns_;
100 is_raw_message_ =
true;
103template <
typename MessageT>
106 auto connection = signal_.Connect(listener);
107 if (!connection.IsConnected()) {
112 signal_conns_[self_id] = connection;
115template <
typename MessageT>
119 if (signals_.find(oppo_id) == signals_.end()) {
120 signals_[oppo_id] = std::make_shared<MessageSignal>();
123 auto connection = signals_[oppo_id]->Connect(listener);
124 if (!connection.IsConnected()) {
125 AWARN << oppo_id <<
" " << self_id <<
" connect failed!";
129 if (signals_conns_.find(oppo_id) == signals_conns_.end()) {
133 signals_conns_[oppo_id][self_id] = connection;
136template <
typename MessageT>
139 if (signal_conns_.find(self_id) == signal_conns_.end()) {
143 signal_conns_[self_id].Disconnect();
144 signal_conns_.erase(self_id);
147template <
typename MessageT>
150 if (signals_conns_.find(oppo_id) == signals_conns_.end()) {
154 if (signals_conns_[oppo_id].find(self_id) == signals_conns_[oppo_id].end()) {
158 signals_conns_[oppo_id][self_id].Disconnect();
159 signals_conns_[oppo_id].erase(self_id);
162template <
typename MessageT>
165 signal_(msg, msg_info);
168 if (signals_.find(oppo_id) == signals_.end()) {
172 (*signals_[oppo_id])(msg, msg_info);
175template <
typename MessageT>
178 auto msg = std::make_shared<MessageT>();
183 AWARN <<
"Failed to parse message. Content: " << str;
uint64_t HashValue() const
bool IsRawMessage() const
virtual ~ListenerHandlerBase()
virtual void RunFromString(const std::string &str, const MessageInfo &msg_info)=0
virtual void Disconnect(uint64_t self_id)=0
virtual void Disconnect(uint64_t self_id, uint64_t oppo_id)=0
virtual ~ListenerHandler()
void RunFromString(const std::string &str, const MessageInfo &msg_info) override
void Disconnect(uint64_t self_id) override
std::shared_ptr< MessageT > Message
void Connect(uint64_t self_id, const Listener &listener)
std::unordered_map< uint64_t, MessageConnection > ConnectionMap
void Run(const Message &msg, const MessageInfo &msg_info)
std::function< void(const Message &, const MessageInfo &)> Listener
base::Signal< const Message &, const MessageInfo & > MessageSignal
const Identity & sender_id() const
std::enable_if< HasParseFromArray< T >::value, bool >::type ParseFromHC(const void *data, int size, T *message)
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr