Apollo 10.0
自动驾驶开放平台
listener_handler.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_TRANSPORT_MESSAGE_LISTENER_HANDLER_H_
18#define CYBER_TRANSPORT_MESSAGE_LISTENER_HANDLER_H_
19
20#include <functional>
21#include <memory>
22#include <string>
23#include <unordered_map>
24
26#include "cyber/base/signal.h"
27#include "cyber/common/log.h"
32
33namespace apollo {
34namespace cyber {
35namespace transport {
36
40
// Forward declaration of the handler base class and the shared_ptr alias
// used to refer to handlers through that base type.
41class ListenerHandlerBase;
42using ListenerHandlerBasePtr = std::shared_ptr<ListenerHandlerBase>;
43
// Interface section of the handler base class.
// NOTE(review): the `class ... {` line and the lines between `public:` and
// the first Disconnect declaration are absent from this listing — confirm
// against the real header before editing.
45 public:
48
// Pure virtual; ListenerHandler<MessageT> overrides both overloads.
49 virtual void Disconnect(uint64_t self_id) = 0;
50 virtual void Disconnect(uint64_t self_id, uint64_t oppo_id) = 0;
// Returns the current value of is_raw_message_.
51 inline bool IsRawMessage() const { return is_raw_message_; }
// Pure virtual; the ListenerHandler<MessageT> override parses `str` into a
// MessageT and forwards it to Run.
52 virtual void RunFromString(const std::string& str,
53 const MessageInfo& msg_info) = 0;
54
55 protected:
// Defaults to false; an explicit specialization later in this file sets it
// to true.
56 bool is_raw_message_ = false;
57};
58
// Handler that fans a received message out to registered listeners, either
// through one shared signal (listeners keyed by self_id) or through a
// per-sender signal (listeners keyed by oppo_id, then self_id).
// NOTE(review): the `class ListenerHandler ...` line and a few alias /
// constructor lines are absent from this listing — confirm against the real
// header before editing.
59template <typename MessageT>
61 public:
// Messages are passed around as shared pointers to MessageT.
62 using Message = std::shared_ptr<MessageT>;
64
// Callback signature invoked for each delivered message.
65 using Listener = std::function<void(const Message&, const MessageInfo&)>;
// Maps a uint64_t id to the connection created for it.
68 using ConnectionMap = std::unordered_map<uint64_t, MessageConnection>;
69
71 virtual ~ListenerHandler() {}
72
// Connects `listener` to the shared signal; the connection is stored under
// self_id.
73 void Connect(uint64_t self_id, const Listener& listener);
// Connects `listener` to the signal kept for oppo_id; the connection is
// stored under (oppo_id, self_id).
74 void Connect(uint64_t self_id, uint64_t oppo_id, const Listener& listener);
75
// Disconnects and forgets the connection stored under self_id, if any.
76 void Disconnect(uint64_t self_id) override;
// Disconnects and forgets the connection stored under (oppo_id, self_id),
// if any.
77 void Disconnect(uint64_t self_id, uint64_t oppo_id) override;
78
// Invokes the shared signal, then the signal registered for the hash of
// msg_info.sender_id() when one exists.
79 void Run(const Message& msg, const MessageInfo& msg_info);
// Parses `str` into a new MessageT and calls Run on success; logs a warning
// on parse failure.
80 void RunFromString(const std::string& str,
81 const MessageInfo& msg_info) override;
82
83 private:
84 using SignalPtr = std::shared_ptr<MessageSignal>;
85 using MessageSignalMap = std::unordered_map<uint64_t, SignalPtr>;
86 // used for self_id
87 MessageSignal signal_;
88 ConnectionMap signal_conns_; // key: self_id
89
90 // used for self_id and oppo_id
91 MessageSignalMap signals_; // key: oppo_id
92 // key: oppo_id
93 std::unordered_map<uint64_t, ConnectionMap> signals_conns_;
94
// Taken as a write lock by the Connect/Disconnect definitions and as a read
// lock by Run when they touch the maps above.
95 base::AtomicRWLock rw_lock_;
96};
97
// Explicit specialization whose body sets is_raw_message_ to true, so
// IsRawMessage() returns true for this instantiation.
// NOTE(review): the declarator line is absent from this listing; presumably
// this is the constructor of the raw-message specialization — confirm.
98template <>
100 is_raw_message_ = true;
101}
102
// Connects `listener` to signal_ and, when the returned connection reports
// IsConnected(), stores it in signal_conns_ under self_id. Returns without
// storing anything if the connection was not established.
// NOTE(review): the line carrying the function name and first parameter is
// absent from this listing; the body matches the class's
// Connect(uint64_t self_id, const Listener&) declaration — confirm.
103template <typename MessageT>
105 const Listener& listener) {
// The signal connection is made before the write lock is taken.
106 auto connection = signal_.Connect(listener);
107 if (!connection.IsConnected()) {
108 return;
109 }
110
// The write lock covers only the map update.
111 WriteLockGuard<AtomicRWLock> lock(rw_lock_);
112 signal_conns_[self_id] = connection;
113}
114
115template <typename MessageT>
116void ListenerHandler<MessageT>::Connect(uint64_t self_id, uint64_t oppo_id,
117 const Listener& listener) {
118 WriteLockGuard<AtomicRWLock> lock(rw_lock_);
119 if (signals_.find(oppo_id) == signals_.end()) {
120 signals_[oppo_id] = std::make_shared<MessageSignal>();
121 }
122
123 auto connection = signals_[oppo_id]->Connect(listener);
124 if (!connection.IsConnected()) {
125 AWARN << oppo_id << " " << self_id << " connect failed!";
126 return;
127 }
128
129 if (signals_conns_.find(oppo_id) == signals_conns_.end()) {
130 signals_conns_[oppo_id] = ConnectionMap();
131 }
132
133 signals_conns_[oppo_id][self_id] = connection;
134}
135
// Under the write lock, looks up self_id in signal_conns_. If an entry
// exists, calls Disconnect() on the stored connection and erases the entry;
// otherwise does nothing.
// NOTE(review): the line carrying the function signature is absent from
// this listing; the body matches the class's Disconnect(uint64_t self_id)
// declaration — confirm.
136template <typename MessageT>
138 WriteLockGuard<AtomicRWLock> lock(rw_lock_);
139 if (signal_conns_.find(self_id) == signal_conns_.end()) {
140 return;
141 }
142
143 signal_conns_[self_id].Disconnect();
144 signal_conns_.erase(self_id);
145}
146
147template <typename MessageT>
148void ListenerHandler<MessageT>::Disconnect(uint64_t self_id, uint64_t oppo_id) {
149 WriteLockGuard<AtomicRWLock> lock(rw_lock_);
150 if (signals_conns_.find(oppo_id) == signals_conns_.end()) {
151 return;
152 }
153
154 if (signals_conns_[oppo_id].find(self_id) == signals_conns_[oppo_id].end()) {
155 return;
156 }
157
158 signals_conns_[oppo_id][self_id].Disconnect();
159 signals_conns_[oppo_id].erase(self_id);
160}
161
// Delivers `msg` to listeners: first invokes signal_, then looks up the
// signal stored under the hash value of msg_info.sender_id() and invokes it
// too when present.
// NOTE(review): the line carrying the function name and first parameter is
// absent from this listing; the body matches the class's
// Run(const Message&, const MessageInfo&) declaration — confirm.
162template <typename MessageT>
164 const MessageInfo& msg_info) {
// signal_ is invoked before any lock is taken.
165 signal_(msg, msg_info);
166 uint64_t oppo_id = msg_info.sender_id().HashValue();
// The read lock is held for the lookup and for the per-sender invocation.
167 ReadLockGuard<AtomicRWLock> lock(rw_lock_);
168 if (signals_.find(oppo_id) == signals_.end()) {
169 return;
170 }
171
// find() succeeded just above under the same lock, so operator[] returns the
// existing entry here rather than inserting one.
// NOTE(review): reusing the iterator from find() would avoid a second hash
// and make the no-insert guarantee explicit.
172 (*signals_[oppo_id])(msg, msg_info);
173}
174
// Allocates a MessageT, parses `str` into it with message::ParseFromHC, and
// forwards the result to Run when parsing succeeds. On failure a warning is
// logged and Run is not called.
// NOTE(review): the line carrying the function name and first parameter is
// absent from this listing; the body matches the class's
// RunFromString(const std::string&, const MessageInfo&) declaration — confirm.
175template <typename MessageT>
177 const MessageInfo& msg_info) {
178 auto msg = std::make_shared<MessageT>();
// The size is narrowed to int for the ParseFromHC call.
179 if (message::ParseFromHC(str.data(), static_cast<int>(str.size()),
180 msg.get())) {
181 Run(msg, msg_info);
182 } else {
// NOTE(review): the warning streams the whole unparsed payload into the log.
183 AWARN << "Failed to parse message. Content: " << str;
184 }
185}
186
187} // namespace transport
188} // namespace cyber
189} // namespace apollo
190
191#endif // CYBER_TRANSPORT_MESSAGE_LISTENER_HANDLER_H_
virtual void RunFromString(const std::string &str, const MessageInfo &msg_info)=0
virtual void Disconnect(uint64_t self_id)=0
virtual void Disconnect(uint64_t self_id, uint64_t oppo_id)=0
void RunFromString(const std::string &str, const MessageInfo &msg_info) override
void Disconnect(uint64_t self_id) override
void Connect(uint64_t self_id, const Listener &listener)
std::unordered_map< uint64_t, MessageConnection > ConnectionMap
void Run(const Message &msg, const MessageInfo &msg_info)
std::function< void(const Message &, const MessageInfo &)> Listener
base::Signal< const Message &, const MessageInfo & > MessageSignal
const Identity & sender_id() const
#define AWARN
Definition log.h:43
std::enable_if< HasParseFromArray< T >::value, bool >::type ParseFromHC(const void *data, int size, T *message)
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr
class register implement
Definition arena_queue.h:37