25namespace service_discovery {
// Constructor. The visible initializers set node_manager_, channel_manager_,
// service_manager_ and participant_ to nullptr; the managers are only
// allocated later, in Init().
// NOTE(review): the embedded line numbering skips an entry right after the
// signature, so the start of the initializer list (and the constructor body)
// is not visible in this listing -- confirm against the full file.
27TopologyManager::TopologyManager()
29 node_manager_(nullptr),
30 channel_manager_(nullptr),
31 service_manager_(nullptr),
32 participant_(nullptr) {
36TopologyManager::~TopologyManager() { Shutdown(); }
// Shuts the topology layer down.
//
// Order of operations visible here: clear the init_ flag, shut down the node,
// channel and service managers, shut down the participant, then disconnect
// every slot attached to change_signal_.
38void TopologyManager::Shutdown() {
39 ADEBUG <<
"topology shutdown.";
// exchange(false) clears the flag and yields its previous value, so this
// branch is taken when the manager was not initialized.
// NOTE(review): the branch body is not visible in this listing; presumably it
// returns early so the calls below run at most once -- confirm.
41 if (!init_.exchange(
false)) {
// NOTE(review): the pointers below are dereferenced without a null check.
// Init()'s failure path resets them to nullptr; whether init_ is also reset
// there is not visible -- confirm this cannot be reached with null managers.
45 node_manager_->Shutdown();
46 channel_manager_->Shutdown();
47 service_manager_->Shutdown();
48 participant_->Shutdown();
// Drop all registered change listeners last.
50 change_signal_.DisconnectAllSlots();
// NOTE(review): the two statements below are orphaned in this listing -- the
// headers of the functions that contain them are not visible, and the
// embedded numbering indicates they belong to two different function bodies.
//
// Registers `func` as a slot on change_signal_ and returns whatever Connect()
// returns to the caller.
55 return change_signal_.Connect(func);
// Takes a local copy of `conn`; how the copy is used afterwards is not visible
// here (presumably it is disconnected -- confirm).
59 auto local_conn = conn;
// Initializes the topology layer: allocates the three managers and starts
// discovery on each of them.
//
// Returns a bool; the return statements themselves are not visible in this
// listing.
63bool TopologyManager::Init() {
// exchange(true) sets the flag and yields its previous value, so this branch
// is taken when Init() has already run.
// NOTE(review): the branch body is not visible; presumably an early return.
64 if (init_.exchange(
true)) {
68 node_manager_ = std::make_shared<NodeManager>();
69 channel_manager_ = std::make_shared<ChannelManager>();
70 service_manager_ = std::make_shared<ServiceManager>();
// The && chain short-circuits: a manager is only started if every manager
// before it started successfully.
// NOTE(review): the beginning of this statement (where the result is stored)
// and any call that creates participant_ before it are not visible here.
75 InitNodeManager() && InitChannelManager() && InitServiceManager();
// Failure path: log, then release the participant and all three managers by
// resetting the pointers.
// NOTE(review): the condition guarding this path is not visible; presumably
// it tests the result of the chain above -- confirm.
77 AERROR <<
"init manager failed.";
78 participant_ =
nullptr;
79 node_manager_ =
nullptr;
80 channel_manager_ =
nullptr;
81 service_manager_ =
nullptr;
// Starts discovery on node_manager_ using participant_ and returns the result
// of StartDiscovery(). node_manager_ is dereferenced without a null check.
89bool TopologyManager::InitNodeManager() {
90 return node_manager_->StartDiscovery(participant_);
// Starts discovery on channel_manager_ using participant_ and returns the
// result of StartDiscovery(). channel_manager_ is dereferenced without a null
// check.
93bool TopologyManager::InitChannelManager() {
94 return channel_manager_->StartDiscovery(participant_);
// Starts discovery on service_manager_ using participant_ and returns the
// result of StartDiscovery(). service_manager_ is dereferenced without a null
// check.
97bool TopologyManager::InitServiceManager() {
98 return service_manager_->StartDiscovery(participant_);
// Creates participant_ and initializes it.
//
// The participant is named "<host name>+<process id>", i.e. the same '+'
// separated form that ParseParticipantName() splits apart.
// Returns a bool; the return statements are not visible in this listing.
101bool TopologyManager::CreateParticipant() {
102 std::string participant_name =
103 common::GlobalData::Instance()->HostName() +
'+' +
104 std::to_string(common::GlobalData::Instance()->ProcessId());
// The listener forwards participant discovery events to OnParticipantChange()
// on this object.
// NOTE(review): the listener is allocated with a raw `new`; presumably the
// Participant takes ownership of it -- confirm, otherwise it leaks.
// NOTE(review): std::bind stores a raw `this`, so the callback must not fire
// after this TopologyManager is destroyed -- confirm the participant is shut
// down first.
// NOTE(review): the meaning of the constant 11511 is not shown here
// (presumably a port number) -- confirm.
105 participant_ = std::make_shared<transport::Participant>(
106 participant_name, 11511,
107 new ParticipantListener(std::bind(&TopologyManager::OnParticipantChange,
108 this, std::placeholders::_1)));
// Log when the participant fails to initialize; the rest of this branch is
// not visible in this listing.
109 if (!participant_->Init()) {
110 AERROR <<
"init participant failed";
// Callback bound into the ParticipantListener by CreateParticipant(); receives
// participant discovery events.
//
// The event is translated by Convert() into `msg`; when the resulting
// operation is OPT_LEAVE, all three managers are told that the given
// host/process has left.
116void TopologyManager::OnParticipantChange(
117 const eprosima::fastrtps::rtps::ParticipantDiscoveryInfo& info) {
// NOTE(review): the declaration of `msg` and the body of this branch are not
// visible in this listing; presumably the event is dropped when Convert()
// fails -- confirm.
119 if (!Convert(info, &msg)) {
// NOTE(review): the embedded numbering skips several lines before this point,
// so additional checks may exist between the two `if`s -- confirm.
127 if (msg.operate_type() == OperateType::OPT_LEAVE) {
128 auto& host_name = msg.role_attr().host_name();
129 int process_id = msg.role_attr().process_id();
// The managers are dereferenced without a null check here.
130 node_manager_->OnTopoModuleLeave(host_name, process_id);
131 channel_manager_->OnTopoModuleLeave(host_name, process_id);
132 service_manager_->OnTopoModuleLeave(host_name, process_id);
// Translates a participant discovery event into a change message.
//
// The participant's name is tracked per GUID in participant_names_: it is
// recorded when the participant is discovered and looked up / erased in the
// other status cases. The name is then split into host name and process id,
// and `msg` is filled with CHANGE_PARTICIPANT, the derived operate type,
// ROLE_PARTICIPANT and the host/process attributes.
//
// NOTE(review): part of the signature (the `msg` out-parameter), the
// declarations of `opt_type` and `process_id`, two case labels, the `break`s
// and all return statements are not visible in this listing.
// NOTE(review): participant_names_ is read and modified here with no lock
// visible; if discovery callbacks can run concurrently, confirm that access
// is synchronized elsewhere.
137bool TopologyManager::Convert(
138 const eprosima::fastrtps::rtps::ParticipantDiscoveryInfo& info,
140 auto guid = info.info.m_guid;
141 std::string participant_name(
"");
144 switch (info.status) {
// Newly discovered participant: remember its name under its GUID and report
// a join.
145 case eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERY_STATUS::
146 DISCOVERED_PARTICIPANT:
147 participant_name = info.info.m_participantName;
148 AINFO <<
"discovery participant name:" << participant_name;
149 participant_names_[guid] = participant_name;
150 opt_type = OperateType::OPT_JOIN;
// Second status case (its enumerator is on a line not present here; the log
// text suggests a removed participant): recover the stored name, forget the
// GUID, and report a leave.
// NOTE(review): find() + operator[] + erase(key) looks the same key up three
// times; the iterator returned by find() could serve all three.
152 case eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERY_STATUS::
154 if (participant_names_.find(guid) != participant_names_.end()) {
155 participant_name = participant_names_[guid];
156 AINFO <<
"remove participant name:" << participant_name;
157 participant_names_.erase(guid);
159 opt_type = OperateType::OPT_LEAVE;
// Third status case (enumerator likewise not visible; the log text suggests a
// dropped participant): same handling as the previous case.
161 case eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERY_STATUS::
163 if (participant_names_.find(guid) != participant_names_.end()) {
164 participant_name = participant_names_[guid];
165 AINFO <<
"dropped participant name:" << participant_name;
166 participant_names_.erase(guid);
168 opt_type = OperateType::OPT_LEAVE;
// Split "<host>+<pid>" back into its parts. When the GUID was unknown above,
// participant_name is still empty at this point.
174 std::string host_name(
"");
176 if (!ParseParticipantName(participant_name, &host_name, &process_id)) {
// Populate the outgoing change message.
181 msg->set_change_type(ChangeType::CHANGE_PARTICIPANT);
182 msg->set_operate_type(opt_type);
183 msg->set_role_type(RoleType::ROLE_PARTICIPANT);
184 auto role_attr = msg->mutable_role_attr();
185 role_attr->set_host_name(host_name);
186 role_attr->set_process_id(process_id);
// Splits a participant name of the form "<host name>+<process id>".
//
// The text before the first '+' is written to *host_name; the text after it
// is parsed with std::stoi and written to *process_id. A name without '+' is
// logged at debug level as a format mismatch, and a std::stoi failure is
// logged as an error.
//
// NOTE(review): the remainder of the signature, the `try` line and all return
// statements are not visible in this listing; presumably false is returned on
// both failure paths and true otherwise -- confirm.
190bool TopologyManager::ParseParticipantName(
const std::string& participant_name,
191 std::string* host_name,
// find() returns the first '+', so a host name that itself contains '+' would
// be split early.
194 auto pos = participant_name.find(
'+');
195 if (pos == std::string::npos) {
196 ADEBUG <<
"participant_name [" << participant_name <<
"] format mismatch.";
199 *host_name = participant_name.substr(0, pos);
200 std::string pid_str = participant_name.substr(pos + 1);
// NOTE(review): std::stoi skips leading whitespace and stops at the first
// non-digit, so input such as "12abc" is accepted as 12 -- confirm that this
// leniency is acceptable. It throws std::invalid_argument or
// std::out_of_range otherwise, both handled by the catch below.
202 *process_id = std::stoi(pid_str);
203 }
catch (
const std::exception& e) {
204 AERROR <<
"invalid process_id:" << e.what();
static Time Now()
get the current time.
std::function< void(const ChangeMsg &)> ChangeFunc
bool Init(const char *binary_name, const std::string &dag_info)