Apollo 10.0
自动驾驶开放平台
node.cc
浏览该文件的文档.
1
9
10#include <utility>
11
12#include "cyber/common/log.h"
13#include "cyber/cyber.h"
15#include "cyber/node/node.h"
16#include "cyber/node/reader.h"
17#include "cyber/node/writer.h"
19
21
22Node::Node() : node(apollo::cyber::CreateNode("bridge")) {}
23
25
26void Node::remove(std::shared_ptr<Client> client) {
27 for (auto it = writers.begin(); it != writers.end(); /* empty */) {
28 if (it->second.clients.find(client) != it->second.clients.end()) {
29 ADEBUG << "Removing client writer";
30 it->second.clients.erase(client);
31 if (it->second.clients.empty()) {
32 ADEBUG << "Removing Cyber writer";
33 it = writers.erase(it);
34 } else {
35 it++;
36 }
37 } else {
38 it++;
39 }
40 }
41
42 std::lock_guard<std::mutex> lock(mutex);
43
44 for (auto it = readers.begin(); it != readers.end(); /* empty */) {
45 if (it->second.clients.find(client) != it->second.clients.end()) {
46 ADEBUG << "Removing client reader";
47 it->second.clients.erase(client);
48 if (it->second.clients.empty()) {
49 ADEBUG << "Removing Cyber reader";
50 it = readers.erase(it);
51 } else {
52 it++;
53 }
54 } else {
55 it++;
56 }
57 }
58}
59
60void Node::add_reader(const std::string& channel, const std::string& type,
61 std::shared_ptr<Client> client) {
62 auto rit = readers.find(channel);
63 if (rit != readers.end()) {
64 ADEBUG << "Adding client to existing " << channel;
65 rit->second.clients.insert(client);
66 return;
67 }
68
69 auto cb = [this, channel](const std::shared_ptr<const PyMessageWrap>& msg) {
70 ADEBUG << "New message on " << channel;
71
72 const std::string& data = msg->data();
73
74 std::lock_guard<std::mutex> lock(mutex);
75
76 auto it = readers.find(channel);
77 if (it != readers.end()) {
78 for (auto client : it->second.clients) {
79 client->publish(channel, data);
80 }
81 }
82 };
83
84 ADEBUG << "Adding new reader to " << channel;
85 Reader reader;
86 reader.reader = node->CreateReader<PyMessageWrap>(channel, cb);
87 reader.clients.insert(client);
88
89 std::lock_guard<std::mutex> lock(mutex);
90 readers.insert(std::make_pair(channel, reader));
91}
92
93void Node::add_writer(const std::string& channel, const std::string& type,
94 std::shared_ptr<Client> client) {
95 auto wit = writers.find(channel);
96 if (wit != writers.end()) {
97 wit->second.clients.insert(client);
98 return;
99 }
100
101 Writer writer;
102 writer.type = type;
103
104 apollo::cyber::message::ProtobufFactory::Instance()->GetDescriptorString(
105 type, &writer.desc);
106 if (writer.desc.empty()) {
107 AWARN << "Cannot find proto descriptor for message type " << type;
108 return;
109 }
110
112 role.set_channel_name(channel);
113 role.set_message_type(type);
114 role.set_proto_desc(writer.desc);
115
116 auto qos_profile = role.mutable_qos_profile();
117 qos_profile->set_depth(1);
118 writer.writer = node->CreateWriter<PyMessageWrap>(role);
119 writer.clients.insert(client);
120
121 writers.insert(std::make_pair(channel, writer));
122}
123
124void Node::publish(const std::string& channel, const std::string& data) {
125 auto writer = writers.find(channel);
126 if (writer == writers.end()) {
127 AWARN << "No writer registered on channel " << channel;
128 return;
129 }
130
131 auto message = std::make_shared<PyMessageWrap>(data, writer->second.type);
132 writer->second.writer->Write(message);
133}
void add_writer(const std::string &channel, const std::string &type, std::shared_ptr< Client > client)
Definition node.cc:93
void remove(std::shared_ptr< Client > client)
Definition node.cc:26
void add_reader(const std::string &channel, const std::string &type, std::shared_ptr< Client > client)
Definition node.cc:60
~Node()
Definition node.cc:24
Node()
Definition node.cc:22
void publish(const std::string &channel, const std::string &data)
Definition node.cc:124
#define ADEBUG
Definition log.h:41
#define AWARN
Definition log.h:43
class register implement
Definition arena_queue.h:37