30namespace service_discovery {
37 exempted_msg_types_.emplace(message::MessageType<message::RawMessage>());
38 exempted_msg_types_.emplace(message::MessageType<message::PyMessageWrap>());
46 std::unordered_set<std::string> local_channels;
47 std::vector<RolePtr> roles;
50 for (
auto& role : roles) {
51 local_channels.emplace(role->attributes().channel_name());
53 std::move(local_channels.begin(), local_channels.end(),
54 std::back_inserter(*channels));
58 std::string* proto_desc) {
62 if (!channel_writers_.
Search(key, &writer)) {
66 if (writer->attributes().has_proto_desc()) {
67 *proto_desc = writer->attributes().proto_desc();
72 std::string* msg_type) {
76 if (!channel_writers_.
Search(key, &writer)) {
77 AERROR <<
"cannot find writer of channel: " << channel_name
82 if (writer->attributes().has_message_type()) {
83 *msg_type = writer->attributes().message_type();
89 return channel_writers_.
Search(key);
101 node_writers_.
Search(key, writers);
108 channel_writers_.
Search(key, writers);
113 return channel_readers_.
Search(key);
125 node_readers_.
Search(key, readers);
132 channel_readers_.
Search(key, readers);
141 if (readers.empty()) {
144 std::unordered_set<std::string> channels;
145 for (
auto& reader : readers) {
146 channels.emplace(reader.channel_name());
150 for (
auto& channel : channels) {
154 std::unordered_map<std::string, proto::RoleAttributes> nodes;
155 for (
auto& writer : writers) {
157 attr.set_host_name(writer.host_name());
158 attr.set_process_id(writer.process_id());
159 attr.set_node_name(writer.node_name());
160 attr.set_node_id(writer.node_id());
163 for (
auto& item : nodes) {
164 upstream_nodes->emplace_back(item.second);
174 if (writers.empty()) {
177 std::unordered_set<std::string> channels;
178 for (
auto& writer : writers) {
179 channels.emplace(writer.channel_name());
183 for (
auto& channel : channels) {
187 std::unordered_map<std::string, proto::RoleAttributes> nodes;
188 for (
auto& reader : readers) {
190 attr.set_host_name(reader.host_name());
191 attr.set_process_id(reader.process_id());
192 attr.set_node_name(reader.node_name());
193 attr.set_node_id(reader.node_id());
196 for (
auto& item : nodes) {
197 downstream_nodes->emplace_back(item.second);
202 const std::string& lhs_node_name,
const std::string& rhs_node_name) {
209 const std::string& rhs) {
213 if (exempted_msg_types_.count(lhs) > 0) {
216 if (exempted_msg_types_.count(rhs) > 0) {
229void ChannelManager::Dispose(
const ChangeMsg& msg) {
230 if (msg.operate_type() == OperateType::OPT_JOIN) {
238void ChannelManager::OnTopoModuleLeave(
const std::string& host_name,
243 attr.set_host_name(host_name);
244 attr.set_process_id(process_id);
246 std::vector<RolePtr> writers_to_remove;
247 channel_writers_.
Search(attr, &writers_to_remove);
249 std::vector<RolePtr> readers_to_remove;
250 channel_readers_.
Search(attr, &readers_to_remove);
253 for (
auto& writer : writers_to_remove) {
254 Convert(writer->attributes(), RoleType::ROLE_WRITER, OperateType::OPT_LEAVE,
260 for (
auto& reader : readers_to_remove) {
261 Convert(reader->attributes(), RoleType::ROLE_READER, OperateType::OPT_LEAVE,
268void ChannelManager::DisposeJoin(
const ChangeMsg& msg) {
269 ScanMessageType(msg);
271 Vertice v(msg.role_attr().node_name());
273 e.set_value(msg.role_attr().channel_name());
274 if (msg.role_type() == RoleType::ROLE_WRITER) {
275 if (msg.role_attr().has_proto_desc() &&
276 msg.role_attr().proto_desc() !=
"") {
277 message::ProtobufFactory::Instance()->RegisterMessage(
278 msg.role_attr().proto_desc());
280 auto role = std::make_shared<RoleWriter>(msg.role_attr(), msg.timestamp());
281 node_writers_.
Add(role->attributes().node_id(), role);
282 channel_writers_.
Add(role->attributes().channel_id(), role);
285 auto role = std::make_shared<RoleReader>(msg.role_attr(), msg.timestamp());
286 node_readers_.
Add(role->attributes().node_id(), role);
287 channel_readers_.
Add(role->attributes().channel_id(), role);
293void ChannelManager::DisposeLeave(
const ChangeMsg& msg) {
294 Vertice v(msg.role_attr().node_name());
296 e.set_value(msg.role_attr().channel_name());
297 if (msg.role_type() == RoleType::ROLE_WRITER) {
298 auto role = std::make_shared<RoleWriter>(msg.role_attr());
299 node_writers_.
Remove(role->attributes().node_id(), role);
300 channel_writers_.
Remove(role->attributes().channel_id(), role);
303 auto role = std::make_shared<RoleReader>(msg.role_attr());
304 node_readers_.
Remove(role->attributes().node_id(), role);
305 channel_readers_.
Remove(role->attributes().channel_id(), role);
311void ChannelManager::ScanMessageType(
const ChangeMsg& msg) {
312 uint64_t key = msg.role_attr().channel_id();
313 std::string role_type(
"reader");
314 if (msg.role_type() == RoleType::ROLE_WRITER) {
315 role_type =
"writer";
319 channel_writers_.
Search(key, &existed_writers);
320 for (
auto& w_attr : existed_writers) {
322 w_attr.message_type())) {
323 AERROR <<
"newly added " << role_type <<
"(belongs to node["
324 << msg.role_attr().node_name() <<
"])"
325 <<
"'s message type[" << msg.role_attr().message_type()
326 <<
"] does not match the exsited writer(belongs to node["
327 << w_attr.node_name() <<
"])'s message type["
328 << w_attr.message_type() <<
"].";
333 channel_readers_.
Search(key, &existed_readers);
334 for (
auto& r_attr : existed_readers) {
336 r_attr.message_type())) {
337 AERROR <<
"newly added " << role_type <<
"(belongs to node["
338 << msg.role_attr().node_name() <<
"])"
339 <<
"'s message type[" << msg.role_attr().message_type()
340 <<
"] does not match the exsited reader(belongs to node["
341 << r_attr.node_name() <<
"])'s message type["
342 << r_attr.message_type() <<
"].";
static uint64_t RegisterNode(const std::string &node_name)
static uint64_t RegisterChannel(const std::string &channel)
void GetMsgType(const std::string &channel_name, std::string *msg_type)
Get the Msg Type of channel_name
void GetProtoDesc(const std::string &channel_name, std::string *proto_desc)
Get the Protocol Desc of channel_name
void GetReadersOfNode(const std::string &node_name, RoleAttrVec *readers)
Get the Readers Of Node object
bool HasWriter(const std::string &channel_name)
Inquire if there is at least one Writer that publishes channel_name
void GetWritersOfChannel(const std::string &channel_name, RoleAttrVec *writers)
Get the Writers Of Channel object
std::vector< proto::RoleAttributes > RoleAttrVec
void GetUpstreamOfNode(const std::string &node_name, RoleAttrVec *upstream_nodes)
Get the Upstream Of Node object.
void GetWritersOfNode(const std::string &node_name, RoleAttrVec *writers)
Get the Writers Of Node object
bool HasReader(const std::string &channel_name)
Inquire if there is at least one Reader that subscribes to channel_name
void GetReadersOfChannel(const std::string &channel_name, RoleAttrVec *readers)
Get the Readers Of Channel object
bool IsMessageTypeMatching(const std::string &lhs, const std::string &rhs)
Check whether lhs and rhs have the same MessageType
void GetWriters(RoleAttrVec *writers)
Get All Writers object
void GetReaders(RoleAttrVec *readers)
Get All Readers object
void GetChannelNames(std::vector< std::string > *channels)
Get all channel names in the topology
FlowDirection GetFlowDirection(const std::string &lhs_node_name, const std::string &rhs_node_name)
Get the Flow Direction from lhs_node_name to rhs_node_name. You can see FlowDirection's description fo...
ChannelManager()
Construct a new Channel Manager object
virtual ~ChannelManager()
Destroy the Channel Manager object
void GetDownstreamOfNode(const std::string &node_name, RoleAttrVec *downstream_nodes)
Get the Downstream Of Node object.
void Insert(const Edge &e)
FlowDirection GetDirectionOf(const Vertice &lhs, const Vertice &rhs)
void Delete(const Edge &e)
void Notify(const ChangeMsg &msg)
void Convert(const RoleAttributes &attr, RoleType role, OperateType opt, ChangeMsg *msg)
std::string channel_name_
std::atomic< bool > is_discovery_started_
bool Search(uint64_t key) override
void GetAllRoles(std::vector< RolePtr > *roles) override
void Remove(uint64_t key) override
bool Add(uint64_t key, const RolePtr &role, bool ignore_if_exist=true) override
#define RETURN_IF_NULL(ptr)
#define RETURN_VAL_IF(condition, val)
#define RETURN_IF(condition)
FlowDirection
Describe the flow direction between nodes. As in the DAG below: A--->B--->C<---D. GetDirectionOf(A,...
std::shared_ptr< RoleBase > RolePtr
optional string node_name