Apollo 10.0
自动驾驶开放平台
channel_manager.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2024 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <algorithm>
20#include <utility>
21
23#include "cyber/common/log.h"
27
28namespace apollo {
29namespace cyber {
30namespace service_discovery {
31
33 allowed_role_ |= 1 << RoleType::ROLE_WRITER;
34 allowed_role_ |= 1 << RoleType::ROLE_READER;
35 change_type_ = ChangeType::CHANGE_CHANNEL;
36 channel_name_ = "channel_change_broadcast";
37 exempted_msg_types_.emplace(message::MessageType<message::RawMessage>());
38 exempted_msg_types_.emplace(message::MessageType<message::PyMessageWrap>());
39}
40
42
43void ChannelManager::GetChannelNames(std::vector<std::string>* channels) {
44 RETURN_IF_NULL(channels);
45
46 std::unordered_set<std::string> local_channels;
47 std::vector<RolePtr> roles;
48 channel_writers_.GetAllRoles(&roles);
49 channel_readers_.GetAllRoles(&roles);
50 for (auto& role : roles) {
51 local_channels.emplace(role->attributes().channel_name());
52 }
53 std::move(local_channels.begin(), local_channels.end(),
54 std::back_inserter(*channels));
55}
56
57void ChannelManager::GetProtoDesc(const std::string& channel_name,
58 std::string* proto_desc) {
59 RETURN_IF_NULL(proto_desc);
60 uint64_t key = common::GlobalData::RegisterChannel(channel_name);
61 RolePtr writer = nullptr;
62 if (!channel_writers_.Search(key, &writer)) {
63 return;
64 }
65
66 if (writer->attributes().has_proto_desc()) {
67 *proto_desc = writer->attributes().proto_desc();
68 }
69}
70
71void ChannelManager::GetMsgType(const std::string& channel_name,
72 std::string* msg_type) {
73 RETURN_IF_NULL(msg_type);
74 uint64_t key = common::GlobalData::RegisterChannel(channel_name);
75 RolePtr writer = nullptr;
76 if (!channel_writers_.Search(key, &writer)) {
77 AERROR << "cannot find writer of channel: " << channel_name
78 << " key: " << key;
79 return;
80 }
81
82 if (writer->attributes().has_message_type()) {
83 *msg_type = writer->attributes().message_type();
84 }
85}
86
87bool ChannelManager::HasWriter(const std::string& channel_name) {
88 uint64_t key = common::GlobalData::RegisterChannel(channel_name);
89 return channel_writers_.Search(key);
90}
91
93 RETURN_IF_NULL(writers);
94 channel_writers_.GetAllRoles(writers);
95}
96
97void ChannelManager::GetWritersOfNode(const std::string& node_name,
98 RoleAttrVec* writers) {
99 RETURN_IF_NULL(writers);
100 uint64_t key = common::GlobalData::RegisterNode(node_name);
101 node_writers_.Search(key, writers);
102}
103
104void ChannelManager::GetWritersOfChannel(const std::string& channel_name,
105 RoleAttrVec* writers) {
106 RETURN_IF_NULL(writers);
107 uint64_t key = common::GlobalData::RegisterChannel(channel_name);
108 channel_writers_.Search(key, writers);
109}
110
111bool ChannelManager::HasReader(const std::string& channel_name) {
112 uint64_t key = common::GlobalData::RegisterChannel(channel_name);
113 return channel_readers_.Search(key);
114}
115
117 RETURN_IF_NULL(readers);
118 channel_readers_.GetAllRoles(readers);
119}
120
121void ChannelManager::GetReadersOfNode(const std::string& node_name,
122 RoleAttrVec* readers) {
123 RETURN_IF_NULL(readers);
124 uint64_t key = common::GlobalData::RegisterNode(node_name);
125 node_readers_.Search(key, readers);
126}
127
128void ChannelManager::GetReadersOfChannel(const std::string& channel_name,
129 RoleAttrVec* readers) {
130 RETURN_IF_NULL(readers);
131 uint64_t key = common::GlobalData::RegisterChannel(channel_name);
132 channel_readers_.Search(key, readers);
133}
134
135void ChannelManager::GetUpstreamOfNode(const std::string& node_name,
136 RoleAttrVec* upstream_nodes) {
137 RETURN_IF_NULL(upstream_nodes);
138
139 RoleAttrVec readers;
140 GetReadersOfNode(node_name, &readers);
141 if (readers.empty()) {
142 return;
143 }
144 std::unordered_set<std::string> channels;
145 for (auto& reader : readers) {
146 channels.emplace(reader.channel_name());
147 }
148
149 RoleAttrVec writers;
150 for (auto& channel : channels) {
151 GetWritersOfChannel(channel, &writers);
152 }
153
154 std::unordered_map<std::string, proto::RoleAttributes> nodes;
155 for (auto& writer : writers) {
157 attr.set_host_name(writer.host_name());
158 attr.set_process_id(writer.process_id());
159 attr.set_node_name(writer.node_name());
160 attr.set_node_id(writer.node_id());
161 nodes[attr.node_name()] = attr;
162 }
163 for (auto& item : nodes) {
164 upstream_nodes->emplace_back(item.second);
165 }
166}
167
168void ChannelManager::GetDownstreamOfNode(const std::string& node_name,
169 RoleAttrVec* downstream_nodes) {
170 RETURN_IF_NULL(downstream_nodes);
171
172 RoleAttrVec writers;
173 GetWritersOfNode(node_name, &writers);
174 if (writers.empty()) {
175 return;
176 }
177 std::unordered_set<std::string> channels;
178 for (auto& writer : writers) {
179 channels.emplace(writer.channel_name());
180 }
181
182 RoleAttrVec readers;
183 for (auto& channel : channels) {
184 GetReadersOfChannel(channel, &readers);
185 }
186
187 std::unordered_map<std::string, proto::RoleAttributes> nodes;
188 for (auto& reader : readers) {
190 attr.set_host_name(reader.host_name());
191 attr.set_process_id(reader.process_id());
192 attr.set_node_name(reader.node_name());
193 attr.set_node_id(reader.node_id());
194 nodes[attr.node_name()] = attr;
195 }
196 for (auto& item : nodes) {
197 downstream_nodes->emplace_back(item.second);
198 }
199}
200
202 const std::string& lhs_node_name, const std::string& rhs_node_name) {
203 Vertice lhs(lhs_node_name);
204 Vertice rhs(rhs_node_name);
205 return node_graph_.GetDirectionOf(lhs, rhs);
206}
207
208bool ChannelManager::IsMessageTypeMatching(const std::string& lhs,
209 const std::string& rhs) {
210 if (lhs == rhs) {
211 return true;
212 }
213 if (exempted_msg_types_.count(lhs) > 0) {
214 return true;
215 }
216 if (exempted_msg_types_.count(rhs) > 0) {
217 return true;
218 }
219 return false;
220}
221
222bool ChannelManager::Check(const RoleAttributes& attr) {
223 RETURN_VAL_IF(!attr.has_channel_name(), false);
224 RETURN_VAL_IF(!attr.has_channel_id(), false);
225 RETURN_VAL_IF(!attr.has_id(), false);
226 return true;
227}
228
229void ChannelManager::Dispose(const ChangeMsg& msg) {
230 if (msg.operate_type() == OperateType::OPT_JOIN) {
231 DisposeJoin(msg);
232 } else {
233 DisposeLeave(msg);
234 }
235 Notify(msg);
236}
237
238void ChannelManager::OnTopoModuleLeave(const std::string& host_name,
239 int process_id) {
241
242 RoleAttributes attr;
243 attr.set_host_name(host_name);
244 attr.set_process_id(process_id);
245
246 std::vector<RolePtr> writers_to_remove;
247 channel_writers_.Search(attr, &writers_to_remove);
248
249 std::vector<RolePtr> readers_to_remove;
250 channel_readers_.Search(attr, &readers_to_remove);
251
252 ChangeMsg msg;
253 for (auto& writer : writers_to_remove) {
254 Convert(writer->attributes(), RoleType::ROLE_WRITER, OperateType::OPT_LEAVE,
255 &msg);
256 DisposeLeave(msg);
257 Notify(msg);
258 }
259
260 for (auto& reader : readers_to_remove) {
261 Convert(reader->attributes(), RoleType::ROLE_READER, OperateType::OPT_LEAVE,
262 &msg);
263 DisposeLeave(msg);
264 Notify(msg);
265 }
266}
267
268void ChannelManager::DisposeJoin(const ChangeMsg& msg) {
269 ScanMessageType(msg);
270
271 Vertice v(msg.role_attr().node_name());
272 Edge e;
273 e.set_value(msg.role_attr().channel_name());
274 if (msg.role_type() == RoleType::ROLE_WRITER) {
275 if (msg.role_attr().has_proto_desc() &&
276 msg.role_attr().proto_desc() != "") {
277 message::ProtobufFactory::Instance()->RegisterMessage(
278 msg.role_attr().proto_desc());
279 }
280 auto role = std::make_shared<RoleWriter>(msg.role_attr(), msg.timestamp());
281 node_writers_.Add(role->attributes().node_id(), role);
282 channel_writers_.Add(role->attributes().channel_id(), role);
283 e.set_src(v);
284 } else {
285 auto role = std::make_shared<RoleReader>(msg.role_attr(), msg.timestamp());
286 node_readers_.Add(role->attributes().node_id(), role);
287 channel_readers_.Add(role->attributes().channel_id(), role);
288 e.set_dst(v);
289 }
290 node_graph_.Insert(e);
291}
292
293void ChannelManager::DisposeLeave(const ChangeMsg& msg) {
294 Vertice v(msg.role_attr().node_name());
295 Edge e;
296 e.set_value(msg.role_attr().channel_name());
297 if (msg.role_type() == RoleType::ROLE_WRITER) {
298 auto role = std::make_shared<RoleWriter>(msg.role_attr());
299 node_writers_.Remove(role->attributes().node_id(), role);
300 channel_writers_.Remove(role->attributes().channel_id(), role);
301 e.set_src(v);
302 } else {
303 auto role = std::make_shared<RoleReader>(msg.role_attr());
304 node_readers_.Remove(role->attributes().node_id(), role);
305 channel_readers_.Remove(role->attributes().channel_id(), role);
306 e.set_dst(v);
307 }
308 node_graph_.Delete(e);
309}
310
311void ChannelManager::ScanMessageType(const ChangeMsg& msg) {
312 uint64_t key = msg.role_attr().channel_id();
313 std::string role_type("reader");
314 if (msg.role_type() == RoleType::ROLE_WRITER) {
315 role_type = "writer";
316 }
317
318 RoleAttrVec existed_writers;
319 channel_writers_.Search(key, &existed_writers);
320 for (auto& w_attr : existed_writers) {
321 if (!IsMessageTypeMatching(msg.role_attr().message_type(),
322 w_attr.message_type())) {
323 AERROR << "newly added " << role_type << "(belongs to node["
324 << msg.role_attr().node_name() << "])"
325 << "'s message type[" << msg.role_attr().message_type()
326 << "] does not match the exsited writer(belongs to node["
327 << w_attr.node_name() << "])'s message type["
328 << w_attr.message_type() << "].";
329 }
330 }
331
332 RoleAttrVec existed_readers;
333 channel_readers_.Search(key, &existed_readers);
334 for (auto& r_attr : existed_readers) {
335 if (!IsMessageTypeMatching(msg.role_attr().message_type(),
336 r_attr.message_type())) {
337 AERROR << "newly added " << role_type << "(belongs to node["
338 << msg.role_attr().node_name() << "])"
339 << "'s message type[" << msg.role_attr().message_type()
340 << "] does not match the exsited reader(belongs to node["
341 << r_attr.node_name() << "])'s message type["
342 << r_attr.message_type() << "].";
343 }
344 }
345}
346
347} // namespace service_discovery
348} // namespace cyber
349} // namespace apollo
static uint64_t RegisterNode(const std::string &node_name)
static uint64_t RegisterChannel(const std::string &channel)
void GetMsgType(const std::string &channel_name, std::string *msg_type)
Get the Msg Type of channel_name
void GetProtoDesc(const std::string &channel_name, std::string *proto_desc)
Get the Protocol Desc of channel_name
void GetReadersOfNode(const std::string &node_name, RoleAttrVec *readers)
Get the Readers Of Node object
bool HasWriter(const std::string &channel_name)
Inquire if there is at least one Writer that publishes channel_name
void GetWritersOfChannel(const std::string &channel_name, RoleAttrVec *writers)
Get the Writers Of Channel object
std::vector< proto::RoleAttributes > RoleAttrVec
void GetUpstreamOfNode(const std::string &node_name, RoleAttrVec *upstream_nodes)
Get the Upstream Of Node object.
void GetWritersOfNode(const std::string &node_name, RoleAttrVec *writers)
Get the Writers Of Node object
bool HasReader(const std::string &channel_name)
Inquire if there is at least one Reader that publishes channel_name
void GetReadersOfChannel(const std::string &channel_name, RoleAttrVec *readers)
Get the Readers Of Channel object
bool IsMessageTypeMatching(const std::string &lhs, const std::string &rhs)
Is lhs and rhs have same MessageType
void GetWriters(RoleAttrVec *writers)
Get All Writers object
void GetReaders(RoleAttrVec *readers)
Get All Readers object
void GetChannelNames(std::vector< std::string > *channels)
Get all channel names in the topology
FlowDirection GetFlowDirection(const std::string &lhs_node_name, const std::string &rhs_node_name)
Get the Flow Direction from lhs_node_node to rhs_node_name You can see FlowDirection's description fo...
ChannelManager()
Construct a new Channel Manager object
virtual ~ChannelManager()
Destroy the Channel Manager object
void GetDownstreamOfNode(const std::string &node_name, RoleAttrVec *downstream_nodes)
Get the Downstream Of Node object.
FlowDirection GetDirectionOf(const Vertice &lhs, const Vertice &rhs)
Definition graph.cc:145
void Notify(const ChangeMsg &msg)
Definition manager.cc:153
void Convert(const RoleAttributes &attr, RoleType role, OperateType opt, ChangeMsg *msg)
Definition manager.cc:137
std::atomic< bool > is_discovery_started_
Definition manager.h:156
void GetAllRoles(std::vector< RolePtr > *roles) override
bool Add(uint64_t key, const RolePtr &role, bool ignore_if_exist=true) override
#define RETURN_IF_NULL(ptr)
Definition log.h:90
#define RETURN_VAL_IF(condition, val)
Definition log.h:114
#define AERROR
Definition log.h:44
#define RETURN_IF(condition)
Definition log.h:106
FlowDirection
describe the flow direction between nodes As the DAG below A-—>B--—>C<--—D GetDirectionOf(A,...
Definition graph.h:39
std::shared_ptr< RoleBase > RolePtr
Definition role.h:31
class register implement
Definition arena_queue.h:37