Apollo 10.0
自动驾驶开放平台
topology_manager.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
20#include "cyber/common/log.h"
21#include "cyber/time/time.h"
22
23namespace apollo {
24namespace cyber {
25namespace service_discovery {
26
27TopologyManager::TopologyManager()
28 : init_(false),
29 node_manager_(nullptr),
30 channel_manager_(nullptr),
31 service_manager_(nullptr),
32 participant_(nullptr) {
33 Init();
34}
35
36TopologyManager::~TopologyManager() { Shutdown(); }
37
38void TopologyManager::Shutdown() {
39 ADEBUG << "topology shutdown.";
40 // avoid shutdown twice
41 if (!init_.exchange(false)) {
42 return;
43 }
44
45 node_manager_->Shutdown();
46 channel_manager_->Shutdown();
47 service_manager_->Shutdown();
48 participant_->Shutdown();
49
50 change_signal_.DisconnectAllSlots();
51}
52
53TopologyManager::ChangeConnection TopologyManager::AddChangeListener(
54 const ChangeFunc& func) {
55 return change_signal_.Connect(func);
56}
57
58void TopologyManager::RemoveChangeListener(const ChangeConnection& conn) {
59 auto local_conn = conn;
60 local_conn.Disconnect();
61}
62
63bool TopologyManager::Init() {
64 if (init_.exchange(true)) {
65 return true;
66 }
67
68 node_manager_ = std::make_shared<NodeManager>();
69 channel_manager_ = std::make_shared<ChannelManager>();
70 service_manager_ = std::make_shared<ServiceManager>();
71
72 CreateParticipant();
73
74 bool result =
75 InitNodeManager() && InitChannelManager() && InitServiceManager();
76 if (!result) {
77 AERROR << "init manager failed.";
78 participant_ = nullptr;
79 node_manager_ = nullptr;
80 channel_manager_ = nullptr;
81 service_manager_ = nullptr;
82 init_.store(false);
83 return false;
84 }
85
86 return true;
87}
88
89bool TopologyManager::InitNodeManager() {
90 return node_manager_->StartDiscovery(participant_);
91}
92
93bool TopologyManager::InitChannelManager() {
94 return channel_manager_->StartDiscovery(participant_);
95}
96
97bool TopologyManager::InitServiceManager() {
98 return service_manager_->StartDiscovery(participant_);
99}
100
101bool TopologyManager::CreateParticipant() {
102 std::string participant_name =
103 common::GlobalData::Instance()->HostName() + '+' +
104 std::to_string(common::GlobalData::Instance()->ProcessId());
105 participant_ = std::make_shared<transport::Participant>(
106 participant_name, 11511,
107 new ParticipantListener(std::bind(&TopologyManager::OnParticipantChange,
108 this, std::placeholders::_1)));
109 if (!participant_->Init()) {
110 AERROR << "init participant failed";
111 return false;
112 }
113 return true;
114}
115
116void TopologyManager::OnParticipantChange(
117 const eprosima::fastrtps::rtps::ParticipantDiscoveryInfo& info) {
118 ChangeMsg msg;
119 if (!Convert(info, &msg)) {
120 return;
121 }
122
123 if (!init_.load()) {
124 return;
125 }
126
127 if (msg.operate_type() == OperateType::OPT_LEAVE) {
128 auto& host_name = msg.role_attr().host_name();
129 int process_id = msg.role_attr().process_id();
130 node_manager_->OnTopoModuleLeave(host_name, process_id);
131 channel_manager_->OnTopoModuleLeave(host_name, process_id);
132 service_manager_->OnTopoModuleLeave(host_name, process_id);
133 }
134 change_signal_(msg);
135}
136
137bool TopologyManager::Convert(
138 const eprosima::fastrtps::rtps::ParticipantDiscoveryInfo& info,
139 ChangeMsg* msg) {
140 auto guid = info.info.m_guid;
141 std::string participant_name("");
142 OperateType opt_type;
143
144 switch (info.status) {
145 case eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERY_STATUS::
146 DISCOVERED_PARTICIPANT:
147 participant_name = info.info.m_participantName;
148 AINFO << "discovery participant name:" << participant_name;
149 participant_names_[guid] = participant_name;
150 opt_type = OperateType::OPT_JOIN;
151 break;
152 case eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERY_STATUS::
153 REMOVED_PARTICIPANT:
154 if (participant_names_.find(guid) != participant_names_.end()) {
155 participant_name = participant_names_[guid];
156 AINFO << "remove participant name:" << participant_name;
157 participant_names_.erase(guid);
158 }
159 opt_type = OperateType::OPT_LEAVE;
160 break;
161 case eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERY_STATUS::
162 DROPPED_PARTICIPANT:
163 if (participant_names_.find(guid) != participant_names_.end()) {
164 participant_name = participant_names_[guid];
165 AINFO << "dropped participant name:" << participant_name;
166 participant_names_.erase(guid);
167 }
168 opt_type = OperateType::OPT_LEAVE;
169 break;
170 default:
171 break;
172 }
173
174 std::string host_name("");
175 int process_id = 0;
176 if (!ParseParticipantName(participant_name, &host_name, &process_id)) {
177 return false;
178 }
179
180 msg->set_timestamp(cyber::Time::Now().ToNanosecond());
181 msg->set_change_type(ChangeType::CHANGE_PARTICIPANT);
182 msg->set_operate_type(opt_type);
183 msg->set_role_type(RoleType::ROLE_PARTICIPANT);
184 auto role_attr = msg->mutable_role_attr();
185 role_attr->set_host_name(host_name);
186 role_attr->set_process_id(process_id);
187 return true;
188}
189
190bool TopologyManager::ParseParticipantName(const std::string& participant_name,
191 std::string* host_name,
192 int* process_id) {
193 // participant_name format: host_name+process_id
194 auto pos = participant_name.find('+');
195 if (pos == std::string::npos) {
196 ADEBUG << "participant_name [" << participant_name << "] format mismatch.";
197 return false;
198 }
199 *host_name = participant_name.substr(0, pos);
200 std::string pid_str = participant_name.substr(pos + 1);
201 try {
202 *process_id = std::stoi(pid_str);
203 } catch (const std::exception& e) {
204 AERROR << "invalid process_id:" << e.what();
205 return false;
206 }
207 return true;
208}
209
210} // namespace service_discovery
211} // namespace cyber
212} // namespace apollo
static Time Now()
get the current time.
Definition time.cc:57
std::function< void(const ChangeMsg &)> ChangeFunc
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
bool Init(const char *binary_name, const std::string &dag_info)
Definition init.cc:98
class register implement
Definition arena_queue.h:37