Apollo 10.0
自动驾驶开放平台
manager.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
20#include "cyber/common/log.h"
22#include "cyber/time/time.h"
25
26namespace apollo {
27namespace cyber {
28namespace service_discovery {
29
30using transport::QosProfileConf;
31
33 : is_shutdown_(false),
34 is_discovery_started_(false),
35 allowed_role_(0),
36 change_type_(proto::ChangeType::CHANGE_PARTICIPANT),
37 channel_name_(""),
38 publisher_(nullptr),
39 subscriber_(nullptr) {
40 host_name_ = common::GlobalData::Instance()->HostName();
41 process_id_ = common::GlobalData::Instance()->ProcessId();
42}
43
45
46bool Manager::StartDiscovery(const ParticipantPtr& participant) {
47 if (participant == nullptr) {
48 return false;
49 }
50 if (is_discovery_started_.exchange(true)) {
51 return true;
52 }
53 if (!CreatePublisher(participant) || !CreateSubscriber(participant)) {
54 AERROR << "create publisher or subscriber failed.";
56 return false;
57 }
58 participant_ = participant;
59 return true;
60}
61
63 if (!is_discovery_started_.exchange(false)) {
64 return;
65 }
66}
67
69 if (is_shutdown_.exchange(true)) {
70 return;
71 }
72
75}
76
77bool Manager::Join(const RoleAttributes& attr, RoleType role,
78 bool need_publish) {
79 if (is_shutdown_.load()) {
80 ADEBUG << "the manager has been shut down.";
81 return false;
82 }
83 RETURN_VAL_IF(!((1 << role) & allowed_role_), false);
84 RETURN_VAL_IF(!Check(attr), false);
85 ChangeMsg msg;
86 Convert(attr, role, OperateType::OPT_JOIN, &msg);
87 Dispose(msg);
88 if (need_publish) {
89 return Publish(msg);
90 }
91 return true;
92}
93
94bool Manager::Leave(const RoleAttributes& attr, RoleType role) {
95 if (is_shutdown_.load()) {
96 ADEBUG << "the manager has been shut down.";
97 return false;
98 }
99 RETURN_VAL_IF(!((1 << role) & allowed_role_), false);
100 RETURN_VAL_IF(!Check(attr), false);
101 ChangeMsg msg;
102 Convert(attr, role, OperateType::OPT_LEAVE, &msg);
103 Dispose(msg);
104 if (NeedPublish(msg)) {
105 return Publish(msg);
106 }
107 return true;
108}
109
113
115 auto local_conn = conn;
116 local_conn.Disconnect();
117}
118
119bool Manager::CreatePublisher(const ParticipantPtr& participant) {
120 publisher_ = participant->CreatePublisher(
122 return publisher_ != nullptr;
123}
124
126 subscriber_ = participant->CreateSubscriber(
128 std::bind(&Manager::OnRemoteChange, this, std::placeholders::_1));
129 return subscriber_ != nullptr;
130}
131
132bool Manager::NeedPublish(const ChangeMsg& msg) const {
133 (void)msg;
134 return true;
135}
136
137void Manager::Convert(const RoleAttributes& attr, RoleType role,
138 OperateType opt, ChangeMsg* msg) {
139 msg->set_timestamp(cyber::Time::Now().ToNanosecond());
140 msg->set_change_type(change_type_);
141 msg->set_operate_type(opt);
142 msg->set_role_type(role);
143 auto role_attr = msg->mutable_role_attr();
144 role_attr->CopyFrom(attr);
145 if (!role_attr->has_host_name()) {
146 role_attr->set_host_name(host_name_);
147 }
148 if (!role_attr->has_process_id()) {
149 role_attr->set_process_id(process_id_);
150 }
151}
152
153void Manager::Notify(const ChangeMsg& msg) { signal_(msg); }
154
155void Manager::OnRemoteChange(const std::shared_ptr<std::string>& msg_str) {
156 if (is_shutdown_.load()) {
157 ADEBUG << "the manager has been shut down.";
158 return;
159 }
160
161 ChangeMsg msg;
162 RETURN_IF(!message::ParseFromString(*msg_str, &msg));
163 if (IsFromSameProcess(msg)) {
164 return;
165 }
166 RETURN_IF(!Check(msg.role_attr()));
167 Dispose(msg);
168}
169
170bool Manager::Publish(const ChangeMsg& msg) {
171 if (!is_discovery_started_.load()) {
172 ADEBUG << "discovery is not started.";
173 return false;
174 }
175
178 {
179 std::lock_guard<std::mutex> lg(lock_);
180 if (publisher_ != nullptr) {
181 return publisher_->Write(m);
182 }
183 }
184 return true;
185}
186
188 auto& host_name = msg.role_attr().host_name();
189 int process_id = msg.role_attr().process_id();
190
191 if (process_id != process_id_ || host_name != host_name_) {
192 return false;
193 }
194 return true;
195}
196
197} // namespace service_discovery
198} // namespace cyber
199} // namespace apollo
static Time Now()
get the current time.
Definition time.cc:57
ConnectionType Connect(const Callback &cb)
Definition signal.h:65
void Notify(const ChangeMsg &msg)
Definition manager.cc:153
void Convert(const RoleAttributes &attr, RoleType role, OperateType opt, ChangeMsg *msg)
Definition manager.cc:137
std::function< void(const ChangeMsg &)> ChangeFunc
Definition manager.h:57
std::shared_ptr< transport::Participant > ParticipantPtr
Definition manager.h:59
bool Join(const RoleAttributes &attr, RoleType role, bool need_publish=true)
Join the topology
Definition manager.cc:77
bool IsFromSameProcess(const ChangeMsg &msg)
Definition manager.cc:187
bool Publish(const ChangeMsg &msg)
Definition manager.cc:170
void RemoveChangeListener(const ChangeConnection &conn)
Remove our listener for topology change.
Definition manager.cc:114
bool StartDiscovery(const ParticipantPtr &participant)
Startup topology discovery
Definition manager.cc:46
bool CreateSubscriber(const ParticipantPtr &participant)
Definition manager.cc:125
virtual ~Manager()
Destroy the Manager object
Definition manager.cc:44
virtual void Dispose(const ChangeMsg &msg)=0
void OnRemoteChange(const std::shared_ptr< std::string > &msg_str)
Definition manager.cc:155
std::atomic< bool > is_discovery_started_
Definition manager.h:156
Manager()
Construct a new Manager object
Definition manager.cc:32
ChangeConnection AddChangeListener(const ChangeFunc &func)
Add topology change listener, when topology changed, func will be called.
Definition manager.cc:110
bool Leave(const RoleAttributes &attr, RoleType role)
Leave the topology
Definition manager.cc:94
virtual bool Check(const RoleAttributes &attr)=0
void StopDiscovery()
Stop topology discovery
Definition manager.cc:62
virtual bool NeedPublish(const ChangeMsg &msg) const
Definition manager.cc:132
virtual void Shutdown()
Shutdown module
Definition manager.cc:68
bool CreatePublisher(const ParticipantPtr &participant)
Definition manager.cc:119
static const QosProfile QOS_PROFILE_TOPO_CHANGE
This class represents the structure UnderlayMessage defined by the user in the IDL file.
void data(const std::string &_data)
This function copies the value in member data
#define RETURN_VAL_IF(condition, val)
Definition log.h:114
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
#define RETURN_IF(condition)
Definition log.h:106
std::enable_if< HasSerializeToString< T >::value, bool >::type SerializeToString(const T &message, std::string *str)
std::enable_if< HasParseFromString< T >::value, bool >::type ParseFromString(const std::string &str, T *message)
class register implement
Definition arena_queue.h:37
optional RoleAttributes role_attr