Apollo 10.0
自动驾驶开放平台
node_channel_impl.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_NODE_NODE_CHANNEL_IMPL_H_
18#define CYBER_NODE_NODE_CHANNEL_IMPL_H_
19
20#include <memory>
21#include <string>
22
23#include "cyber/proto/run_mode_conf.pb.h"
24
29#include "cyber/node/reader.h"
30#include "cyber/node/writer.h"
31
32namespace apollo {
33namespace cyber {
34
35class Node;
36
61
// Node is a friend so that it can reach the private factory methods
// (CreateWriter / CreateReader) declared further down in this class.
68 friend class Node;
69
70 public:
// Shared-ownership handle to the service-discovery NodeManager.
71 using NodeManagerPtr = std::shared_ptr<service_discovery::NodeManager>;
72
78 explicit NodeChannelImpl(const std::string& node_name)
79 : is_reality_mode_(true), node_name_(node_name) {
80 node_attr_.set_host_name(common::GlobalData::Instance()->HostName());
81 node_attr_.set_host_ip(common::GlobalData::Instance()->HostIp());
82 node_attr_.set_process_id(common::GlobalData::Instance()->ProcessId());
83 node_attr_.set_node_name(node_name);
84 uint64_t node_id = common::GlobalData::RegisterNode(node_name);
85 node_attr_.set_node_id(node_id);
86
87 is_reality_mode_ = common::GlobalData::Instance()->IsRealityMode();
88
89 if (is_reality_mode_) {
90 node_manager_ =
91 service_discovery::TopologyManager::Instance()->node_manager();
92 node_manager_->Join(node_attr_, RoleType::ROLE_NODE);
93 }
94 }
95
99 virtual ~NodeChannelImpl() {
100 if (is_reality_mode_) {
101 node_manager_->Leave(node_attr_, RoleType::ROLE_NODE);
102 node_manager_ = nullptr;
103 }
104 }
105
111 const std::string& NodeName() const { return node_name_; }
112
113 private:
// Creates a Writer from fully specified role attributes.
// Returns nullptr when the channel name is missing/empty or Init() fails.
114 template <typename MessageT>
115 auto CreateWriter(const proto::RoleAttributes& role_attr)
116 -> std::shared_ptr<Writer<MessageT>>;
117
// Convenience overload: only the channel name is supplied; it is wrapped
// into a RoleAttributes and forwarded to the overload above.
118 template <typename MessageT>
119 auto CreateWriter(const std::string& channel_name)
120 -> std::shared_ptr<Writer<MessageT>>;
121
// Creates a Reader for channel_name that invokes reader_func; the pending
// queue size is left at the default of the RoleAttributes overload.
122 template <typename MessageT>
123 auto CreateReader(const std::string& channel_name,
124 const CallbackFunc<MessageT>& reader_func)
125 -> std::shared_ptr<Reader<MessageT>>;
126
// Creates a Reader from a ReaderConfig; its channel_name, qos_profile and
// pending_queue_size fields are used.
127 template <typename MessageT>
128 auto CreateReader(const ReaderConfig& config,
129 const CallbackFunc<MessageT>& reader_func)
130 -> std::shared_ptr<Reader<MessageT>>;
131
// Main Reader factory; every other CreateReader overload ends up here.
// Returns nullptr when the channel name is missing/empty or Init() fails.
132 template <typename MessageT>
133 auto CreateReader(const proto::RoleAttributes& role_attr,
134 const CallbackFunc<MessageT>& reader_func,
135 uint32_t pending_queue_size = DEFAULT_PENDING_QUEUE_SIZE)
136 -> std::shared_ptr<Reader<MessageT>>;
137
// Creates a Reader without a callback (a null reader_func is passed on).
138 template <typename MessageT>
139 auto CreateReader(const proto::RoleAttributes& role_attr)
140 -> std::shared_ptr<Reader<MessageT>>;
141
// Completes *attr with this node's identity, the registered channel id and,
// where not already set, message type, proto descriptor and QoS profile.
142 template <typename MessageT>
143 void FillInAttr(proto::RoleAttributes* attr);
144
// Value of GlobalData::IsRealityMode() sampled in the constructor; selects
// between Reader/Writer and blocker::IntraReader/IntraWriter.
145 bool is_reality_mode_;
// Node name as passed to the constructor.
146 std::string node_name_;
// Host, process and node identity copied into every created role.
147 proto::RoleAttributes node_attr_;
// Assigned in the constructor only in reality mode; stays nullptr otherwise.
148 NodeManagerPtr node_manager_ = nullptr;
149};
150
151template <typename MessageT>
152auto NodeChannelImpl::CreateWriter(const proto::RoleAttributes& role_attr)
153 -> std::shared_ptr<Writer<MessageT>> {
154 if (!role_attr.has_channel_name() || role_attr.channel_name().empty()) {
155 AERROR << "Can't create a writer with empty channel name!";
156 return nullptr;
157 }
158 proto::RoleAttributes new_attr(role_attr);
159 FillInAttr<MessageT>(&new_attr);
160
161 std::shared_ptr<Writer<MessageT>> writer_ptr = nullptr;
162 if (!is_reality_mode_) {
163 writer_ptr = std::make_shared<blocker::IntraWriter<MessageT>>(new_attr);
164 } else {
165 writer_ptr = std::make_shared<Writer<MessageT>>(new_attr);
166 }
167
168 RETURN_VAL_IF_NULL(writer_ptr, nullptr);
169 RETURN_VAL_IF(!writer_ptr->Init(), nullptr);
170 return writer_ptr;
171}
172
173template <typename MessageT>
174auto NodeChannelImpl::CreateWriter(const std::string& channel_name)
175 -> std::shared_ptr<Writer<MessageT>> {
176 proto::RoleAttributes role_attr;
177 role_attr.set_channel_name(channel_name);
178 return this->CreateWriter<MessageT>(role_attr);
179}
180
181template <typename MessageT>
182auto NodeChannelImpl::CreateReader(const std::string& channel_name,
183 const CallbackFunc<MessageT>& reader_func)
184 -> std::shared_ptr<Reader<MessageT>> {
185 proto::RoleAttributes role_attr;
186 role_attr.set_channel_name(channel_name);
187 return this->template CreateReader<MessageT>(role_attr, reader_func);
188}
189
190template <typename MessageT>
191auto NodeChannelImpl::CreateReader(const ReaderConfig& config,
192 const CallbackFunc<MessageT>& reader_func)
193 -> std::shared_ptr<Reader<MessageT>> {
194 proto::RoleAttributes role_attr;
195 role_attr.set_channel_name(config.channel_name);
196 role_attr.mutable_qos_profile()->CopyFrom(config.qos_profile);
197 return this->template CreateReader<MessageT>(role_attr, reader_func,
198 config.pending_queue_size);
199}
200
201template <typename MessageT>
202auto NodeChannelImpl::CreateReader(const proto::RoleAttributes& role_attr,
203 const CallbackFunc<MessageT>& reader_func,
204 uint32_t pending_queue_size)
205 -> std::shared_ptr<Reader<MessageT>> {
206 if (!role_attr.has_channel_name() || role_attr.channel_name().empty()) {
207 AERROR << "Can't create a reader with empty channel name!";
208 return nullptr;
209 }
210
211 proto::RoleAttributes new_attr(role_attr);
212 FillInAttr<MessageT>(&new_attr);
213
214 std::shared_ptr<Reader<MessageT>> reader_ptr = nullptr;
215 if (!is_reality_mode_) {
216 reader_ptr =
217 std::make_shared<blocker::IntraReader<MessageT>>(new_attr, reader_func);
218 } else {
219 reader_ptr = std::make_shared<Reader<MessageT>>(new_attr, reader_func,
220 pending_queue_size);
221 }
222
223 RETURN_VAL_IF_NULL(reader_ptr, nullptr);
224 RETURN_VAL_IF(!reader_ptr->Init(), nullptr);
225 return reader_ptr;
226}
227
228template <typename MessageT>
229auto NodeChannelImpl::CreateReader(const proto::RoleAttributes& role_attr)
230 -> std::shared_ptr<Reader<MessageT>> {
231 return this->template CreateReader<MessageT>(role_attr, nullptr);
232}
233
234template <typename MessageT>
235void NodeChannelImpl::FillInAttr(proto::RoleAttributes* attr) {
236 attr->set_host_name(node_attr_.host_name());
237 attr->set_host_ip(node_attr_.host_ip());
238 attr->set_process_id(node_attr_.process_id());
239 attr->set_node_name(node_attr_.node_name());
240 attr->set_node_id(node_attr_.node_id());
241 auto channel_id = GlobalData::RegisterChannel(attr->channel_name());
242 attr->set_channel_id(channel_id);
243 if (!attr->has_message_type()) {
244 attr->set_message_type(message::MessageType<MessageT>());
245 }
246 if (!attr->has_proto_desc()) {
247 std::string proto_desc("");
248 message::GetDescriptorString<MessageT>(attr->message_type(), &proto_desc);
249 attr->set_proto_desc(proto_desc);
250 }
251 if (!attr->has_qos_profile()) {
252 attr->mutable_qos_profile()->CopyFrom(
254 }
255}
256
257} // namespace cyber
258} // namespace apollo
259
260#endif // CYBER_NODE_NODE_CHANNEL_IMPL_H_
Definition node.h:31
The implementation for Node to create Objects connected by Channels.
const std::string & NodeName() const
get name of this node
std::shared_ptr< service_discovery::NodeManager > NodeManagerPtr
NodeChannelImpl(const std::string &node_name)
Construct a new Node Channel Impl object
virtual ~NodeChannelImpl()
Destroy the Node Channel Impl object
Node is the fundamental building block of Cyber RT.
Definition node.h:44
static uint64_t RegisterNode(const std::string &node_name)
static uint64_t RegisterChannel(const std::string &channel)
#define RETURN_VAL_IF_NULL(ptr, val)
Definition log.h:98
#define RETURN_VAL_IF(condition, val)
Definition log.h:114
#define AERROR
Definition log.h:44
const uint32_t DEFAULT_PENDING_QUEUE_SIZE
Definition reader.h:50
std::function< void(const std::shared_ptr< M0 > &)> CallbackFunc
Definition reader.h:46
class register implement
Definition arena_queue.h:37
ReaderConfig(const ReaderConfig &other)
ReaderConfig()
Configurations for a Reader.
uint32_t pending_queue_size
configuration for responding ChannelBuffer.