// Two-level table of listener handlers. In this file the outer uint64_t key
// is indexed with channel_id and the inner std::string key with message_type
// (see GetHandler).
52 using BaseHandlersType =
53 std::map<uint64_t, std::map<std::string, ListenerHandlerBasePtr>>;
// Connects `listener` for `self_id` to the ListenerHandler<MessageT> that
// handlers_ holds for (channel_id, message_type). The handler comes from
// GetHandler<MessageT>, whose result is a (handler, created) pair; only the
// handler half is used on the visible lines.
// NOTE(review): the first line of the signature and the statements following
// the error log are not visible here; presumably the null case returns before
// Connect() is reached -- confirm against the full definition.
56 template <
typename MessageT>
58 const std::string& message_type,
61 auto ret = GetHandler<MessageT>(channel_id, message_type, &handlers_);
62 auto handler = ret.first;
// A null handler is reported through AERROR. Unlike the oppo_id overload,
// the type name logged here comes from message::GetMessageName<MessageT>()
// rather than from the message_type argument.
63 if (handler ==
nullptr) {
64 AERROR <<
"get handler failed. channel: "
66 <<
", message type: " << message::GetMessageName<MessageT>();
69 handler->Connect(self_id, listener);
// Connects `listener` for the (self_id, oppo_id) pair on `channel_id`.
// Handlers for this overload live in oppo_handlers_[oppo_id], separate from
// the handlers_ table used by the overload without oppo_id.
// Declared to return bool; NOTE(review): the return statements are not
// visible here, so the exact success/failure values should be confirmed.
73 template <
typename MessageT>
74 bool AddListener(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id,
75 const std::string& message_type,
// Make sure oppo_id has a handler table (possibly empty) before taking a
// reference to it.
78 if (oppo_handlers_.find(oppo_id) == oppo_handlers_.end()) {
79 oppo_handlers_[oppo_id] = BaseHandlersType();
81 BaseHandlersType& handlers = oppo_handlers_[oppo_id];
82 auto ret = GetHandler<MessageT>(channel_id, message_type, &handlers);
83 auto handler = ret.first;
// A null handler is reported through AERROR using the message_type argument.
84 if (handler ==
nullptr) {
85 AERROR <<
"get handler failed. channel: "
87 <<
", message type: " << message_type;
90 handler->Connect(self_id, oppo_id, listener);
// Disconnects `self_id` from the handler that RemoveHandler returns for
// (channel_id, message_type) in handlers_.
// NOTE(review): the first line of the signature and any null check on
// `handler` are not visible here -- confirm that a missing handler is guarded
// before Disconnect() is called.
94 template <
typename MessageT>
96 const std::string& message_type) {
98 auto handler = RemoveHandler(channel_id, message_type, &handlers_);
100 handler->Disconnect(self_id);
// Disconnects the (self_id, oppo_id) pair from the handler that RemoveHandler
// returns for (channel_id, message_type) in oppo_handlers_[oppo_id].
// NOTE(review): the first line of the signature and any null check on
// `handler` are not visible here -- confirm before relying on this summary.
104 template <
typename MessageT>
106 const std::string& message_type) {
// No table exists for this oppo_id. NOTE(review): the body of this branch is
// not visible -- presumably an early return.
108 if (oppo_handlers_.find(oppo_id) == oppo_handlers_.end()) {
111 BaseHandlersType& handlers = oppo_handlers_[oppo_id];
112 auto handler = RemoveHandler(channel_id, message_type, &handlers);
// Drop the per-oppo table once it no longer holds any channel entry.
113 if (oppo_handlers_[oppo_id].empty()) {
114 oppo_handlers_.erase(oppo_id);
117 handler->Disconnect(self_id, oppo_id);
// Dispatches `message` for `channel_id` using the handlers_ table by
// forwarding to the Run overload that takes a handler table.
// `self_id` is not used on any visible line of this overload.
121 template <
typename MessageT>
122 void Run(uint64_t self_id, uint64_t channel_id,
123 const std::string& message_type,
124 const std::shared_ptr<MessageT>& message,
127 Run(channel_id, message_type, handlers_, message, message_info);
// Dispatches `message` for `channel_id` using the handler table stored for
// `oppo_id` in oppo_handlers_, by forwarding to the Run overload that takes a
// handler table. `self_id` is not used on any visible line of this overload.
130 template <
typename MessageT>
131 void Run(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id,
132 const std::string& message_type,
133 const std::shared_ptr<MessageT>& message,
// No table exists for this oppo_id. NOTE(review): the body of this branch is
// not visible -- presumably an early return.
136 if (oppo_handlers_.find(oppo_id) == oppo_handlers_.end()) {
139 BaseHandlersType& handlers = oppo_handlers_[oppo_id];
140 Run(channel_id, message_type, handlers, message, message_info);
// Returns the ListenerHandler<MessageT> stored in `*handlers` under
// (channel_id, message_type), creating and storing a new one when no entry
// exists for that message type. The handler is returned paired with the
// `created` flag.
// For an existing entry the stored base pointer is converted with
// std::dynamic_pointer_cast, so the returned handler is null when the stored
// object is not a ListenerHandler<MessageT>; the AddListener callers check
// for nullptr.
// NOTE(review): `created` is initialised to false and no visible line sets it
// to true -- presumably the creation branch does so; confirm.
145 template <
typename MessageT>
146 std::pair<std::shared_ptr<ListenerHandler<MessageT>>,
bool> GetHandler(
147 uint64_t channel_id,
const std::string& message_type,
148 BaseHandlersType* handlers) {
149 std::shared_ptr<ListenerHandler<MessageT>> handler;
150 bool created =
false;
// Make sure the channel has an inner map before it is indexed below.
152 if (handlers->find(channel_id) == handlers->end()) {
153 (*handlers)[channel_id] = std::map<std::string, ListenerHandlerBasePtr>();
// No handler yet for this message type on the channel: build one and store it.
156 if ((*handlers)[channel_id].find(message_type) ==
157 (*handlers)[channel_id].end()) {
158 ADEBUG <<
"Create new ListenerHandler for channel "
160 <<
", message type: " << message_type;
161 handler.reset(
new ListenerHandler<MessageT>());
162 (*handlers)[channel_id][message_type] = handler;
166 <<
"'s ListenerHandler, message type: " << message_type;
167 handler = std::dynamic_pointer_cast<ListenerHandler<MessageT>>(
168 (*handlers)[channel_id][message_type]);
171 return std::make_pair(handler, created);
// Looks up (channel_id, message_type) in `*handlers`; when found, keeps the
// stored pointer in `handler_base` and erases the message-type entry. If the
// channel's inner map is then empty, the channel entry is erased as well.
// NOTE(review): the function name, the declaration of `handler_base` and the
// return statement are not visible here. Callers in this file use the result
// of RemoveHandler(...) as the handler, so presumably this is that function
// and it returns `handler_base` -- confirm.
// NOTE(review): `message_type` is taken by value here, whereas the other
// functions in this file take it as const std::string&.
176 const std::string message_type,
177 BaseHandlersType* handlers) {
179 if (handlers->find(channel_id) != handlers->end()) {
180 if ((*handlers)[channel_id].find(message_type) !=
181 (*handlers)[channel_id].end()) {
182 handler_base = (*handlers)[channel_id][message_type];
184 << message_type <<
" ListenerHandler";
185 (*handlers)[channel_id].erase(message_type);
187 if ((*handlers)[channel_id].empty()) {
189 <<
"'s all ListenerHandler";
190 (*handlers).erase(channel_id);
// Delivers `message` to the handlers registered for `channel_id` in
// `handlers`, walking every (message type, handler) entry of that channel.
//  - An entry whose key equals `message_type` is converted to
//    ListenerHandler<MessageT> with std::static_pointer_cast and invoked with
//    the message object. static_pointer_cast performs no runtime type check,
//    so the nullptr test after it can only catch a null stored pointer.
//  - NOTE(review): for other keys the visible lines resize a buffer `msg` to
//    `msg_size`, log size/serialization failures, and call
//    RunFromString(msg, message_info) on the base handler. Presumably the
//    message is serialized into `msg` first; the lines doing so are not
//    visible -- confirm.
196 template <
typename MessageT>
197 void Run(
const uint64_t channel_id,
const std::string& message_type,
198 const BaseHandlersType& handlers,
199 const std::shared_ptr<MessageT>& message,
200 const MessageInfo& message_info) {
// Locate the channel's inner map. NOTE(review): the body of the not-found
// branch is not visible -- presumably an early return.
201 const auto channel_handlers_itr = handlers.find(channel_id);
202 if (channel_handlers_itr == handlers.end()) {
207 const auto& channel_handlers = channel_handlers_itr->second;
210 <<
"'s chain run, size: " << channel_handlers.size()
211 <<
", message type: " << message_type;
// ele.first is the message-type key, ele.second the base handler pointer.
213 for (
const auto& ele : channel_handlers) {
214 auto handler_base = ele.second;
215 if (message_type == ele.first) {
216 ADEBUG <<
"Run handler for message type: " << ele.first <<
" directly";
218 std::static_pointer_cast<ListenerHandler<MessageT>>(handler_base);
219 if (handler ==
nullptr) {
222 handler->Run(message, message_info);
224 ADEBUG <<
"Run handler for message type: " << ele.first
229 AERROR <<
"Failed to get message size. channel["
233 msg.resize(msg_size);
236 AERROR <<
"Chain Serialize error for channel id: " << channel_id;
241 (handler_base)->RunFromString(msg, message_info);
// Handler table used by the overloads that do not take an oppo_id.
247 BaseHandlersType handlers_;
// NOTE(review): presumably guards handlers_; no lock acquisition is visible
// in this part of the file -- confirm.
248 base::AtomicRWLock rw_lock_;
// One handler table per oppo_id, used by the overloads that take an oppo_id.
249 std::map<uint64_t, BaseHandlersType> oppo_handlers_;
// NOTE(review): presumably guards oppo_handlers_; no lock acquisition is
// visible in this part of the file -- confirm.
250 base::AtomicRWLock oppo_rw_lock_;
// Registers `listener` on chain_ for (self_id, channel_id, message_type),
// where message_type is the name reported by message::GetMessageName<MessageT>().
// When `handler && created` holds, a wrapper lambda is additionally connected
// to `handler`; the wrapper calls Statistics::SetProcStatus(self_attr,
// recv_time) and then forwards the message to chain_->Run<MessageT>.
// The wrapper captures `this` plus copies of self_id, channel_id,
// message_type and self_attr.
// NOTE(review): the enclosing signature and the definitions of `channel_id`,
// `created` and `recv_time` are not visible here -- confirm their origin.
354 std::string message_type = message::GetMessageName<MessageT>();
355 uint64_t self_id = self_attr.
id();
358 chain_->AddListener(self_id, channel_id, message_type, listener);
360 auto handler = GetHandler<MessageT>(self_attr.
channel_id());
361 if (handler && created) {
362 auto listener_wrapper =
363 [
this, self_id, channel_id, message_type, self_attr](
364 const std::shared_ptr<MessageT>& message,
367 statistics::Statistics::Instance()->SetProcStatus(self_attr, recv_time);
368 this->chain_->Run<MessageT>(self_id, channel_id, message_type, message,
371 handler->Connect(self_id, listener_wrapper);
// Registers `listener` on chain_ for (self_id, oppo_id, channel_id,
// message_type), where message_type is the name reported by
// message::GetMessageName<MessageT>() and oppo_id is opposite_attr.id().
// When `handler && created` holds, a wrapper lambda is additionally connected
// to `handler` for the (self_id, oppo_id) pair; the wrapper calls
// Statistics::SetProcStatus(self_attr, recv_time) and then forwards the
// message to the oppo_id overload of chain_->Run<MessageT>.
// The wrapper captures `this` plus copies of self_id, oppo_id, channel_id,
// message_type and self_attr.
// NOTE(review): the enclosing signature and the definitions of `channel_id`,
// `created` and `recv_time` are not visible here -- confirm their origin.
384 std::string message_type = message::GetMessageName<MessageT>();
385 uint64_t self_id = self_attr.
id();
386 uint64_t oppo_id = opposite_attr.
id();
389 chain_->AddListener(self_id, oppo_id, channel_id, message_type, listener);
391 auto handler = GetHandler<MessageT>(self_attr.
channel_id());
392 if (handler && created) {
393 auto listener_wrapper =
394 [
this, self_id, oppo_id, channel_id, message_type, self_attr](
395 const std::shared_ptr<MessageT>& message,
398 statistics::Statistics::Instance()->SetProcStatus(self_attr, recv_time);
399 this->chain_->Run<MessageT>(self_id, oppo_id, channel_id, message_type,
400 message, message_info);
402 handler->Connect(self_id, oppo_id, listener_wrapper);