Apollo 10.0
自动驾驶开放平台
dispatcher.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_
18#define CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_
19
20#include <atomic>
21#include <functional>
22#include <iostream>
23#include <memory>
24#include <mutex>
25#include <string>
26#include <unordered_map>
27
28#include "cyber/proto/role_attributes.pb.h"
29
33#include "cyber/common/log.h"
36
37namespace apollo {
38namespace cyber {
39namespace transport {
40
// Make cyber::proto::RoleAttributes usable unqualified inside this namespace.
46using cyber::proto::RoleAttributes;
47
// Forward declaration so the shared-pointer alias below can name the class
// before its definition.
48class Dispatcher;
// Shared-ownership handle to a Dispatcher.
49using DispatcherPtr = std::shared_ptr<Dispatcher>;
50
// Callable type for per-message callbacks: takes a shared pointer to the
// message and a MessageInfo, both by const reference, and returns nothing.
// NOTE(review): listing line 52 is absent from this extract. The page's own
// cross-reference index names this alias `MessageListener`, so the dropped line
// is presumably `using MessageListener =` -- confirm against upstream.
51template <typename MessageT>
53 std::function<void(const std::shared_ptr<MessageT>&, const MessageInfo&)>;
54
// Body of the Dispatcher class: keeps per-channel listener handlers and lets
// callers attach or detach message listeners by RoleAttributes.
// NOTE(review): listing line 55 (presumably `class Dispatcher {`) is absent
// from this extract -- confirm against upstream.
56 public:
57 Dispatcher();
58 virtual ~Dispatcher();
59
// Virtual shutdown hook; its body is not part of this listing.
60 virtual void Shutdown();
61
// Attaches `listener` to the handler for self_attr.channel_id(), connected
// under self_attr.id(). MessageT is the message type the listener receives.
62 template <typename MessageT>
63 void AddListener(const RoleAttributes& self_attr,
64 const MessageListener<MessageT>& listener);
65
// Same as above, but the connection is made under the pair
// (self_attr.id(), opposite_attr.id()).
66 template <typename MessageT>
67 void AddListener(const RoleAttributes& self_attr,
68 const RoleAttributes& opposite_attr,
69 const MessageListener<MessageT>& listener);
70
// Disconnects self_attr.id() from the handler of self_attr.channel_id(),
// if that channel has a handler.
71 template <typename MessageT>
72 void RemoveListener(const RoleAttributes& self_attr);
73
// Disconnects the pair (self_attr.id(), opposite_attr.id()) from the handler
// of self_attr.channel_id(), if that channel has a handler.
74 template <typename MessageT>
75 void RemoveListener(const RoleAttributes& self_attr,
76 const RoleAttributes& opposite_attr);
77
// Defined out of line (the page's index points at dispatcher.cc).
// NOTE(review): presumably reports whether a handler is registered for
// channel_id -- confirm against the .cc file.
78 bool HasChannel(uint64_t channel_id);
79
80 protected:
// Read at the top of every AddListener/RemoveListener template in this header;
// when it is true those functions return immediately.
81 std::atomic<bool> is_shutdown_;
82 // key: channel_id of message
// NOTE(review): listing lines 83-84 are absent from this extract. The page's
// index places `AtomicHashMap<uint64_t, ListenerHandlerBasePtr> msg_listeners_`
// at line 83 (the member the comment above describes); line 84 is unknown --
// restore both from upstream.
85};
86
// Attaches `listener` to the handler of self_attr's channel, creating and
// storing a new handler when the channel has none yet.
// Returns without doing anything when is_shutdown_ is set. Logs an error and
// returns without connecting when the channel's existing handler is not a
// ListenerHandler<MessageT>.
// NOTE(review): listing line 88 (the first line of the signature) is absent
// from this extract. Judging by the in-class declaration it is presumably
// `void Dispatcher::AddListener(const RoleAttributes& self_attr,` -- confirm
// against upstream.
87template <typename MessageT>
89 const MessageListener<MessageT>& listener) {
90 if (is_shutdown_.load()) {
91 return;
92 }
93 uint64_t channel_id = self_attr.channel_id();
94
95 std::shared_ptr<ListenerHandler<MessageT>> handler;
96 ListenerHandlerBasePtr* handler_base = nullptr;
// Reuse the channel's stored handler when the lookup succeeds.
97 if (msg_listeners_.Get(channel_id, &handler_base)) {
// The map stores base-class pointers; the downcast yields nullptr when the
// stored handler was created for a different message type.
98 handler =
99 std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
100 if (handler == nullptr) {
101 AERROR << "please ensure that readers with the same channel["
102 << self_attr.channel_name()
103 << "] in the same process have the same message type";
104 return;
105 }
106 } else {
// First listener on this channel: create a handler and store it in the map.
// NOTE(review): nothing in this block guards the Get-then-Set sequence; whether
// two concurrent first-time calls for one channel can replace each other's
// handler depends on AtomicHashMap::Set semantics -- confirm.
107 ADEBUG << "new reader for channel:"
108 << GlobalData::GetChannelById(channel_id);
109 handler.reset(new ListenerHandler<MessageT>());
110 msg_listeners_.Set(channel_id, handler);
111 }
// Connect the listener under this reader's own id.
112 handler->Connect(self_attr.id(), listener);
113}
114
// Attaches `listener` to the handler of self_attr's channel under the pair
// (self_attr.id(), opposite_attr.id()), creating and storing a new handler
// when the channel has none yet.
// Returns without doing anything when is_shutdown_ is set. Logs an error and
// returns without connecting when the channel's existing handler is not a
// ListenerHandler<MessageT>.
// NOTE(review): listing line 116 (the first line of the signature) is absent
// from this extract. Judging by the in-class declaration it is presumably
// `void Dispatcher::AddListener(const RoleAttributes& self_attr,` -- confirm
// against upstream.
115template <typename MessageT>
117 const RoleAttributes& opposite_attr,
118 const MessageListener<MessageT>& listener) {
119 if (is_shutdown_.load()) {
120 return;
121 }
122 uint64_t channel_id = self_attr.channel_id();
123
124 std::shared_ptr<ListenerHandler<MessageT>> handler;
125 ListenerHandlerBasePtr* handler_base = nullptr;
// Reuse the channel's stored handler when the lookup succeeds.
126 if (msg_listeners_.Get(channel_id, &handler_base)) {
// The map stores base-class pointers; the downcast yields nullptr when the
// stored handler was created for a different message type.
127 handler =
128 std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
129 if (handler == nullptr) {
130 AERROR << "please ensure that readers with the same channel["
131 << self_attr.channel_name()
132 << "] in the same process have the same message type";
133 return;
134 }
135 } else {
// First listener on this channel: create a handler and store it in the map.
// NOTE(review): nothing in this block guards the Get-then-Set sequence; whether
// two concurrent first-time calls for one channel can replace each other's
// handler depends on AtomicHashMap::Set semantics -- confirm.
136 ADEBUG << "new reader for channel:"
137 << GlobalData::GetChannelById(channel_id);
138 handler.reset(new ListenerHandler<MessageT>());
139 msg_listeners_.Set(channel_id, handler);
140 }
// Connect the listener under both this reader's id and the opposite role's id.
141 handler->Connect(self_attr.id(), opposite_attr.id(), listener);
142}
143
// Disconnects self_attr.id() from the handler stored for self_attr's channel.
// Does nothing when is_shutdown_ is set or when the channel has no stored
// handler. The map entry itself is left in place, and MessageT is not used in
// the body.
// NOTE(review): listing line 145 (the signature) is absent from this extract.
// Judging by the in-class declaration it is presumably
// `void Dispatcher::RemoveListener(const RoleAttributes& self_attr) {` --
// confirm against upstream.
144template <typename MessageT>
146 if (is_shutdown_.load()) {
147 return;
148 }
149 uint64_t channel_id = self_attr.channel_id();
150
151 ListenerHandlerBasePtr* handler_base = nullptr;
// Disconnect goes through the base-class pointer, so no downcast is needed.
152 if (msg_listeners_.Get(channel_id, &handler_base)) {
153 (*handler_base)->Disconnect(self_attr.id());
154 }
155}
156
// Disconnects the pair (self_attr.id(), opposite_attr.id()) from the handler
// stored for self_attr's channel.
// Does nothing when is_shutdown_ is set or when the channel has no stored
// handler. The map entry itself is left in place, and MessageT is not used in
// the body.
// NOTE(review): listing line 158 (the first line of the signature) is absent
// from this extract. Judging by the in-class declaration it is presumably
// `void Dispatcher::RemoveListener(const RoleAttributes& self_attr,` --
// confirm against upstream.
157template <typename MessageT>
159 const RoleAttributes& opposite_attr) {
160 if (is_shutdown_.load()) {
161 return;
162 }
163 uint64_t channel_id = self_attr.channel_id();
164
165 ListenerHandlerBasePtr* handler_base = nullptr;
// Disconnect goes through the base-class pointer, so no downcast is needed.
166 if (msg_listeners_.Get(channel_id, &handler_base)) {
167 (*handler_base)->Disconnect(self_attr.id(), opposite_attr.id());
168 }
169}
170
171} // namespace transport
172} // namespace cyber
173} // namespace apollo
174
175#endif // CYBER_TRANSPORT_DISPATCHER_DISPATCHER_H_
An implementation of a lock-free, fixed-size hash map
static std::string GetChannelById(uint64_t id)
void AddListener(const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
Definition dispatcher.h:88
void RemoveListener(const RoleAttributes &self_attr)
Definition dispatcher.h:145
bool HasChannel(uint64_t channel_id)
Definition dispatcher.cc:32
AtomicHashMap< uint64_t, ListenerHandlerBasePtr > msg_listeners_
Definition dispatcher.h:83
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr
std::function< void(const std::shared_ptr< MessageT > &, const MessageInfo &)> MessageListener
Definition dispatcher.h:53
std::shared_ptr< Dispatcher > DispatcherPtr
Definition dispatcher.h:49
class register implement
Definition arena_queue.h:37