Cyber python版本
2023-01-03 15:291615830
我们知道cyber是apollo自动驾驶的中间件,是一个分布式的发布订阅系统,这里提供了一个轻量化的python版本,可以直接在本地安装,不需要启动容器和下载整个apollo代码仓库,就可以运行。
安装
通过pip就可以完成cyber的安装。
pip3 install pycyber
目前在以下环境中测试通过,暂时不支持windows和mac系统。
- 操作系统:ubuntu 18.04
- 硬件 : x86
- python版本:3.6
安装过程如图:
pycyber安装过程
运行
pycyber目前集成了一个简单的测试用例和4个命令,下面我们分别开始介绍。
测试用例
目前pycyber集成了一个简单的测试用例,你可以在一个终端中输入listener
,然后在打开一个终端窗口,输入talker
,就可以看到2者之间的收发消息过程了,其中listener为接收端,talker为发布端。
运行的效果如下图所示。
listener和talker进行通信
命令行
同时pycyber还自带了4个命令,来支持查看和启动节点等。
- cyber_node查看当前的节点信息
- cyber_channel查看当前的通道信息
- cyber_service查看当前的service信息
- cyber_launch启动cyber组件
以cyber_channel
为例,在上述过程中启动该命令,可以查看当前的channel信息
cyber_channel命令输出
引用
除了直接使用命令行之外,还可以通过import pycyber
来引用cyber的功能。
1. 发送和接收
通过引用pycyber创建node和reader
importsysfrompycyberimportcyberfrompycyber.examples.proto.examples_pb2importChatterdefcallback(data):"""
Reader message callback.
"""print("="*80)print("py:reader callback msg->:")print(data)print("="*80)deftest_listener_class():"""
Reader message.
"""print("="*120)test_node=cyber.Node("listener")test_node.create_reader("channel/chatter",Chatter,callback)test_node.spin()defmain(args=sys.argv):cyber.init()test_listener_class()cyber.shutdown()
通过引用pycyber创建node和writer
importsysimporttimefrompycyberimportcyberfrompycyber.examples.proto.examples_pb2importChatterdeftest_talker_class():"""
Test talker.
"""msg=Chatter()msg.content=str.encode("py:talker:send Alex!")msg.timestamp=9999msg.seq=0print(msg)test_node=cyber.Node("node_name1")g_count=1writer=test_node.create_writer("channel/chatter",Chatter,6)whilenotcyber.is_shutdown():time.sleep(1)g_count=g_count+1msg.seq=g_countmsg.content=str.encode("I am python talker.")print("="*80)print("write msg -> %s"%msg)writer.write(msg)defmain(args=sys.argv):cyber.init("talker_sample")test_talker_class()cyber.shutdown()
2. 定时器
通过引用pycyber创建定时器timer
importtimefrompycyberimportcyberfrompycyberimportcyber_timercount=0deffun():globalcountprint("cb fun is called:",count)count+=1deftest_timer():cyber.init()ct=cyber_timer.Timer(10,fun,0)# 10msct.start()time.sleep(1)# 1sct.stop()print("+"*80,"test set_option")ct2=cyber_timer.Timer()# 10msct2.set_option(10,fun,0)ct2.start()time.sleep(1)# 1sct2.stop()cyber.shutdown()if__name__=='__main__':test_timer()
更多例子可以参考:
总结
pycyber提供了cyber的python版本的功能,可以更加方便的安装和构建,帮助开发者在之上构建自己的功能。
来自专栏
Cyber入门与探索查看专栏 >
原创声明,本文由作者授权发布于Apollo开发者社区,未经许可,不得转载。
发表评论已发表 0 条评论
登录后可评论,请前往 登录
暂无评论~快去发表自己的独特见解吧!
目录
安装
运行
测试用例
命令行
引用
1. 发送和接收
2. 定时器
总结