Apollo 10.0
自动驾驶开放平台
intra_dispatcher.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_TRANSPORT_DISPATCHER_INTRA_DISPATCHER_H_
18#define CYBER_TRANSPORT_DISPATCHER_INTRA_DISPATCHER_H_
19
20#include <iostream>
21#include <map>
22#include <memory>
23#include <string>
24#include <utility>
25
28#include "cyber/common/log.h"
29#include "cyber/common/macros.h"
33#include "cyber/time/time.h"
35
36namespace apollo {
37namespace cyber {
38namespace transport {
39
40class IntraDispatcher;
42class ChannelChain;
43using ChannelChainPtr = std::shared_ptr<ChannelChain>;
44template <typename MessageT>
45using MessageListener =
46 std::function<void(const std::shared_ptr<MessageT>&, const MessageInfo&)>;
47
48// use a channel chain to wrap specific ListenerHandler.
49// If the message is MessageT, then we use pointer directly, or we first parse
50// to a string, and use it to serialise to another message type.
52 using BaseHandlersType =
53 std::map<uint64_t, std::map<std::string, ListenerHandlerBasePtr>>;
54
55 public:
56 template <typename MessageT>
57 bool AddListener(uint64_t self_id, uint64_t channel_id,
58 const std::string& message_type,
59 const MessageListener<MessageT>& listener) {
61 auto ret = GetHandler<MessageT>(channel_id, message_type, &handlers_);
62 auto handler = ret.first;
63 if (handler == nullptr) {
64 AERROR << "get handler failed. channel: "
65 << GlobalData::GetChannelById(channel_id)
66 << ", message type: " << message::GetMessageName<MessageT>();
67 return ret.second;
68 }
69 handler->Connect(self_id, listener);
70 return ret.second;
71 }
72
73 template <typename MessageT>
74 bool AddListener(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id,
75 const std::string& message_type,
76 const MessageListener<MessageT>& listener) {
77 WriteLockGuard<base::AtomicRWLock> lg(oppo_rw_lock_);
78 if (oppo_handlers_.find(oppo_id) == oppo_handlers_.end()) {
79 oppo_handlers_[oppo_id] = BaseHandlersType();
80 }
81 BaseHandlersType& handlers = oppo_handlers_[oppo_id];
82 auto ret = GetHandler<MessageT>(channel_id, message_type, &handlers);
83 auto handler = ret.first;
84 if (handler == nullptr) {
85 AERROR << "get handler failed. channel: "
86 << GlobalData::GetChannelById(channel_id)
87 << ", message type: " << message_type;
88 return ret.second;
89 }
90 handler->Connect(self_id, oppo_id, listener);
91 return ret.second;
92 }
93
94 template <typename MessageT>
95 void RemoveListener(uint64_t self_id, uint64_t channel_id,
96 const std::string& message_type) {
98 auto handler = RemoveHandler(channel_id, message_type, &handlers_);
99 if (handler) {
100 handler->Disconnect(self_id);
101 }
102 }
103
104 template <typename MessageT>
105 void RemoveListener(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id,
106 const std::string& message_type) {
107 WriteLockGuard<base::AtomicRWLock> lg(oppo_rw_lock_);
108 if (oppo_handlers_.find(oppo_id) == oppo_handlers_.end()) {
109 return;
110 }
111 BaseHandlersType& handlers = oppo_handlers_[oppo_id];
112 auto handler = RemoveHandler(channel_id, message_type, &handlers);
113 if (oppo_handlers_[oppo_id].empty()) {
114 oppo_handlers_.erase(oppo_id);
115 }
116 if (handler) {
117 handler->Disconnect(self_id, oppo_id);
118 }
119 }
120
121 template <typename MessageT>
122 void Run(uint64_t self_id, uint64_t channel_id,
123 const std::string& message_type,
124 const std::shared_ptr<MessageT>& message,
125 const MessageInfo& message_info) {
127 Run(channel_id, message_type, handlers_, message, message_info);
128 }
129
130 template <typename MessageT>
131 void Run(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id,
132 const std::string& message_type,
133 const std::shared_ptr<MessageT>& message,
134 const MessageInfo& message_info) {
135 ReadLockGuard<base::AtomicRWLock> lg(oppo_rw_lock_);
136 if (oppo_handlers_.find(oppo_id) == oppo_handlers_.end()) {
137 return;
138 }
139 BaseHandlersType& handlers = oppo_handlers_[oppo_id];
140 Run(channel_id, message_type, handlers, message, message_info);
141 }
142
143 private:
144 // NOTE: lock hold
145 template <typename MessageT>
146 std::pair<std::shared_ptr<ListenerHandler<MessageT>>, bool> GetHandler(
147 uint64_t channel_id, const std::string& message_type,
148 BaseHandlersType* handlers) {
149 std::shared_ptr<ListenerHandler<MessageT>> handler;
150 bool created = false; // if the handler is created
151
152 if (handlers->find(channel_id) == handlers->end()) {
153 (*handlers)[channel_id] = std::map<std::string, ListenerHandlerBasePtr>();
154 }
155
156 if ((*handlers)[channel_id].find(message_type) ==
157 (*handlers)[channel_id].end()) {
158 ADEBUG << "Create new ListenerHandler for channel "
159 << GlobalData::GetChannelById(channel_id)
160 << ", message type: " << message_type;
161 handler.reset(new ListenerHandler<MessageT>());
162 (*handlers)[channel_id][message_type] = handler;
163 created = true;
164 } else {
165 ADEBUG << "Find channel " << GlobalData::GetChannelById(channel_id)
166 << "'s ListenerHandler, message type: " << message_type;
167 handler = std::dynamic_pointer_cast<ListenerHandler<MessageT>>(
168 (*handlers)[channel_id][message_type]);
169 }
170
171 return std::make_pair(handler, created);
172 }
173
174 // NOTE: Lock hold
175 ListenerHandlerBasePtr RemoveHandler(int64_t channel_id,
176 const std::string message_type,
177 BaseHandlersType* handlers) {
178 ListenerHandlerBasePtr handler_base;
179 if (handlers->find(channel_id) != handlers->end()) {
180 if ((*handlers)[channel_id].find(message_type) !=
181 (*handlers)[channel_id].end()) {
182 handler_base = (*handlers)[channel_id][message_type];
183 ADEBUG << "remove " << GlobalData::GetChannelById(channel_id) << "'s "
184 << message_type << " ListenerHandler";
185 (*handlers)[channel_id].erase(message_type);
186 }
187 if ((*handlers)[channel_id].empty()) {
188 ADEBUG << "remove " << GlobalData::GetChannelById(channel_id)
189 << "'s all ListenerHandler";
190 (*handlers).erase(channel_id);
191 }
192 }
193 return handler_base;
194 }
195
196 template <typename MessageT>
197 void Run(const uint64_t channel_id, const std::string& message_type,
198 const BaseHandlersType& handlers,
199 const std::shared_ptr<MessageT>& message,
200 const MessageInfo& message_info) {
201 const auto channel_handlers_itr = handlers.find(channel_id);
202 if (channel_handlers_itr == handlers.end()) {
203 AERROR << "Cant find channel " << GlobalData::GetChannelById(channel_id)
204 << " in Chain";
205 return;
206 }
207 const auto& channel_handlers = channel_handlers_itr->second;
208
210 << "'s chain run, size: " << channel_handlers.size()
211 << ", message type: " << message_type;
212 std::string msg;
213 for (const auto& ele : channel_handlers) {
214 auto handler_base = ele.second;
215 if (message_type == ele.first) {
216 ADEBUG << "Run handler for message type: " << ele.first << " directly";
217 auto handler =
218 std::static_pointer_cast<ListenerHandler<MessageT>>(handler_base);
219 if (handler == nullptr) {
220 continue;
221 }
222 handler->Run(message, message_info);
223 } else {
224 ADEBUG << "Run handler for message type: " << ele.first
225 << " from string";
226 if (msg.empty()) {
227 auto msg_size = message::FullByteSize(*message);
228 if (msg_size < 0) {
229 AERROR << "Failed to get message size. channel["
230 << common::GlobalData::GetChannelById(channel_id) << "]";
231 continue;
232 }
233 msg.resize(msg_size);
234 if (!message::SerializeToHC(*message, const_cast<char*>(msg.data()),
235 msg_size)) {
236 AERROR << "Chain Serialize error for channel id: " << channel_id;
237 msg.clear();
238 }
239 }
240 if (!msg.empty()) {
241 (handler_base)->RunFromString(msg, message_info);
242 }
243 }
244 }
245 }
246
247 BaseHandlersType handlers_;
248 base::AtomicRWLock rw_lock_;
249 std::map<uint64_t, BaseHandlersType> oppo_handlers_;
250 base::AtomicRWLock oppo_rw_lock_;
251};
252
254 public:
255 virtual ~IntraDispatcher();
256
257 template <typename MessageT>
258 void OnMessage(uint64_t channel_id, const std::shared_ptr<MessageT>& message,
259 const MessageInfo& message_info);
260
261 template <typename MessageT>
262 void AddListener(const RoleAttributes& self_attr,
263 const MessageListener<MessageT>& listener);
264
265 template <typename MessageT>
266 void AddListener(const RoleAttributes& self_attr,
267 const RoleAttributes& opposite_attr,
268 const MessageListener<MessageT>& listener);
269
270 template <typename MessageT>
271 void RemoveListener(const RoleAttributes& self_attr);
272
273 template <typename MessageT>
274 void RemoveListener(const RoleAttributes& self_attr,
275 const RoleAttributes& opposite_attr);
276
278
279 private:
280 template <typename MessageT>
281 std::shared_ptr<ListenerHandler<MessageT>> GetHandler(uint64_t channel_id);
282
283 ChannelChainPtr chain_;
284};
285
286template <typename MessageT>
287void IntraDispatcher::OnMessage(uint64_t channel_id,
288 const std::shared_ptr<MessageT>& message,
289 const MessageInfo& message_info) {
290 if (is_shutdown_.load()) {
291 return;
292 }
293 ListenerHandlerBasePtr* handler_base = nullptr;
294 ADEBUG << "intra on message, channel:"
296 if (msg_listeners_.Get(channel_id, &handler_base)) {
297 auto handler =
298 std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
299 if (handler) {
300 handler->Run(message, message_info);
301 } else {
302 auto msg_size = message::FullByteSize(*message);
303 if (msg_size < 0) {
304 AERROR << "Failed to get message size. channel["
305 << common::GlobalData::GetChannelById(channel_id) << "]";
306 return;
307 }
308 std::string msg;
309 msg.resize(msg_size);
310 if (message::SerializeToHC(*message, const_cast<char*>(msg.data()),
311 msg_size)) {
312 (*handler_base)->RunFromString(msg, message_info);
313 } else {
314 AERROR << "Failed to serialize message. channel["
315 << common::GlobalData::GetChannelById(channel_id) << "]";
316 }
317 }
318 }
319}
320
321template <typename MessageT>
322std::shared_ptr<ListenerHandler<MessageT>> IntraDispatcher::GetHandler(
323 uint64_t channel_id) {
324 std::shared_ptr<ListenerHandler<MessageT>> handler;
325 ListenerHandlerBasePtr* handler_base = nullptr;
326
327 if (msg_listeners_.Get(channel_id, &handler_base)) {
328 handler =
329 std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
330 if (handler == nullptr) {
331 ADEBUG << "Find a new type for channel "
332 << GlobalData::GetChannelById(channel_id) << " with type "
333 << message::GetMessageName<MessageT>();
334 }
335 } else {
336 ADEBUG << "Create new ListenerHandler for channel "
337 << GlobalData::GetChannelById(channel_id) << " with type "
338 << message::GetMessageName<MessageT>();
339 handler.reset(new ListenerHandler<MessageT>());
340 msg_listeners_.Set(channel_id, handler);
341 }
342
343 return handler;
344}
345
346template <typename MessageT>
348 const MessageListener<MessageT>& listener) {
349 if (is_shutdown_.load()) {
350 return;
351 }
352
353 auto channel_id = self_attr.channel_id();
354 std::string message_type = message::GetMessageName<MessageT>();
355 uint64_t self_id = self_attr.id();
356
357 bool created =
358 chain_->AddListener(self_id, channel_id, message_type, listener);
359
360 auto handler = GetHandler<MessageT>(self_attr.channel_id());
361 if (handler && created) {
362 auto listener_wrapper =
363 [this, self_id, channel_id, message_type, self_attr](
364 const std::shared_ptr<MessageT>& message,
365 const MessageInfo& message_info) {
366 auto recv_time = Time::Now().ToMicrosecond();
367 statistics::Statistics::Instance()->SetProcStatus(self_attr, recv_time);
368 this->chain_->Run<MessageT>(self_id, channel_id, message_type, message,
369 message_info);
370 };
371 handler->Connect(self_id, listener_wrapper);
372 }
373}
374
375template <typename MessageT>
377 const RoleAttributes& opposite_attr,
378 const MessageListener<MessageT>& listener) {
379 if (is_shutdown_.load()) {
380 return;
381 }
382
383 auto channel_id = self_attr.channel_id();
384 std::string message_type = message::GetMessageName<MessageT>();
385 uint64_t self_id = self_attr.id();
386 uint64_t oppo_id = opposite_attr.id();
387
388 bool created =
389 chain_->AddListener(self_id, oppo_id, channel_id, message_type, listener);
390
391 auto handler = GetHandler<MessageT>(self_attr.channel_id());
392 if (handler && created) {
393 auto listener_wrapper =
394 [this, self_id, oppo_id, channel_id, message_type, self_attr](
395 const std::shared_ptr<MessageT>& message,
396 const MessageInfo& message_info) {
397 auto recv_time = Time::Now().ToMicrosecond();
398 statistics::Statistics::Instance()->SetProcStatus(self_attr, recv_time);
399 this->chain_->Run<MessageT>(self_id, oppo_id, channel_id, message_type,
400 message, message_info);
401 };
402 handler->Connect(self_id, oppo_id, listener_wrapper);
403 }
404}
405
406template <typename MessageT>
408 if (is_shutdown_.load()) {
409 return;
410 }
411 Dispatcher::RemoveListener<MessageT>(self_attr);
412 chain_->RemoveListener<MessageT>(self_attr.id(), self_attr.channel_id(),
413 message::GetMessageName<MessageT>());
414}
415
416template <typename MessageT>
418 const RoleAttributes& opposite_attr) {
419 if (is_shutdown_.load()) {
420 return;
421 }
422 Dispatcher::RemoveListener<MessageT>(self_attr, opposite_attr);
423 chain_->RemoveListener<MessageT>(self_attr.id(), opposite_attr.id(),
424 self_attr.channel_id(),
425 message::GetMessageName<MessageT>());
426}
427
428} // namespace transport
429} // namespace cyber
430} // namespace apollo
431
432#endif // CYBER_TRANSPORT_DISPATCHER_INTRA_DISPATCHER_H_
uint64_t ToMicrosecond() const
convert time to microsecond (us).
Definition time.cc:85
static Time Now()
get the current time.
Definition time.cc:57
static std::string GetChannelById(uint64_t id)
bool AddListener(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id, const std::string &message_type, const MessageListener< MessageT > &listener)
void RemoveListener(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id, const std::string &message_type)
bool AddListener(uint64_t self_id, uint64_t channel_id, const std::string &message_type, const MessageListener< MessageT > &listener)
void Run(uint64_t self_id, uint64_t channel_id, const std::string &message_type, const std::shared_ptr< MessageT > &message, const MessageInfo &message_info)
void RemoveListener(uint64_t self_id, uint64_t channel_id, const std::string &message_type)
void Run(uint64_t self_id, uint64_t oppo_id, uint64_t channel_id, const std::string &message_type, const std::shared_ptr< MessageT > &message, const MessageInfo &message_info)
AtomicHashMap< uint64_t, ListenerHandlerBasePtr > msg_listeners_
Definition dispatcher.h:83
void AddListener(const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
void OnMessage(uint64_t channel_id, const std::shared_ptr< MessageT > &message, const MessageInfo &message_info)
void RemoveListener(const RoleAttributes &self_attr)
#define DECLARE_SINGLETON(classname)
Definition macros.h:52
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
std::enable_if< HasSerializeToArray< T >::value, bool >::type SerializeToHC(const T &message, void *data, int size)
int FullByteSize(const T &message)
std::shared_ptr< ChannelChain > ChannelChainPtr
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr
std::function< void(const std::shared_ptr< MessageT > &, const MessageInfo &)> MessageListener
Definition dispatcher.h:53
class register implement
Definition arena_queue.h:37