33namespace service_discovery {
40 exempted_msg_types_.emplace(message::MessageType<message::RawMessage>());
41 exempted_msg_types_.emplace(message::MessageType<message::PyMessageWrap>());
49 std::unordered_set<std::string> local_channels;
50 std::vector<RolePtr> roles;
53 for (
auto& role : roles) {
54 local_channels.emplace(role->attributes().channel_name());
56 std::move(local_channels.begin(), local_channels.end(),
57 std::back_inserter(*channels));
61 std::string* proto_desc) {
65 if (!channel_writers_.
Search(key, &writer)) {
69 if (writer->attributes().has_proto_desc()) {
70 *proto_desc = writer->attributes().proto_desc();
75 std::string* msg_type) {
79 if (!channel_writers_.
Search(key, &writer)) {
80 AERROR <<
"cannot find writer of channel: " << channel_name
85 if (writer->attributes().has_message_type()) {
86 *msg_type = writer->attributes().message_type();
92 return channel_writers_.
Search(key);
104 node_writers_.
Search(key, writers);
111 channel_writers_.
Search(key, writers);
116 return channel_readers_.
Search(key);
128 node_readers_.
Search(key, readers);
135 channel_readers_.
Search(key, readers);
144 if (readers.empty()) {
147 std::unordered_set<std::string> channels;
148 for (
auto& reader : readers) {
149 channels.emplace(reader.channel_name());
153 for (
auto& channel : channels) {
157 std::unordered_map<std::string, proto::RoleAttributes> nodes;
158 for (
auto& writer : writers) {
160 attr.set_host_name(writer.host_name());
161 attr.set_process_id(writer.process_id());
162 attr.set_node_name(writer.node_name());
163 attr.set_node_id(writer.node_id());
166 for (
auto& item : nodes) {
167 upstream_nodes->emplace_back(item.second);
177 if (writers.empty()) {
180 std::unordered_set<std::string> channels;
181 for (
auto& writer : writers) {
182 channels.emplace(writer.channel_name());
186 for (
auto& channel : channels) {
190 std::unordered_map<std::string, proto::RoleAttributes> nodes;
191 for (
auto& reader : readers) {
193 attr.set_host_name(reader.host_name());
194 attr.set_process_id(reader.process_id());
195 attr.set_node_name(reader.node_name());
196 attr.set_node_id(reader.node_id());
199 for (
auto& item : nodes) {
200 downstream_nodes->emplace_back(item.second);
205 const std::string& lhs_node_name,
const std::string& rhs_node_name) {
212 const std::string& rhs) {
216 if (exempted_msg_types_.count(lhs) > 0) {
219 if (exempted_msg_types_.count(rhs) > 0) {
232void ChannelManager::Dispose(
const ChangeMsg& msg) {
233 if (msg.operate_type() == OperateType::OPT_JOIN) {
241void ChannelManager::OnTopoModuleLeave(
const std::string& host_name,
246 attr.set_host_name(host_name);
247 attr.set_process_id(process_id);
249 std::vector<RolePtr> writers_to_remove;
250 channel_writers_.
Search(attr, &writers_to_remove);
252 std::vector<RolePtr> readers_to_remove;
253 channel_readers_.
Search(attr, &readers_to_remove);
256 for (
auto& writer : writers_to_remove) {
257 Convert(writer->attributes(), RoleType::ROLE_WRITER, OperateType::OPT_LEAVE,
263 for (
auto& reader : readers_to_remove) {
264 Convert(reader->attributes(), RoleType::ROLE_READER, OperateType::OPT_LEAVE,
271void ChannelManager::DisposeJoin(
const ChangeMsg& msg) {
272 ScanMessageType(msg);
274 Vertice v(msg.role_attr().node_name());
276 e.set_value(msg.role_attr().channel_name());
277 if (msg.role_type() == RoleType::ROLE_WRITER) {
278 if (msg.role_attr().has_proto_desc() &&
279 msg.role_attr().proto_desc() !=
"") {
280 message::ProtobufFactory::Instance()->RegisterMessage(
281 msg.role_attr().proto_desc());
283 auto role = std::make_shared<RoleWriter>(msg.role_attr(), msg.timestamp());
284 node_writers_.
Add(role->attributes().node_id(), role);
285 channel_writers_.
Add(role->attributes().channel_id(), role);
288 auto role = std::make_shared<RoleReader>(msg.role_attr(), msg.timestamp());
289 node_readers_.
Add(role->attributes().node_id(), role);
290 channel_readers_.
Add(role->attributes().channel_id(), role);
296void ChannelManager::DisposeLeave(
const ChangeMsg& msg) {
297 Vertice v(msg.role_attr().node_name());
299 e.set_value(msg.role_attr().channel_name());
300 if (msg.role_type() == RoleType::ROLE_WRITER) {
301 auto role = std::make_shared<RoleWriter>(msg.role_attr());
302 node_writers_.
Remove(role->attributes().node_id(), role);
303 channel_writers_.
Remove(role->attributes().channel_id(), role);
306 auto role = std::make_shared<RoleReader>(msg.role_attr());
307 node_readers_.
Remove(role->attributes().node_id(), role);
308 channel_readers_.
Remove(role->attributes().channel_id(), role);
314void ChannelManager::ScanMessageType(
const ChangeMsg& msg) {
315 uint64_t key = msg.role_attr().channel_id();
316 std::string role_type(
"reader");
317 if (msg.role_type() == RoleType::ROLE_WRITER) {
318 role_type =
"writer";
322 channel_writers_.
Search(key, &existed_writers);
323 for (
auto& w_attr : existed_writers) {
325 w_attr.message_type())) {
326 AERROR <<
"newly added " << role_type <<
"(belongs to node["
327 << msg.role_attr().node_name() <<
"])"
328 <<
"'s message type[" << msg.role_attr().message_type()
329 <<
"] does not match the exsited writer(belongs to node["
330 << w_attr.node_name() <<
"])'s message type["
331 << w_attr.message_type() <<
"].";
336 channel_readers_.
Search(key, &existed_readers);
337 for (
auto& r_attr : existed_readers) {
339 r_attr.message_type())) {
340 AERROR <<
"newly added " << role_type <<
"(belongs to node["
341 << msg.role_attr().node_name() <<
"])"
342 <<
"'s message type[" << msg.role_attr().message_type()
343 <<
"] does not match the exsited reader(belongs to node["
344 << r_attr.node_name() <<
"])'s message type["
345 << r_attr.message_type() <<
"].";
static uint64_t RegisterNode(const std::string &node_name)
static uint64_t RegisterChannel(const std::string &channel)
void GetMsgType(const std::string &channel_name, std::string *msg_type)
Get the Msg Type of channel_name
void GetProtoDesc(const std::string &channel_name, std::string *proto_desc)
Get the Protocol Desc of channel_name
void GetReadersOfNode(const std::string &node_name, RoleAttrVec *readers)
Get the Readers Of Node object
bool HasWriter(const std::string &channel_name)
Inquire if there is at least one Writer that publishes channel_name
void GetWritersOfChannel(const std::string &channel_name, RoleAttrVec *writers)
Get the Writers Of Channel object
std::vector< proto::RoleAttributes > RoleAttrVec
void GetUpstreamOfNode(const std::string &node_name, RoleAttrVec *upstream_nodes)
Get the Upstream Of Node object.
void GetWritersOfNode(const std::string &node_name, RoleAttrVec *writers)
Get the Writers Of Node object
bool HasReader(const std::string &channel_name)
Inquire if there is at least one Reader that subscribes to channel_name
void GetReadersOfChannel(const std::string &channel_name, RoleAttrVec *readers)
Get the Readers Of Channel object
bool IsMessageTypeMatching(const std::string &lhs, const std::string &rhs)
Check whether lhs and rhs have the same MessageType
void GetWriters(RoleAttrVec *writers)
Get All Writers object
void GetReaders(RoleAttrVec *readers)
Get All Readers object
void GetChannelNames(std::vector< std::string > *channels)
Get all channel names in the topology
FlowDirection GetFlowDirection(const std::string &lhs_node_name, const std::string &rhs_node_name)
Get the Flow Direction from lhs_node_name to rhs_node_name. You can see FlowDirection's description for more information.
ChannelManager()
Construct a new Channel Manager object
virtual ~ChannelManager()
Destroy the Channel Manager object
void GetDownstreamOfNode(const std::string &node_name, RoleAttrVec *downstream_nodes)
Get the Downstream Of Node object.
void Insert(const Edge &e)
FlowDirection GetDirectionOf(const Vertice &lhs, const Vertice &rhs)
void Delete(const Edge &e)
void Notify(const ChangeMsg &msg)
void Convert(const RoleAttributes &attr, RoleType role, OperateType opt, ChangeMsg *msg)
std::string channel_name_
std::atomic< bool > is_discovery_started_
bool Search(uint64_t key) override
void GetAllRoles(std::vector< RolePtr > *roles) override
void Remove(uint64_t key) override
bool Add(uint64_t key, const RolePtr &role, bool ignore_if_exist=true) override
#define RETURN_IF_NULL(ptr)
#define RETURN_VAL_IF(condition, val)
#define RETURN_IF(condition)
FlowDirection
Describes the flow direction between nodes. As in the DAG below: A ---> B ---> C <--- D, GetDirectionOf(A,...
std::shared_ptr< RoleBase > RolePtr
optional string node_name