Apollo 10.0
自动驾驶开放平台
writer.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_NODE_WRITER_H_
18#define CYBER_NODE_WRITER_H_
19
20#include <memory>
21#include <string>
22#include <vector>
23
24#include "cyber/proto/topology_change.pb.h"
25
26#include "cyber/common/log.h"
30
31namespace apollo {
32namespace cyber {
33
41template <typename MessageT>
42class Writer : public WriterBase {
43 public:
44 using TransmitterPtr = std::shared_ptr<transport::Transmitter<MessageT>>;
47
53 explicit Writer(const proto::RoleAttributes& role_attr);
54 virtual ~Writer();
55
62 bool Init() override;
63
67 void Shutdown() override;
68
76 virtual bool Write(const MessageT& msg);
77
85 virtual bool Write(const std::shared_ptr<MessageT>& msg_ptr);
86
94 bool HasReader() override;
95
101 void GetReaders(std::vector<proto::RoleAttributes>* readers) override;
102
107 std::shared_ptr<MessageT> AcquireMessage();
108
109 private:
110 void JoinTheTopology();
111 void LeaveTheTopology();
112 void OnChannelChange(const proto::ChangeMsg& change_msg);
113
114 TransmitterPtr transmitter_;
115
116 ChangeConnection change_conn_;
118};
119
120template <typename MessageT>
122 : WriterBase(role_attr), transmitter_(nullptr), channel_manager_(nullptr) {}
123
124template <typename MessageT>
126 Shutdown();
127}
128
129template <typename MessageT>
131 {
132 std::lock_guard<std::mutex> g(lock_);
133 if (init_) {
134 return true;
135 }
136 transmitter_ =
137 transport::Transport::Instance()->CreateTransmitter<MessageT>(
138 role_attr_);
139 if (transmitter_ == nullptr) {
140 return false;
141 }
142 init_ = true;
143 }
144 this->role_attr_.set_id(transmitter_->id().HashValue());
145 channel_manager_ =
146 service_discovery::TopologyManager::Instance()->channel_manager();
147 JoinTheTopology();
148 return true;
149}
150
151template <typename MessageT>
153 {
154 std::lock_guard<std::mutex> g(lock_);
155 if (!init_) {
156 return;
157 }
158 init_ = false;
159 }
160 LeaveTheTopology();
161 transmitter_ = nullptr;
162 channel_manager_ = nullptr;
163}
164
165template <typename MessageT>
166bool Writer<MessageT>::Write(const MessageT& msg) {
168 auto msg_ptr = std::make_shared<MessageT>(msg);
169 return Write(msg_ptr);
170}
171
172template <typename MessageT>
173bool Writer<MessageT>::Write(const std::shared_ptr<MessageT>& msg_ptr) {
175 return transmitter_->Transmit(msg_ptr);
176}
177
178template <typename MessageT>
180 // add listener
181 change_conn_ = channel_manager_->AddChangeListener(std::bind(
182 &Writer<MessageT>::OnChannelChange, this, std::placeholders::_1));
183
184 // get peer readers
185 const std::string& channel_name = this->role_attr_.channel_name();
186 std::vector<proto::RoleAttributes> readers;
187 channel_manager_->GetReadersOfChannel(channel_name, &readers);
188 for (auto& reader : readers) {
189 transmitter_->Enable(reader);
190 }
191
192 channel_manager_->Join(this->role_attr_, proto::RoleType::ROLE_WRITER,
193 message::HasSerializer<MessageT>::value);
194}
195
196template <typename MessageT>
197void Writer<MessageT>::LeaveTheTopology() {
198 channel_manager_->RemoveChangeListener(change_conn_);
199 channel_manager_->Leave(this->role_attr_, proto::RoleType::ROLE_WRITER);
200}
201
202template <typename MessageT>
203void Writer<MessageT>::OnChannelChange(const proto::ChangeMsg& change_msg) {
204 if (change_msg.role_type() != proto::RoleType::ROLE_READER) {
205 return;
206 }
207
208 auto& reader_attr = change_msg.role_attr();
209 if (reader_attr.channel_name() != this->role_attr_.channel_name()) {
210 return;
211 }
212
213 auto operate_type = change_msg.operate_type();
214 if (operate_type == proto::OperateType::OPT_JOIN) {
215 transmitter_->Enable(reader_attr);
216 } else {
217 transmitter_->Disable(reader_attr);
218 }
219}
220
221template <typename MessageT>
224 return channel_manager_->HasReader(role_attr_.channel_name());
225}
226
227template <typename MessageT>
228void Writer<MessageT>::GetReaders(std::vector<proto::RoleAttributes>* readers) {
229 if (readers == nullptr) {
230 return;
231 }
232
233 if (!WriterBase::IsInit()) {
234 return;
235 }
236
237 channel_manager_->GetReadersOfChannel(role_attr_.channel_name(), readers);
238}
239
240template <typename MessageT>
241std::shared_ptr<MessageT> Writer<MessageT>::AcquireMessage() {
242 if (!WriterBase::IsInit()) {
243 AERROR << "Please Acquire message after init writer!";
244 auto m = std::make_shared<MessageT>();
245 return m;
246 }
247
248 std::shared_ptr<MessageT> m(nullptr);
249 if (transmitter_->AcquireMessage(m)) {
250 return m;
251 } else {
252 m = std::make_shared<MessageT>();
253 return m;
254 }
255}
256
257} // namespace cyber
258} // namespace apollo
259
260#endif // CYBER_NODE_WRITER_H_
Base class for a Writer.
Definition writer_base.h:37
bool IsInit() const
Is Writer initialized?
Definition writer_base.h:99
std::shared_ptr< MessageT > AcquireMessage()
Acquire message instance to send
Definition writer.h:241
virtual ~Writer()
Definition writer.h:125
bool HasReader() override
Is there any Reader that subscribes to our Channel? You can publish messages when this returns true
Definition writer.h:222
std::shared_ptr< transport::Transmitter< MessageT > > TransmitterPtr
Definition writer.h:44
bool Init() override
Init the Writer
Definition writer.h:130
Writer(const proto::RoleAttributes &role_attr)
Construct a new Writer object
Definition writer.h:121
void Shutdown() override
Shutdown the Writer
Definition writer.h:152
void GetReaders(std::vector< proto::RoleAttributes > *readers) override
Get all Readers that subscribe to our writing channel
Definition writer.h:228
virtual bool Write(const MessageT &msg)
Write a MessageT instance
Definition writer.h:166
typename service_discovery::Manager::ChangeConnection ChangeConnection
Definition writer.h:46
base::Connection< const ChangeMsg & > ChangeConnection
Definition manager.h:58
#define RETURN_VAL_IF(condition, val)
Definition log.h:114
#define AERROR
Definition log.h:44
std::shared_ptr< ChannelManager > ChannelManagerPtr
class register implement
Definition arena_queue.h:37