17#ifndef CYBER_NODE_WRITER_H_
18#define CYBER_NODE_WRITER_H_
24#include "cyber/proto/topology_change.pb.h"
41template <
typename MessageT>
76 virtual bool Write(
const MessageT& msg);
85 virtual bool Write(
const std::shared_ptr<MessageT>& msg_ptr);
101 void GetReaders(std::vector<proto::RoleAttributes>* readers)
override;
110 void JoinTheTopology();
111 void LeaveTheTopology();
120template <
typename MessageT>
122 :
WriterBase(role_attr), transmitter_(nullptr), channel_manager_(nullptr) {}
124template <
typename MessageT>
129template <
typename MessageT>
132 std::lock_guard<std::mutex> g(lock_);
137 transport::Transport::Instance()->CreateTransmitter<MessageT>(
139 if (transmitter_ ==
nullptr) {
144 this->role_attr_.set_id(transmitter_->id().HashValue());
146 service_discovery::TopologyManager::Instance()->channel_manager();
151template <
typename MessageT>
154 std::lock_guard<std::mutex> g(lock_);
161 transmitter_ =
nullptr;
162 channel_manager_ =
nullptr;
165template <
typename MessageT>
168 auto msg_ptr = std::make_shared<MessageT>(msg);
169 return Write(msg_ptr);
172template <
typename MessageT>
175 return transmitter_->Transmit(msg_ptr);
178template <
typename MessageT>
181 change_conn_ = channel_manager_->AddChangeListener(std::bind(
185 const std::string& channel_name = this->role_attr_.channel_name();
186 std::vector<proto::RoleAttributes> readers;
187 channel_manager_->GetReadersOfChannel(channel_name, &readers);
188 for (
auto& reader : readers) {
189 transmitter_->Enable(reader);
193 message::HasSerializer<MessageT>::value);
196template <
typename MessageT>
197void Writer<MessageT>::LeaveTheTopology() {
198 channel_manager_->RemoveChangeListener(change_conn_);
202template <
typename MessageT>
203void Writer<MessageT>::OnChannelChange(
const proto::ChangeMsg& change_msg) {
208 auto& reader_attr = change_msg.role_attr();
209 if (reader_attr.channel_name() != this->role_attr_.channel_name()) {
213 auto operate_type = change_msg.operate_type();
215 transmitter_->Enable(reader_attr);
217 transmitter_->Disable(reader_attr);
221template <
typename MessageT>
224 return channel_manager_->HasReader(role_attr_.channel_name());
227template <
typename MessageT>
229 if (readers ==
nullptr) {
237 channel_manager_->GetReadersOfChannel(role_attr_.channel_name(), readers);
240template <
typename MessageT>
243 AERROR <<
"Please Acquire message after init writer!";
244 auto m = std::make_shared<MessageT>();
248 std::shared_ptr<MessageT> m(
nullptr);
249 if (transmitter_->AcquireMessage(m)) {
252 m = std::make_shared<MessageT>();
bool IsInit() const
Is Writer initialized?
std::shared_ptr< MessageT > AcquireMessage()
Acquire message instance to send
bool HasReader() override
Is there any Reader that subscribes our Channel? You can publish message when this return true
std::shared_ptr< transport::Transmitter< MessageT > > TransmitterPtr
bool Init() override
Init the Writer
Writer(const proto::RoleAttributes &role_attr)
Construct a new Writer object
void Shutdown() override
Shutdown the Writer
void GetReaders(std::vector< proto::RoleAttributes > *readers) override
Get all Readers that subscriber our writing channel
virtual bool Write(const MessageT &msg)
Write a MessageT instance
typename service_discovery::Manager::ChangeConnection ChangeConnection
base::Connection< const ChangeMsg & > ChangeConnection
#define RETURN_VAL_IF(condition, val)
std::shared_ptr< ChannelManager > ChannelManagerPtr