17#ifndef CYBER_PYTHON_INTERNAL_PY_CYBER_H_
18#define CYBER_PYTHON_INTERNAL_PY_CYBER_H_
19#define PY_SSIZE_T_CLEAN
30#include <unordered_map>
47inline bool py_init(
const std::string& module_name) {
48 static bool inited =
false;
50 AINFO <<
"cyber already inited.";
54 if (!
Init(module_name.c_str())) {
55 AERROR <<
"cyber::Init failed:" << module_name;
59 AINFO <<
"cyber init succ.";
73 PyWriter(
const std::string& channel,
const std::string& type,
74 const uint32_t qos_depth,
Node* node)
75 : channel_name_(channel),
77 qos_depth_(qos_depth),
79 std::string proto_desc;
80 message::ProtobufFactory::Instance()->GetDescriptorString(type,
82 if (proto_desc.empty()) {
83 AWARN <<
"cpp can't find proto_desc msgtype->" << data_type_;
87 role_attr.set_channel_name(channel_name_);
88 role_attr.set_message_type(data_type_);
89 role_attr.set_proto_desc(proto_desc);
90 auto qos_profile = role_attr.mutable_qos_profile();
91 qos_profile->set_depth(qos_depth_);
95 int write(
const std::string& data) {
96 auto message = std::make_shared<message::PyMessageWrap>(data, data_type_);
97 message->set_type_name(data_type_);
98 return writer_->Write(message);
102 std::string channel_name_;
103 std::string data_type_;
105 Node* node_ =
nullptr;
106 std::shared_ptr<Writer<message::PyMessageWrap>> writer_;
112 PyReader(
const std::string& channel,
const std::string& type,
Node* node)
113 : channel_name_(channel), data_type_(type), node_(node), func_(nullptr) {
116 [
this](
const std::shared_ptr<const message::PyMessageWrap>& request) {
122 [
this](
const std::shared_ptr<const message::RawMessage>& request) {
123 this->cb_rawmsg(request);
131 std::string
read(
bool wait =
false) {
133 std::unique_lock<std::mutex> ul(msg_lock_);
134 if (!cache_.empty()) {
135 msg = std::move(cache_.front());
143 msg_cond_.wait(ul, [
this] {
return !this->cache_.empty(); });
144 if (!cache_.empty()) {
145 msg = std::move(cache_.front());
153 void cb(
const std::shared_ptr<const message::PyMessageWrap>& message) {
155 std::lock_guard<std::mutex> lg(msg_lock_);
156 cache_.push_back(message->data());
159 func_(channel_name_.c_str());
161 msg_cond_.notify_one();
164 void cb_rawmsg(
const std::shared_ptr<const message::RawMessage>& message) {
166 std::lock_guard<std::mutex> lg(msg_lock_);
167 cache_.push_back(message->message);
170 func_(channel_name_.c_str());
172 msg_cond_.notify_one();
175 std::string channel_name_;
176 std::string data_type_;
177 Node* node_ =
nullptr;
178 int (*func_)(
const char*) =
nullptr;
179 std::shared_ptr<Reader<message::PyMessageWrap>> reader_ =
nullptr;
180 std::deque<std::string> cache_;
181 std::mutex msg_lock_;
182 std::condition_variable msg_cond_;
184 std::shared_ptr<Reader<message::RawMessage>> reader_rawmsg_ =
nullptr;
193 service_name_(service_name),
197 const std::shared_ptr<const message::PyMessageWrap>& request,
198 std::shared_ptr<message::PyMessageWrap>& response) {
199 response = this->cb(request);
210 if (!request_cache_.empty()) {
211 msg = std::move(request_cache_.front());
212 request_cache_.pop_front();
217 int write(
const std::string& data) {
218 response_cache_.push_back(data);
224 const std::shared_ptr<const message::PyMessageWrap>& request) {
225 std::lock_guard<std::mutex> lg(msg_lock_);
227 request_cache_.push_back(request->data());
230 func_(service_name_.c_str());
234 if (!response_cache_.empty()) {
235 msg = std::move(response_cache_.front());
236 response_cache_.pop_front();
240 response.reset(
new message::PyMessageWrap(msg, data_type_));
245 std::string service_name_;
246 std::string data_type_;
247 int (*func_)(
const char*) =
nullptr;
248 std::shared_ptr<Service<message::PyMessageWrap, message::PyMessageWrap>>
250 std::mutex msg_lock_;
251 std::deque<std::string> request_cache_;
252 std::deque<std::string> response_cache_;
258 : node_(node), service_name_(name), data_type_(
data_type) {
265 std::shared_ptr<message::PyMessageWrap> m;
268 auto response = client_->SendRequest(m);
269 if (response ==
nullptr) {
270 AINFO <<
"SendRequest:response is null";
271 return std::string(
"");
273 response->ParseFromString(response->data());
275 return response->data();
280 std::string service_name_;
281 std::string data_type_;
282 std::shared_ptr<Client<message::PyMessageWrap, message::PyMessageWrap>>
288 explicit PyNode(
const std::string& node_name) : node_name_(node_name) {
294 AINFO <<
"PyNode " << node_name_ <<
" exit.";
298 uint32_t qos_depth = 1) {
300 return new PyWriter(channel, type, qos_depth, node_.get());
302 AINFO <<
"Py_Node: node_ is null, new PyWriter failed!";
307 message::ProtobufFactory::Instance()->RegisterPythonMessage(desc);
312 return new PyReader(channel, type, node_.get());
318 const std::string& type) {
320 return new PyService(service, type, node_.get());
327 return new PyClient(service, type, node_.get());
335 std::string node_name_;
336 std::shared_ptr<Node> node_ =
nullptr;
345 const std::string& msg_type,
const std::string& rawmsgdata) {
346 if (msg_type.empty()) {
347 AERROR <<
"parse rawmessage the msg_type is null";
350 if (rawmsgdata.empty()) {
351 AERROR <<
"parse rawmessage the rawmsgdata is null";
355 if (raw_msg_class_ ==
nullptr) {
356 auto rawFactory = message::ProtobufFactory::Instance();
357 raw_msg_class_ = rawFactory->GenerateMessageByType(msg_type);
360 if (raw_msg_class_ ==
nullptr) {
361 AERROR <<
"raw_msg_class_ is null";
365 if (!raw_msg_class_->ParseFromString(rawmsgdata)) {
366 AERROR <<
"Cannot parse the msg [ " << msg_type <<
" ]";
370 return raw_msg_class_->DebugString();
374 uint8_t sleep_s = 0) {
375 if (channel_name.empty()) {
376 AERROR <<
"channel_name is null";
379 auto topology = service_discovery::TopologyManager::Instance();
381 auto channel_manager = topology->channel_manager();
382 std::string msg_type(
"");
383 channel_manager->GetMsgType(channel_name, &msg_type);
388 auto topology = service_discovery::TopologyManager::Instance();
390 auto channel_manager = topology->channel_manager();
391 std::vector<std::string> channels;
392 channel_manager->GetChannelNames(&channels);
396 static std::unordered_map<std::string, std::vector<std::string>>
398 auto topology = service_discovery::TopologyManager::Instance();
400 std::vector<proto::RoleAttributes> tmpVec;
401 topology->channel_manager()->GetWriters(&tmpVec);
402 std::unordered_map<std::string, std::vector<std::string>> roles_info;
404 for (
auto& attr : tmpVec) {
405 std::string channel_name = attr.channel_name();
407 attr.SerializeToString(&msgdata);
408 roles_info[channel_name].emplace_back(msgdata);
412 topology->channel_manager()->GetReaders(&tmpVec);
413 for (
auto& attr : tmpVec) {
414 std::string channel_name = attr.channel_name();
416 attr.SerializeToString(&msgdata);
417 roles_info[channel_name].emplace_back(msgdata);
423 static google::protobuf::Message* raw_msg_class_;
429 auto topology = service_discovery::TopologyManager::Instance();
431 std::vector<std::string> node_names;
432 std::vector<RoleAttributes> nodes;
433 topology->node_manager()->GetNodes(&nodes);
435 AERROR <<
"no node found.";
439 std::sort(nodes.begin(), nodes.end(),
// Order nodes by name. The comparator must be a strict weak ordering, i.e. it
// has to return false when both names are equal, so compare with `< 0`.
// Using `<= 0` makes std::sort's behavior undefined as soon as two entries
// share a name.
441 return na.node_name().compare(nb.node_name()) < 0;
443 for (
auto& node : nodes) {
444 node_names.emplace_back(node.node_name());
450 uint8_t sleep_s = 2) {
451 auto topology = service_discovery::TopologyManager::Instance();
454 if (!topology->node_manager()->HasNode(node_name)) {
455 AERROR <<
"no node named: " << node_name;
459 std::vector<RoleAttributes> nodes;
460 topology->node_manager()->GetNodes(&nodes);
462 for (
auto& node_attr : nodes) {
463 if (node_attr.node_name() == node_name) {
464 node_attr.SerializeToString(&msgdata);
472 const std::string& node_name, uint8_t sleep_s = 2) {
473 std::vector<std::string> reader_channels;
474 auto topology = service_discovery::TopologyManager::Instance();
476 if (!topology->node_manager()->HasNode(node_name)) {
477 AERROR <<
"no node named: " << node_name;
478 return reader_channels;
481 std::vector<RoleAttributes> readers;
482 auto channel_mgr = topology->channel_manager();
483 channel_mgr->GetReadersOfNode(node_name, &readers);
484 for (
auto& reader : readers) {
485 if (reader.channel_name() ==
"param_event") {
488 reader_channels.emplace_back(reader.channel_name());
490 return reader_channels;
494 const std::string& node_name, uint8_t sleep_s = 2) {
495 std::vector<std::string> writer_channels;
496 auto topology = service_discovery::TopologyManager::Instance();
498 if (!topology->node_manager()->HasNode(node_name)) {
499 AERROR <<
"no node named: " << node_name;
500 return writer_channels;
503 std::vector<RoleAttributes> writers;
504 auto channel_mgr = topology->channel_manager();
505 channel_mgr->GetWritersOfNode(node_name, &writers);
506 for (
auto& writer : writers) {
507 if (writer.channel_name() ==
"param_event") {
510 writer_channels.emplace_back(writer.channel_name());
512 return writer_channels;
519 auto topology = service_discovery::TopologyManager::Instance();
521 std::vector<std::string> srv_names;
522 std::vector<RoleAttributes> services;
523 topology->service_manager()->GetServers(&services);
524 if (services.empty()) {
525 AERROR <<
"no service found.";
529 std::sort(services.begin(), services.end(),
// Order services by name. The comparator must be a strict weak ordering, i.e.
// it has to return false when both names are equal, so compare with `< 0`.
// Using `<= 0` makes std::sort's behavior undefined as soon as two entries
// share a name.
531 return sa.service_name().compare(sb.service_name()) < 0;
533 for (
auto& service : services) {
534 srv_names.emplace_back(service.service_name());
540 uint8_t sleep_s = 2) {
541 auto topology = service_discovery::TopologyManager::Instance();
544 if (!topology->service_manager()->HasService(service_name)) {
545 AERROR <<
"no service: " << service_name;
549 std::vector<RoleAttributes> services;
550 topology->service_manager()->GetServers(&services);
552 for (
auto& service_attr : services) {
553 if (service_attr.service_name() == service_name) {
554 service_attr.SerializeToString(&msgdata);
Node is the fundamental building block of Cyber RT.
auto CreateReader(const std::string &channel_name, const CallbackFunc< MessageT > &reader_func=nullptr) -> std::shared_ptr< cyber::Reader< MessageT > >
Create a Reader for a specific message type on the given channel name; QoS and other configs use their default values.
auto CreateClient(const std::string &service_name) -> std::shared_ptr< Client< Request, Response > >
Create a Client object for sending requests to the Service named service_name.
auto CreateService(const std::string &service_name, const typename Service< Request, Response >::ServiceCallback &service_callback) -> std::shared_ptr< Service< Request, Response > >
Create a Service object with the specified service_name.
auto CreateWriter(const proto::RoleAttributes &role_attr) -> std::shared_ptr< Writer< MessageT > >
Create a Writer with specific message type.
static std::unordered_map< std::string, std::vector< std::string > > get_channels_info(uint8_t sleep_s=2)
static std::string get_msgtype_by_channelname(const std::string &channel_name, uint8_t sleep_s=0)
static std::string get_debugstring_by_msgtype_rawmsgdata(const std::string &msg_type, const std::string &rawmsgdata)
static std::vector< std::string > get_active_channels(uint8_t sleep_s=2)
std::string send_request(std::string request)
PyClient(const std::string &name, const std::string &data_type, Node *node)
static std::vector< std::string > get_readersofnode(const std::string &node_name, uint8_t sleep_s=2)
static std::vector< std::string > get_writersofnode(const std::string &node_name, uint8_t sleep_s=2)
static std::string get_node_attr(const std::string &node_name, uint8_t sleep_s=2)
static std::vector< std::string > get_active_nodes(uint8_t sleep_s=2)
PyNode(const std::string &node_name)
std::shared_ptr< Node > get_node()
PyReader * create_reader(const std::string &channel, const std::string &type)
PyService * create_service(const std::string &service, const std::string &type)
PyWriter * create_writer(const std::string &channel, const std::string &type, uint32_t qos_depth=1)
PyClient * create_client(const std::string &service, const std::string &type)
void register_message(const std::string &desc)
std::string read(bool wait=false)
void register_func(int(*func)(const char *))
PyReader(const std::string &channel, const std::string &type, Node *node)
static std::vector< std::string > get_active_services(uint8_t sleep_s=2)
static std::string get_service_attr(const std::string &service_name, uint8_t sleep_s=2)
void register_func(int(*func)(const char *))
int write(const std::string &data)
PyService(const std::string &service_name, const std::string &data_type, Node *node)
PyWriter(const std::string &channel, const std::string &type, const uint32_t qos_depth, Node *node)
int write(const std::string &data)
std::shared_ptr< message::PyMessageWrap > PyMsgWrapPtr
void py_waitforshutdown()
bool py_init(const std::string &module_name)
bool Init(const char *binary_name, const std::string &dag_info)
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space)