Apollo 11.0
自动驾驶开放平台
channel_manager.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <algorithm>
20#include <set>
21#include <utility>
22
24#include "cyber/common/log.h"
28#include "cyber/state.h"
29#include "cyber/time/time.h"
30
31namespace apollo {
32namespace cyber {
33namespace service_discovery {
34
36 allowed_role_ |= 1 << RoleType::ROLE_WRITER;
37 allowed_role_ |= 1 << RoleType::ROLE_READER;
38 change_type_ = ChangeType::CHANGE_CHANNEL;
39 channel_name_ = "channel_change_broadcast";
40 exempted_msg_types_.emplace(message::MessageType<message::RawMessage>());
41 exempted_msg_types_.emplace(message::MessageType<message::PyMessageWrap>());
42}
43
45
46void ChannelManager::GetChannelNames(std::vector<std::string>* channels) {
47 RETURN_IF_NULL(channels);
48
49 std::unordered_set<std::string> local_channels;
50 std::vector<RolePtr> roles;
51 channel_writers_.GetAllRoles(&roles);
52 channel_readers_.GetAllRoles(&roles);
53 for (auto& role : roles) {
54 local_channels.emplace(role->attributes().channel_name());
55 }
56 std::move(local_channels.begin(), local_channels.end(),
57 std::back_inserter(*channels));
58}
59
60void ChannelManager::GetProtoDesc(const std::string& channel_name,
61 std::string* proto_desc) {
62 RETURN_IF_NULL(proto_desc);
63 uint64_t key = common::GlobalData::RegisterChannel(channel_name);
64 RolePtr writer = nullptr;
65 if (!channel_writers_.Search(key, &writer)) {
66 return;
67 }
68
69 if (writer->attributes().has_proto_desc()) {
70 *proto_desc = writer->attributes().proto_desc();
71 }
72}
73
74void ChannelManager::GetMsgType(const std::string& channel_name,
75 std::string* msg_type) {
76 RETURN_IF_NULL(msg_type);
77 uint64_t key = common::GlobalData::RegisterChannel(channel_name);
78 RolePtr writer = nullptr;
79 if (!channel_writers_.Search(key, &writer)) {
80 AERROR << "cannot find writer of channel: " << channel_name
81 << " key: " << key;
82 return;
83 }
84
85 if (writer->attributes().has_message_type()) {
86 *msg_type = writer->attributes().message_type();
87 }
88}
89
90bool ChannelManager::HasWriter(const std::string& channel_name) {
91 uint64_t key = common::GlobalData::RegisterChannel(channel_name);
92 return channel_writers_.Search(key);
93}
94
96 RETURN_IF_NULL(writers);
97 channel_writers_.GetAllRoles(writers);
98}
99
100void ChannelManager::GetWritersOfNode(const std::string& node_name,
101 RoleAttrVec* writers) {
102 RETURN_IF_NULL(writers);
103 uint64_t key = common::GlobalData::RegisterNode(node_name);
104 node_writers_.Search(key, writers);
105}
106
107void ChannelManager::GetWritersOfChannel(const std::string& channel_name,
108 RoleAttrVec* writers) {
109 RETURN_IF_NULL(writers);
110 uint64_t key = common::GlobalData::RegisterChannel(channel_name);
111 channel_writers_.Search(key, writers);
112}
113
114bool ChannelManager::HasReader(const std::string& channel_name) {
115 uint64_t key = common::GlobalData::RegisterChannel(channel_name);
116 return channel_readers_.Search(key);
117}
118
120 RETURN_IF_NULL(readers);
121 channel_readers_.GetAllRoles(readers);
122}
123
124void ChannelManager::GetReadersOfNode(const std::string& node_name,
125 RoleAttrVec* readers) {
126 RETURN_IF_NULL(readers);
127 uint64_t key = common::GlobalData::RegisterNode(node_name);
128 node_readers_.Search(key, readers);
129}
130
131void ChannelManager::GetReadersOfChannel(const std::string& channel_name,
132 RoleAttrVec* readers) {
133 RETURN_IF_NULL(readers);
134 uint64_t key = common::GlobalData::RegisterChannel(channel_name);
135 channel_readers_.Search(key, readers);
136}
137
138void ChannelManager::GetUpstreamOfNode(const std::string& node_name,
139 RoleAttrVec* upstream_nodes) {
140 RETURN_IF_NULL(upstream_nodes);
141
142 RoleAttrVec readers;
143 GetReadersOfNode(node_name, &readers);
144 if (readers.empty()) {
145 return;
146 }
147 std::unordered_set<std::string> channels;
148 for (auto& reader : readers) {
149 channels.emplace(reader.channel_name());
150 }
151
152 RoleAttrVec writers;
153 for (auto& channel : channels) {
154 GetWritersOfChannel(channel, &writers);
155 }
156
157 std::unordered_map<std::string, proto::RoleAttributes> nodes;
158 for (auto& writer : writers) {
160 attr.set_host_name(writer.host_name());
161 attr.set_process_id(writer.process_id());
162 attr.set_node_name(writer.node_name());
163 attr.set_node_id(writer.node_id());
164 nodes[attr.node_name()] = attr;
165 }
166 for (auto& item : nodes) {
167 upstream_nodes->emplace_back(item.second);
168 }
169}
170
171void ChannelManager::GetDownstreamOfNode(const std::string& node_name,
172 RoleAttrVec* downstream_nodes) {
173 RETURN_IF_NULL(downstream_nodes);
174
175 RoleAttrVec writers;
176 GetWritersOfNode(node_name, &writers);
177 if (writers.empty()) {
178 return;
179 }
180 std::unordered_set<std::string> channels;
181 for (auto& writer : writers) {
182 channels.emplace(writer.channel_name());
183 }
184
185 RoleAttrVec readers;
186 for (auto& channel : channels) {
187 GetReadersOfChannel(channel, &readers);
188 }
189
190 std::unordered_map<std::string, proto::RoleAttributes> nodes;
191 for (auto& reader : readers) {
193 attr.set_host_name(reader.host_name());
194 attr.set_process_id(reader.process_id());
195 attr.set_node_name(reader.node_name());
196 attr.set_node_id(reader.node_id());
197 nodes[attr.node_name()] = attr;
198 }
199 for (auto& item : nodes) {
200 downstream_nodes->emplace_back(item.second);
201 }
202}
203
205 const std::string& lhs_node_name, const std::string& rhs_node_name) {
206 Vertice lhs(lhs_node_name);
207 Vertice rhs(rhs_node_name);
208 return node_graph_.GetDirectionOf(lhs, rhs);
209}
210
211bool ChannelManager::IsMessageTypeMatching(const std::string& lhs,
212 const std::string& rhs) {
213 if (lhs == rhs) {
214 return true;
215 }
216 if (exempted_msg_types_.count(lhs) > 0) {
217 return true;
218 }
219 if (exempted_msg_types_.count(rhs) > 0) {
220 return true;
221 }
222 return false;
223}
224
225bool ChannelManager::Check(const RoleAttributes& attr) {
226 RETURN_VAL_IF(!attr.has_channel_name(), false);
227 RETURN_VAL_IF(!attr.has_channel_id(), false);
228 RETURN_VAL_IF(!attr.has_id(), false);
229 return true;
230}
231
232void ChannelManager::Dispose(const ChangeMsg& msg) {
233 if (msg.operate_type() == OperateType::OPT_JOIN) {
234 DisposeJoin(msg);
235 } else {
236 DisposeLeave(msg);
237 }
238 Notify(msg);
239}
240
241void ChannelManager::OnTopoModuleLeave(const std::string& host_name,
242 int process_id) {
244
245 RoleAttributes attr;
246 attr.set_host_name(host_name);
247 attr.set_process_id(process_id);
248
249 std::vector<RolePtr> writers_to_remove;
250 channel_writers_.Search(attr, &writers_to_remove);
251
252 std::vector<RolePtr> readers_to_remove;
253 channel_readers_.Search(attr, &readers_to_remove);
254
255 ChangeMsg msg;
256 for (auto& writer : writers_to_remove) {
257 Convert(writer->attributes(), RoleType::ROLE_WRITER, OperateType::OPT_LEAVE,
258 &msg);
259 DisposeLeave(msg);
260 Notify(msg);
261 }
262
263 for (auto& reader : readers_to_remove) {
264 Convert(reader->attributes(), RoleType::ROLE_READER, OperateType::OPT_LEAVE,
265 &msg);
266 DisposeLeave(msg);
267 Notify(msg);
268 }
269}
270
271void ChannelManager::DisposeJoin(const ChangeMsg& msg) {
272 ScanMessageType(msg);
273
274 Vertice v(msg.role_attr().node_name());
275 Edge e;
276 e.set_value(msg.role_attr().channel_name());
277 if (msg.role_type() == RoleType::ROLE_WRITER) {
278 if (msg.role_attr().has_proto_desc() &&
279 msg.role_attr().proto_desc() != "") {
280 message::ProtobufFactory::Instance()->RegisterMessage(
281 msg.role_attr().proto_desc());
282 }
283 auto role = std::make_shared<RoleWriter>(msg.role_attr(), msg.timestamp());
284 node_writers_.Add(role->attributes().node_id(), role);
285 channel_writers_.Add(role->attributes().channel_id(), role);
286 e.set_src(v);
287 } else {
288 auto role = std::make_shared<RoleReader>(msg.role_attr(), msg.timestamp());
289 node_readers_.Add(role->attributes().node_id(), role);
290 channel_readers_.Add(role->attributes().channel_id(), role);
291 e.set_dst(v);
292 }
293 node_graph_.Insert(e);
294}
295
296void ChannelManager::DisposeLeave(const ChangeMsg& msg) {
297 Vertice v(msg.role_attr().node_name());
298 Edge e;
299 e.set_value(msg.role_attr().channel_name());
300 if (msg.role_type() == RoleType::ROLE_WRITER) {
301 auto role = std::make_shared<RoleWriter>(msg.role_attr());
302 node_writers_.Remove(role->attributes().node_id(), role);
303 channel_writers_.Remove(role->attributes().channel_id(), role);
304 e.set_src(v);
305 } else {
306 auto role = std::make_shared<RoleReader>(msg.role_attr());
307 node_readers_.Remove(role->attributes().node_id(), role);
308 channel_readers_.Remove(role->attributes().channel_id(), role);
309 e.set_dst(v);
310 }
311 node_graph_.Delete(e);
312}
313
314void ChannelManager::ScanMessageType(const ChangeMsg& msg) {
315 uint64_t key = msg.role_attr().channel_id();
316 std::string role_type("reader");
317 if (msg.role_type() == RoleType::ROLE_WRITER) {
318 role_type = "writer";
319 }
320
321 RoleAttrVec existed_writers;
322 channel_writers_.Search(key, &existed_writers);
323 for (auto& w_attr : existed_writers) {
324 if (!IsMessageTypeMatching(msg.role_attr().message_type(),
325 w_attr.message_type())) {
326 AERROR << "newly added " << role_type << "(belongs to node["
327 << msg.role_attr().node_name() << "])"
328 << "'s message type[" << msg.role_attr().message_type()
329 << "] does not match the exsited writer(belongs to node["
330 << w_attr.node_name() << "])'s message type["
331 << w_attr.message_type() << "].";
332 }
333 }
334
335 RoleAttrVec existed_readers;
336 channel_readers_.Search(key, &existed_readers);
337 for (auto& r_attr : existed_readers) {
338 if (!IsMessageTypeMatching(msg.role_attr().message_type(),
339 r_attr.message_type())) {
340 AERROR << "newly added " << role_type << "(belongs to node["
341 << msg.role_attr().node_name() << "])"
342 << "'s message type[" << msg.role_attr().message_type()
343 << "] does not match the exsited reader(belongs to node["
344 << r_attr.node_name() << "])'s message type["
345 << r_attr.message_type() << "].";
346 }
347 }
348}
349
350} // namespace service_discovery
351} // namespace cyber
352} // namespace apollo
static uint64_t RegisterNode(const std::string &node_name)
static uint64_t RegisterChannel(const std::string &channel)
void GetMsgType(const std::string &channel_name, std::string *msg_type)
Get the Msg Type of channel_name
void GetProtoDesc(const std::string &channel_name, std::string *proto_desc)
Get the Protocol Desc of channel_name
void GetReadersOfNode(const std::string &node_name, RoleAttrVec *readers)
Get the Readers Of Node object
bool HasWriter(const std::string &channel_name)
Inquire if there is at least one Writer that publishes channel_name
void GetWritersOfChannel(const std::string &channel_name, RoleAttrVec *writers)
Get the Writers Of Channel object
std::vector< proto::RoleAttributes > RoleAttrVec
void GetUpstreamOfNode(const std::string &node_name, RoleAttrVec *upstream_nodes)
Get the Upstream Of Node object.
void GetWritersOfNode(const std::string &node_name, RoleAttrVec *writers)
Get the Writers Of Node object
bool HasReader(const std::string &channel_name)
Inquire if there is at least one Reader that publishes channel_name
void GetReadersOfChannel(const std::string &channel_name, RoleAttrVec *readers)
Get the Readers Of Channel object
bool IsMessageTypeMatching(const std::string &lhs, const std::string &rhs)
Is lhs and rhs have same MessageType
void GetWriters(RoleAttrVec *writers)
Get All Writers object
void GetReaders(RoleAttrVec *readers)
Get All Readers object
void GetChannelNames(std::vector< std::string > *channels)
Get all channel names in the topology
FlowDirection GetFlowDirection(const std::string &lhs_node_name, const std::string &rhs_node_name)
Get the Flow Direction from lhs_node_node to rhs_node_name You can see FlowDirection's description fo...
ChannelManager()
Construct a new Channel Manager object
virtual ~ChannelManager()
Destroy the Channel Manager object
void GetDownstreamOfNode(const std::string &node_name, RoleAttrVec *downstream_nodes)
Get the Downstream Of Node object.
FlowDirection GetDirectionOf(const Vertice &lhs, const Vertice &rhs)
Definition graph.cc:145
void Notify(const ChangeMsg &msg)
Definition manager.cc:186
void Convert(const RoleAttributes &attr, RoleType role, OperateType opt, ChangeMsg *msg)
Definition manager.cc:170
std::atomic< bool > is_discovery_started_
Definition manager.h:155
void GetAllRoles(std::vector< RolePtr > *roles) override
bool Add(uint64_t key, const RolePtr &role, bool ignore_if_exist=true) override
#define RETURN_IF_NULL(ptr)
Definition log.h:90
#define RETURN_VAL_IF(condition, val)
Definition log.h:114
#define AERROR
Definition log.h:44
#define RETURN_IF(condition)
Definition log.h:106
FlowDirection
describe the flow direction between nodes As the DAG below A-—>B--—>C<--—D GetDirectionOf(A,...
Definition graph.h:39
std::shared_ptr< RoleBase > RolePtr
Definition role.h:31
class register implement
Definition arena_queue.h:37