Apollo 10.0
自动驾驶开放平台
py_cyber.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_PYTHON_INTERNAL_PY_CYBER_H_
18#define CYBER_PYTHON_INTERNAL_PY_CYBER_H_
19#define PY_SSIZE_T_CLEAN
20
21#include <unistd.h>
22
23#include <algorithm>
24#include <deque>
25#include <iostream>
26#include <memory>
27#include <mutex>
28#include <string>
29#include <thread>
30#include <unordered_map>
31#include <utility>
32#include <vector>
33
34#include "cyber/cyber.h"
35#include "cyber/init.h"
39#include "cyber/node/node.h"
40#include "cyber/node/reader.h"
41#include "cyber/node/writer.h"
43
44namespace apollo {
45namespace cyber {
46
47inline bool py_init(const std::string& module_name) {
48 static bool inited = false;
49 if (inited) {
50 AINFO << "cyber already inited.";
51 return true;
52 }
53
54 if (!Init(module_name.c_str())) {
55 AERROR << "cyber::Init failed:" << module_name;
56 return false;
57 }
58 inited = true;
59 AINFO << "cyber init succ.";
60 return true;
61}
62
63inline bool py_ok() { return OK(); }
64
65inline void py_shutdown() { return Clear(); }
66
67inline bool py_is_shutdown() { return IsShutdown(); }
68
69inline void py_waitforshutdown() { return WaitForShutdown(); }
70
71class PyWriter {
72 public:
73 PyWriter(const std::string& channel, const std::string& type,
74 const uint32_t qos_depth, Node* node)
75 : channel_name_(channel),
76 data_type_(type),
77 qos_depth_(qos_depth),
78 node_(node) {
79 std::string proto_desc;
80 message::ProtobufFactory::Instance()->GetDescriptorString(type,
81 &proto_desc);
82 if (proto_desc.empty()) {
83 AWARN << "cpp can't find proto_desc msgtype->" << data_type_;
84 return;
85 }
86 proto::RoleAttributes role_attr;
87 role_attr.set_channel_name(channel_name_);
88 role_attr.set_message_type(data_type_);
89 role_attr.set_proto_desc(proto_desc);
90 auto qos_profile = role_attr.mutable_qos_profile();
91 qos_profile->set_depth(qos_depth_);
92 writer_ = node_->CreateWriter<message::PyMessageWrap>(role_attr);
93 }
94
95 int write(const std::string& data) {
96 auto message = std::make_shared<message::PyMessageWrap>(data, data_type_);
97 message->set_type_name(data_type_);
98 return writer_->Write(message);
99 }
100
101 private:
102 std::string channel_name_;
103 std::string data_type_;
104 uint32_t qos_depth_;
105 Node* node_ = nullptr;
106 std::shared_ptr<Writer<message::PyMessageWrap>> writer_;
107};
108
109const char RAWDATATYPE[] = "RawData";
110class PyReader {
111 public:
112 PyReader(const std::string& channel, const std::string& type, Node* node)
113 : channel_name_(channel), data_type_(type), node_(node), func_(nullptr) {
114 if (data_type_.compare(RAWDATATYPE) == 0) {
115 auto f =
116 [this](const std::shared_ptr<const message::PyMessageWrap>& request) {
117 this->cb(request);
118 };
119 reader_ = node_->CreateReader<message::PyMessageWrap>(channel, f);
120 } else {
121 auto f =
122 [this](const std::shared_ptr<const message::RawMessage>& request) {
123 this->cb_rawmsg(request);
124 };
125 reader_rawmsg_ = node_->CreateReader<message::RawMessage>(channel, f);
126 }
127 }
128
129 void register_func(int (*func)(const char*)) { func_ = func; }
130
131 std::string read(bool wait = false) {
132 std::string msg("");
133 std::unique_lock<std::mutex> ul(msg_lock_);
134 if (!cache_.empty()) {
135 msg = std::move(cache_.front());
136 cache_.pop_front();
137 }
138
139 if (!wait) {
140 return msg;
141 }
142
143 msg_cond_.wait(ul, [this] { return !this->cache_.empty(); });
144 if (!cache_.empty()) {
145 msg = std::move(cache_.front());
146 cache_.pop_front();
147 }
148
149 return msg;
150 }
151
152 private:
153 void cb(const std::shared_ptr<const message::PyMessageWrap>& message) {
154 {
155 std::lock_guard<std::mutex> lg(msg_lock_);
156 cache_.push_back(message->data());
157 }
158 if (func_) {
159 func_(channel_name_.c_str());
160 }
161 msg_cond_.notify_one();
162 }
163
164 void cb_rawmsg(const std::shared_ptr<const message::RawMessage>& message) {
165 {
166 std::lock_guard<std::mutex> lg(msg_lock_);
167 cache_.push_back(message->message);
168 }
169 if (func_) {
170 func_(channel_name_.c_str());
171 }
172 msg_cond_.notify_one();
173 }
174
175 std::string channel_name_;
176 std::string data_type_;
177 Node* node_ = nullptr;
178 int (*func_)(const char*) = nullptr;
179 std::shared_ptr<Reader<message::PyMessageWrap>> reader_ = nullptr;
180 std::deque<std::string> cache_;
181 std::mutex msg_lock_;
182 std::condition_variable msg_cond_;
183
184 std::shared_ptr<Reader<message::RawMessage>> reader_rawmsg_ = nullptr;
185};
186
187using PyMsgWrapPtr = std::shared_ptr<message::PyMessageWrap>;
189 public:
190 PyService(const std::string& service_name, const std::string& data_type,
191 Node* node)
192 : node_(node),
193 service_name_(service_name),
194 data_type_(data_type),
195 func_(nullptr) {
196 auto f = [this](
197 const std::shared_ptr<const message::PyMessageWrap>& request,
198 std::shared_ptr<message::PyMessageWrap>& response) {
199 response = this->cb(request);
200 };
201 service_ =
203 service_name, f);
204 }
205
206 void register_func(int (*func)(const char*)) { func_ = func; }
207
208 std::string read() {
209 std::string msg("");
210 if (!request_cache_.empty()) {
211 msg = std::move(request_cache_.front());
212 request_cache_.pop_front();
213 }
214 return msg;
215 }
216
217 int write(const std::string& data) {
218 response_cache_.push_back(data);
219 return SUCC;
220 }
221
222 private:
223 PyMsgWrapPtr cb(
224 const std::shared_ptr<const message::PyMessageWrap>& request) {
225 std::lock_guard<std::mutex> lg(msg_lock_);
226
227 request_cache_.push_back(request->data());
228
229 if (func_) {
230 func_(service_name_.c_str());
231 }
232
233 std::string msg("");
234 if (!response_cache_.empty()) {
235 msg = std::move(response_cache_.front());
236 response_cache_.pop_front();
237 }
238
239 PyMsgWrapPtr response;
240 response.reset(new message::PyMessageWrap(msg, data_type_));
241 return response;
242 }
243
244 Node* node_;
245 std::string service_name_;
246 std::string data_type_;
247 int (*func_)(const char*) = nullptr;
248 std::shared_ptr<Service<message::PyMessageWrap, message::PyMessageWrap>>
249 service_;
250 std::mutex msg_lock_;
251 std::deque<std::string> request_cache_;
252 std::deque<std::string> response_cache_;
253};
254
255class PyClient {
256 public:
257 PyClient(const std::string& name, const std::string& data_type, Node* node)
258 : node_(node), service_name_(name), data_type_(data_type) {
259 client_ =
261 name);
262 }
263
264 std::string send_request(std::string request) {
265 std::shared_ptr<message::PyMessageWrap> m;
266 m.reset(new message::PyMessageWrap(request, data_type_));
267
268 auto response = client_->SendRequest(m);
269 if (response == nullptr) {
270 AINFO << "SendRequest:response is null";
271 return std::string("");
272 }
273 response->ParseFromString(response->data());
274
275 return response->data();
276 }
277
278 private:
279 Node* node_;
280 std::string service_name_;
281 std::string data_type_;
282 std::shared_ptr<Client<message::PyMessageWrap, message::PyMessageWrap>>
283 client_;
284};
285
286class PyNode {
287 public:
288 explicit PyNode(const std::string& node_name) : node_name_(node_name) {
289 node_ = CreateNode(node_name);
290 }
291
292 void shutdown() {
293 node_.reset();
294 AINFO << "PyNode " << node_name_ << " exit.";
295 }
296
297 PyWriter* create_writer(const std::string& channel, const std::string& type,
298 uint32_t qos_depth = 1) {
299 if (node_) {
300 return new PyWriter(channel, type, qos_depth, node_.get());
301 }
302 AINFO << "Py_Node: node_ is null, new PyWriter failed!";
303 return nullptr;
304 }
305
306 void register_message(const std::string& desc) {
307 message::ProtobufFactory::Instance()->RegisterPythonMessage(desc);
308 }
309
310 PyReader* create_reader(const std::string& channel, const std::string& type) {
311 if (node_) {
312 return new PyReader(channel, type, node_.get());
313 }
314 return nullptr;
315 }
316
317 PyService* create_service(const std::string& service,
318 const std::string& type) {
319 if (node_) {
320 return new PyService(service, type, node_.get());
321 }
322 return nullptr;
323 }
324
325 PyClient* create_client(const std::string& service, const std::string& type) {
326 if (node_) {
327 return new PyClient(service, type, node_.get());
328 }
329 return nullptr;
330 }
331
332 std::shared_ptr<Node> get_node() { return node_; }
333
334 private:
335 std::string node_name_;
336 std::shared_ptr<Node> node_ = nullptr;
337};
338
340 public:
341 // Get debugstring of rawmsgdata
342 // Pls make sure the msg_type of rawmsg is matching
343 // Used in cyber_channel echo command
345 const std::string& msg_type, const std::string& rawmsgdata) {
346 if (msg_type.empty()) {
347 AERROR << "parse rawmessage the msg_type is null";
348 return "";
349 }
350 if (rawmsgdata.empty()) {
351 AERROR << "parse rawmessage the rawmsgdata is null";
352 return "";
353 }
354
355 if (raw_msg_class_ == nullptr) {
356 auto rawFactory = message::ProtobufFactory::Instance();
357 raw_msg_class_ = rawFactory->GenerateMessageByType(msg_type);
358 }
359
360 if (raw_msg_class_ == nullptr) {
361 AERROR << "raw_msg_class_ is null";
362 return "";
363 }
364
365 if (!raw_msg_class_->ParseFromString(rawmsgdata)) {
366 AERROR << "Cannot parse the msg [ " << msg_type << " ]";
367 return "";
368 }
369
370 return raw_msg_class_->DebugString();
371 }
372
373 static std::string get_msgtype_by_channelname(const std::string& channel_name,
374 uint8_t sleep_s = 0) {
375 if (channel_name.empty()) {
376 AERROR << "channel_name is null";
377 return "";
378 }
379 auto topology = service_discovery::TopologyManager::Instance();
380 sleep(sleep_s);
381 auto channel_manager = topology->channel_manager();
382 std::string msg_type("");
383 channel_manager->GetMsgType(channel_name, &msg_type);
384 return msg_type;
385 }
386
387 static std::vector<std::string> get_active_channels(uint8_t sleep_s = 2) {
388 auto topology = service_discovery::TopologyManager::Instance();
389 sleep(sleep_s);
390 auto channel_manager = topology->channel_manager();
391 std::vector<std::string> channels;
392 channel_manager->GetChannelNames(&channels);
393 return channels;
394 }
395
396 static std::unordered_map<std::string, std::vector<std::string>>
397 get_channels_info(uint8_t sleep_s = 2) {
398 auto topology = service_discovery::TopologyManager::Instance();
399 sleep(sleep_s);
400 std::vector<proto::RoleAttributes> tmpVec;
401 topology->channel_manager()->GetWriters(&tmpVec);
402 std::unordered_map<std::string, std::vector<std::string>> roles_info;
403
404 for (auto& attr : tmpVec) {
405 std::string channel_name = attr.channel_name();
406 std::string msgdata;
407 attr.SerializeToString(&msgdata);
408 roles_info[channel_name].emplace_back(msgdata);
409 }
410
411 tmpVec.clear();
412 topology->channel_manager()->GetReaders(&tmpVec);
413 for (auto& attr : tmpVec) {
414 std::string channel_name = attr.channel_name();
415 std::string msgdata;
416 attr.SerializeToString(&msgdata);
417 roles_info[channel_name].emplace_back(msgdata);
418 }
419 return roles_info;
420 }
421
422 private:
423 static google::protobuf::Message* raw_msg_class_;
424};
425
427 public:
428 static std::vector<std::string> get_active_nodes(uint8_t sleep_s = 2) {
429 auto topology = service_discovery::TopologyManager::Instance();
430 sleep(sleep_s);
431 std::vector<std::string> node_names;
432 std::vector<RoleAttributes> nodes;
433 topology->node_manager()->GetNodes(&nodes);
434 if (nodes.empty()) {
435 AERROR << "no node found.";
436 return node_names;
437 }
438
439 std::sort(nodes.begin(), nodes.end(),
440 [](const RoleAttributes& na, const RoleAttributes& nb) -> bool {
441 return na.node_name().compare(nb.node_name()) <= 0;
442 });
443 for (auto& node : nodes) {
444 node_names.emplace_back(node.node_name());
445 }
446 return node_names;
447 }
448
449 static std::string get_node_attr(const std::string& node_name,
450 uint8_t sleep_s = 2) {
451 auto topology = service_discovery::TopologyManager::Instance();
452 sleep(sleep_s);
453
454 if (!topology->node_manager()->HasNode(node_name)) {
455 AERROR << "no node named: " << node_name;
456 return "";
457 }
458
459 std::vector<RoleAttributes> nodes;
460 topology->node_manager()->GetNodes(&nodes);
461 std::string msgdata;
462 for (auto& node_attr : nodes) {
463 if (node_attr.node_name() == node_name) {
464 node_attr.SerializeToString(&msgdata);
465 return msgdata;
466 }
467 }
468 return "";
469 }
470
471 static std::vector<std::string> get_readersofnode(
472 const std::string& node_name, uint8_t sleep_s = 2) {
473 std::vector<std::string> reader_channels;
474 auto topology = service_discovery::TopologyManager::Instance();
475 sleep(sleep_s);
476 if (!topology->node_manager()->HasNode(node_name)) {
477 AERROR << "no node named: " << node_name;
478 return reader_channels;
479 }
480
481 std::vector<RoleAttributes> readers;
482 auto channel_mgr = topology->channel_manager();
483 channel_mgr->GetReadersOfNode(node_name, &readers);
484 for (auto& reader : readers) {
485 if (reader.channel_name() == "param_event") {
486 continue;
487 }
488 reader_channels.emplace_back(reader.channel_name());
489 }
490 return reader_channels;
491 }
492
493 static std::vector<std::string> get_writersofnode(
494 const std::string& node_name, uint8_t sleep_s = 2) {
495 std::vector<std::string> writer_channels;
496 auto topology = service_discovery::TopologyManager::Instance();
497 sleep(sleep_s);
498 if (!topology->node_manager()->HasNode(node_name)) {
499 AERROR << "no node named: " << node_name;
500 return writer_channels;
501 }
502
503 std::vector<RoleAttributes> writers;
504 auto channel_mgr = topology->channel_manager();
505 channel_mgr->GetWritersOfNode(node_name, &writers);
506 for (auto& writer : writers) {
507 if (writer.channel_name() == "param_event") {
508 continue;
509 }
510 writer_channels.emplace_back(writer.channel_name());
511 }
512 return writer_channels;
513 }
514};
515
517 public:
518 static std::vector<std::string> get_active_services(uint8_t sleep_s = 2) {
519 auto topology = service_discovery::TopologyManager::Instance();
520 sleep(sleep_s);
521 std::vector<std::string> srv_names;
522 std::vector<RoleAttributes> services;
523 topology->service_manager()->GetServers(&services);
524 if (services.empty()) {
525 AERROR << "no service found.";
526 return srv_names;
527 }
528
529 std::sort(services.begin(), services.end(),
530 [](const RoleAttributes& sa, const RoleAttributes& sb) -> bool {
531 return sa.service_name().compare(sb.service_name()) <= 0;
532 });
533 for (auto& service : services) {
534 srv_names.emplace_back(service.service_name());
535 }
536 return srv_names;
537 }
538
539 static std::string get_service_attr(const std::string& service_name,
540 uint8_t sleep_s = 2) {
541 auto topology = service_discovery::TopologyManager::Instance();
542 sleep(sleep_s);
543
544 if (!topology->service_manager()->HasService(service_name)) {
545 AERROR << "no service: " << service_name;
546 return "";
547 }
548
549 std::vector<RoleAttributes> services;
550 topology->service_manager()->GetServers(&services);
551 std::string msgdata;
552 for (auto& service_attr : services) {
553 if (service_attr.service_name() == service_name) {
554 service_attr.SerializeToString(&msgdata);
555 return msgdata;
556 }
557 }
558 return "";
559 }
560};
561
562} // namespace cyber
563} // namespace apollo
564
565#endif // CYBER_PYTHON_INTERNAL_PY_CYBER_H_
double f
Definition node.h:31
Node is the fundamental building block of Cyber RT.
Definition node.h:44
auto CreateReader(const std::string &channel_name, const CallbackFunc< MessageT > &reader_func=nullptr) -> std::shared_ptr< cyber::Reader< MessageT > >
Create a Reader with specific message type with channel name qos and other configs used will be defau...
Definition node.h:240
auto CreateClient(const std::string &service_name) -> std::shared_ptr< Client< Request, Response > >
Create a Client object to request Service with service_name
Definition node.h:267
auto CreateService(const std::string &service_name, const typename Service< Request, Response >::ServiceCallback &service_callback) -> std::shared_ptr< Service< Request, Response > >
Create a Service object with specific service_name
Definition node.h:258
auto CreateWriter(const proto::RoleAttributes &role_attr) -> std::shared_ptr< Writer< MessageT > >
Create a Writer with specific message type.
Definition node.h:192
static std::unordered_map< std::string, std::vector< std::string > > get_channels_info(uint8_t sleep_s=2)
Definition py_cyber.h:397
static std::string get_msgtype_by_channelname(const std::string &channel_name, uint8_t sleep_s=0)
Definition py_cyber.h:373
static std::string get_debugstring_by_msgtype_rawmsgdata(const std::string &msg_type, const std::string &rawmsgdata)
Definition py_cyber.h:344
static std::vector< std::string > get_active_channels(uint8_t sleep_s=2)
Definition py_cyber.h:387
std::string send_request(std::string request)
Definition py_cyber.h:264
PyClient(const std::string &name, const std::string &data_type, Node *node)
Definition py_cyber.h:257
static std::vector< std::string > get_readersofnode(const std::string &node_name, uint8_t sleep_s=2)
Definition py_cyber.h:471
static std::vector< std::string > get_writersofnode(const std::string &node_name, uint8_t sleep_s=2)
Definition py_cyber.h:493
static std::string get_node_attr(const std::string &node_name, uint8_t sleep_s=2)
Definition py_cyber.h:449
static std::vector< std::string > get_active_nodes(uint8_t sleep_s=2)
Definition py_cyber.h:428
PyNode(const std::string &node_name)
Definition py_cyber.h:288
std::shared_ptr< Node > get_node()
Definition py_cyber.h:332
PyReader * create_reader(const std::string &channel, const std::string &type)
Definition py_cyber.h:310
PyService * create_service(const std::string &service, const std::string &type)
Definition py_cyber.h:317
PyWriter * create_writer(const std::string &channel, const std::string &type, uint32_t qos_depth=1)
Definition py_cyber.h:297
PyClient * create_client(const std::string &service, const std::string &type)
Definition py_cyber.h:325
void register_message(const std::string &desc)
Definition py_cyber.h:306
std::string read(bool wait=false)
Definition py_cyber.h:131
void register_func(int(*func)(const char *))
Definition py_cyber.h:129
PyReader(const std::string &channel, const std::string &type, Node *node)
Definition py_cyber.h:112
static std::vector< std::string > get_active_services(uint8_t sleep_s=2)
Definition py_cyber.h:518
static std::string get_service_attr(const std::string &service_name, uint8_t sleep_s=2)
Definition py_cyber.h:539
void register_func(int(*func)(const char *))
Definition py_cyber.h:206
int write(const std::string &data)
Definition py_cyber.h:217
PyService(const std::string &service_name, const std::string &data_type, Node *node)
Definition py_cyber.h:190
PyWriter(const std::string &channel, const std::string &type, const uint32_t qos_depth, Node *node)
Definition py_cyber.h:73
int write(const std::string &data)
Definition py_cyber.h:95
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
#define AWARN
Definition log.h:43
bool IsShutdown()
Definition state.h:46
std::shared_ptr< message::PyMessageWrap > PyMsgWrapPtr
Definition py_cyber.h:187
const char RAWDATATYPE[]
Definition py_cyber.h:109
bool py_is_shutdown()
Definition py_cyber.h:67
void py_shutdown()
Definition py_cyber.h:65
void py_waitforshutdown()
Definition py_cyber.h:69
void WaitForShutdown()
Definition state.h:50
bool py_init(const std::string &module_name)
Definition py_cyber.h:47
void Clear()
Definition init.cc:161
bool Init(const char *binary_name, const std::string &dag_info)
Definition init.cc:98
bool OK()
Definition state.h:44
bool py_ok()
Definition py_cyber.h:63
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space)
Definition cyber.cc:33
class register implement
Definition arena_queue.h:37