27 for (
auto it = writers.begin(); it != writers.end(); ) {
28 if (it->second.clients.find(client) != it->second.clients.end()) {
29 ADEBUG <<
"Removing client writer";
30 it->second.clients.erase(client);
31 if (it->second.clients.empty()) {
32 ADEBUG <<
"Removing Cyber writer";
33 it = writers.erase(it);
42 std::lock_guard<std::mutex> lock(mutex);
44 for (
auto it = readers.begin(); it != readers.end(); ) {
45 if (it->second.clients.find(client) != it->second.clients.end()) {
46 ADEBUG <<
"Removing client reader";
47 it->second.clients.erase(client);
48 if (it->second.clients.empty()) {
49 ADEBUG <<
"Removing Cyber reader";
50 it = readers.erase(it);
61 std::shared_ptr<Client> client) {
62 auto rit = readers.find(channel);
63 if (rit != readers.end()) {
64 ADEBUG <<
"Adding client to existing " << channel;
65 rit->second.clients.insert(client);
69 auto cb = [
this, channel](
const std::shared_ptr<const PyMessageWrap>& msg) {
70 ADEBUG <<
"New message on " << channel;
72 const std::string& data = msg->data();
74 std::lock_guard<std::mutex> lock(mutex);
76 auto it = readers.find(channel);
77 if (it != readers.end()) {
78 for (
auto client : it->second.clients) {
79 client->publish(channel, data);
84 ADEBUG <<
"Adding new reader to " << channel;
86 reader.reader = node->CreateReader<
PyMessageWrap>(channel, cb);
87 reader.clients.insert(client);
89 std::lock_guard<std::mutex> lock(mutex);
90 readers.insert(std::make_pair(channel, reader));
94 std::shared_ptr<Client> client) {
95 auto wit = writers.find(channel);
96 if (wit != writers.end()) {
97 wit->second.clients.insert(client);
104 apollo::cyber::message::ProtobufFactory::Instance()->GetDescriptorString(
106 if (writer.desc.empty()) {
107 AWARN <<
"Cannot find proto descriptor for message type " << type;
112 role.set_channel_name(channel);
113 role.set_message_type(type);
114 role.set_proto_desc(writer.desc);
116 auto qos_profile = role.mutable_qos_profile();
117 qos_profile->set_depth(1);
119 writer.clients.insert(client);
121 writers.insert(std::make_pair(channel, writer));