17#ifndef CYBER_NODE_NODE_CHANNEL_IMPL_H_
18#define CYBER_NODE_NODE_CHANNEL_IMPL_H_
23#include "cyber/proto/run_mode_conf.pb.h"
79 : is_reality_mode_(true), node_name_(node_name) {
80 node_attr_.set_host_name(common::GlobalData::Instance()->HostName());
81 node_attr_.set_host_ip(common::GlobalData::Instance()->HostIp());
82 node_attr_.set_process_id(common::GlobalData::Instance()->ProcessId());
83 node_attr_.set_node_name(node_name);
85 node_attr_.set_node_id(node_id);
87 is_reality_mode_ = common::GlobalData::Instance()->IsRealityMode();
89 if (is_reality_mode_) {
91 service_discovery::TopologyManager::Instance()->node_manager();
92 node_manager_->Join(node_attr_, RoleType::ROLE_NODE);
100 if (is_reality_mode_) {
101 node_manager_->Leave(node_attr_, RoleType::ROLE_NODE);
102 node_manager_ =
nullptr;
111 const std::string&
NodeName()
const {
return node_name_; }
114 template <
typename MessageT>
116 -> std::shared_ptr<Writer<MessageT>>;
118 template <
typename MessageT>
119 auto CreateWriter(
const std::string& channel_name)
120 -> std::shared_ptr<Writer<MessageT>>;
122 template <
typename MessageT>
123 auto CreateReader(
const std::string& channel_name,
125 -> std::shared_ptr<Reader<MessageT>>;
127 template <
typename MessageT>
130 -> std::shared_ptr<Reader<MessageT>>;
132 template <
typename MessageT>
136 -> std::shared_ptr<Reader<MessageT>>;
138 template <
typename MessageT>
140 -> std::shared_ptr<Reader<MessageT>>;
142 template <
typename MessageT>
145 bool is_reality_mode_;
146 std::string node_name_;
151template <
typename MessageT>
152auto NodeChannelImpl::CreateWriter(
const proto::RoleAttributes& role_attr)
153 -> std::shared_ptr<Writer<MessageT>> {
154 if (!role_attr.has_channel_name() || role_attr.channel_name().empty()) {
155 AERROR <<
"Can't create a writer with empty channel name!";
158 proto::RoleAttributes new_attr(role_attr);
159 FillInAttr<MessageT>(&new_attr);
161 std::shared_ptr<Writer<MessageT>> writer_ptr =
nullptr;
162 if (!is_reality_mode_) {
163 writer_ptr = std::make_shared<blocker::IntraWriter<MessageT>>(new_attr);
165 writer_ptr = std::make_shared<Writer<MessageT>>(new_attr);
173template <
typename MessageT>
174auto NodeChannelImpl::CreateWriter(
const std::string& channel_name)
175 -> std::shared_ptr<Writer<MessageT>> {
176 proto::RoleAttributes role_attr;
177 role_attr.set_channel_name(channel_name);
178 return this->CreateWriter<MessageT>(role_attr);
181template <
typename MessageT>
182auto NodeChannelImpl::CreateReader(
const std::string& channel_name,
183 const CallbackFunc<MessageT>& reader_func)
184 -> std::shared_ptr<Reader<MessageT>> {
185 proto::RoleAttributes role_attr;
186 role_attr.set_channel_name(channel_name);
187 return this->
template CreateReader<MessageT>(role_attr, reader_func);
190template <
typename MessageT>
191auto NodeChannelImpl::CreateReader(
const ReaderConfig& config,
192 const CallbackFunc<MessageT>& reader_func)
193 -> std::shared_ptr<Reader<MessageT>> {
194 proto::RoleAttributes role_attr;
195 role_attr.set_channel_name(config.channel_name);
196 role_attr.mutable_qos_profile()->CopyFrom(config.qos_profile);
197 return this->
template CreateReader<MessageT>(role_attr, reader_func,
198 config.pending_queue_size);
201template <
typename MessageT>
202auto NodeChannelImpl::CreateReader(
const proto::RoleAttributes& role_attr,
203 const CallbackFunc<MessageT>& reader_func,
204 uint32_t pending_queue_size)
205 -> std::shared_ptr<Reader<MessageT>> {
206 if (!role_attr.has_channel_name() || role_attr.channel_name().empty()) {
207 AERROR <<
"Can't create a reader with empty channel name!";
211 proto::RoleAttributes new_attr(role_attr);
212 FillInAttr<MessageT>(&new_attr);
214 std::shared_ptr<Reader<MessageT>> reader_ptr =
nullptr;
215 if (!is_reality_mode_) {
217 std::make_shared<blocker::IntraReader<MessageT>>(new_attr, reader_func);
219 reader_ptr = std::make_shared<Reader<MessageT>>(new_attr, reader_func,
228template <
typename MessageT>
229auto NodeChannelImpl::CreateReader(
const proto::RoleAttributes& role_attr)
230 -> std::shared_ptr<Reader<MessageT>> {
231 return this->
template CreateReader<MessageT>(role_attr,
nullptr);
234template <
typename MessageT>
235void NodeChannelImpl::FillInAttr(proto::RoleAttributes* attr) {
236 attr->set_host_name(node_attr_.
host_name());
237 attr->set_host_ip(node_attr_.
host_ip());
238 attr->set_process_id(node_attr_.
process_id());
239 attr->set_node_name(node_attr_.
node_name());
240 attr->set_node_id(node_attr_.
node_id());
242 attr->set_channel_id(channel_id);
243 if (!attr->has_message_type()) {
244 attr->set_message_type(message::MessageType<MessageT>());
246 if (!attr->has_proto_desc()) {
247 std::string proto_desc(
"");
248 message::GetDescriptorString<MessageT>(attr->message_type(), &proto_desc);
249 attr->set_proto_desc(proto_desc);
251 if (!attr->has_qos_profile()) {
252 attr->mutable_qos_profile()->CopyFrom(
The implementation for Node to create Objects connected by Channels.
const std::string & NodeName() const
Get the name of this node.
std::shared_ptr< service_discovery::NodeManager > NodeManagerPtr
NodeChannelImpl(const std::string &node_name)
Construct a new Node Channel Impl object
virtual ~NodeChannelImpl()
Destroy the Node Channel Impl object
Node is the fundamental building block of Cyber RT.
static uint64_t RegisterNode(const std::string &node_name)
static uint64_t RegisterChannel(const std::string &channel)
static const QosProfile QOS_PROFILE_DEFAULT
#define RETURN_VAL_IF_NULL(ptr, val)
#define RETURN_VAL_IF(condition, val)
const uint32_t DEFAULT_PENDING_QUEUE_SIZE
std::function< void(const std::shared_ptr< M0 > &)> CallbackFunc
proto::QosProfile qos_profile
ReaderConfig(const ReaderConfig &other)
ReaderConfig()
Configurations for a Reader.
uint32_t pending_queue_size
Configuration for the corresponding ChannelBuffer.
optional string node_name
optional int32 process_id
optional string host_name