17#ifndef CYBER_TRANSPORT_TRANSMITTER_HYBRID_TRANSMITTER_H_
18#define CYBER_TRANSPORT_TRANSMITTER_HYBRID_TRANSMITTER_H_
25#include <unordered_map>
28#include "cyber/proto/role_attributes.pb.h"
29#include "cyber/proto/transport_conf.pb.h"
57 std::unordered_map<OptionalMode, TransmitterPtr, std::hash<int>>;
59 std::unordered_map<OptionalMode, std::set<uint64_t>, std::hash<int>>;
62 std::unordered_map<Relation, OptionalMode, std::hash<int>>;
81 void InitTransmitters();
82 void ClearTransmitters();
84 void ClearReceivers();
107 participant_(participant) {
123 std::lock_guard<std::mutex> lock(mutex_);
124 for (
auto& item : transmitters_) {
125 item.second->Enable();
131 std::lock_guard<std::mutex> lock(mutex_);
132 for (
auto& item : transmitters_) {
133 item.second->Disable();
139 auto relation = GetRelation(opposite_attr);
144 uint64_t
id = opposite_attr.
id();
145 std::lock_guard<std::mutex> lock(mutex_);
146 receivers_[mapping_table_[relation]].insert(
id);
147 transmitters_[mapping_table_[relation]]->Enable(opposite_attr);
148 TransmitHistoryMsg(opposite_attr);
153 auto relation = GetRelation(opposite_attr);
158 uint64_t
id = opposite_attr.
id();
159 std::lock_guard<std::mutex> lock(mutex_);
160 receivers_[mapping_table_[relation]].erase(
id);
161 transmitters_[mapping_table_[relation]]->Disable(opposite_attr);
167 std::lock_guard<std::mutex> lock(mutex_);
168 history_->Add(msg, msg_info);
169 bool return_val =
false;
170 for (
auto& item : transmitters_) {
171 item.second->Transmit(msg, msg_info);
179 for (
auto& item : transmitters_) {
180 result = item.second->AcquireMessage(msg);
190 mode_ = std::make_shared<proto::CommunicationMode>();
191 mapping_table_[
SAME_PROC] = mode_->same_proc();
192 mapping_table_[
DIFF_PROC] = mode_->diff_proc();
193 mapping_table_[
DIFF_HOST] = mode_->diff_host();
197void HybridTransmitter<M>::ObtainConfig() {
198 auto& global_conf = common::GlobalData::Instance()->Config();
199 if (!global_conf.has_transport_conf()) {
202 if (!global_conf.transport_conf().has_communication_mode()) {
205 mode_->CopyFrom(global_conf.transport_conf().communication_mode());
207 mapping_table_[
SAME_PROC] = mode_->same_proc();
208 mapping_table_[
DIFF_PROC] = mode_->diff_proc();
209 mapping_table_[
DIFF_HOST] = mode_->diff_host();
213void HybridTransmitter<M>::InitHistory() {
214 HistoryAttributes history_attr(this->attr_.qos_profile().history(),
215 this->attr_.qos_profile().depth());
216 history_ = std::make_shared<History<M>>(history_attr);
217 if (this->attr_.qos_profile().durability() ==
218 QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
224void HybridTransmitter<M>::InitTransmitters() {
225 std::set<OptionalMode> modes;
226 modes.insert(mode_->same_proc());
227 modes.insert(mode_->diff_proc());
228 modes.insert(mode_->diff_host());
229 for (
auto& mode : modes) {
231 case OptionalMode::INTRA:
232 transmitters_[mode] =
233 std::make_shared<IntraTransmitter<M>>(this->attr_);
235 case OptionalMode::SHM:
236 transmitters_[mode] = std::make_shared<ShmTransmitter<M>>(this->attr_);
239 transmitters_[mode] =
240 std::make_shared<RtpsTransmitter<M>>(this->attr_, participant_);
247void HybridTransmitter<M>::ClearTransmitters() {
248 for (
auto& item : transmitters_) {
249 item.second->Disable();
251 transmitters_.clear();
255void HybridTransmitter<M>::InitReceivers() {
256 std::set<uint64_t> empty;
257 for (
auto& item : transmitters_) {
258 receivers_[item.first] = empty;
263void HybridTransmitter<M>::ClearReceivers() {
268void HybridTransmitter<M>::TransmitHistoryMsg(
269 const RoleAttributes& opposite_attr) {
271 if (this->attr_.qos_profile().durability() !=
272 QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
277 std::vector<typename History<M>::CachedMessage> unsent_msgs;
278 history_->GetCachedMessage(&unsent_msgs);
279 if (unsent_msgs.empty()) {
283 auto attr = opposite_attr;
284 cyber::Async(&HybridTransmitter<M>::ThreadFunc,
this, attr, unsent_msgs);
288void HybridTransmitter<M>::ThreadFunc(
289 const RoleAttributes& opposite_attr,
290 const std::vector<
typename History<M>::CachedMessage>& msgs) {
292 RoleAttributes new_attr;
293 new_attr.CopyFrom(this->attr_);
294 std::string new_channel_name =
295 std::to_string(this->attr_.id()) + std::to_string(opposite_attr.id());
297 new_attr.set_channel_name(new_channel_name);
298 new_attr.set_channel_id(channel_id);
299 auto new_transmitter =
300 std::make_shared<RtpsTransmitter<M>>(new_attr, participant_);
301 new_transmitter->Enable();
303 for (
auto& item : msgs) {
304 new_transmitter->Transmit(item.msg, item.msg_info);
307 new_transmitter->Disable();
308 ADEBUG <<
"trans threadfunc exit.";
312Relation HybridTransmitter<M>::GetRelation(
313 const RoleAttributes& opposite_attr) {
314 if (opposite_attr.channel_name() != this->attr_.channel_name()) {
317 if (opposite_attr.host_ip() != this->attr_.host_ip()) {
320 if (opposite_attr.process_id() != this->attr_.process_id()) {
static uint64_t RegisterChannel(const std::string &channel)
std::unordered_map< OptionalMode, std::set< uint64_t >, std::hash< int > > ReceiverMap
std::unordered_map< OptionalMode, TransmitterPtr, std::hash< int > > TransmitterMap
std::unordered_map< Relation, OptionalMode, std::hash< int > > MappingTable
bool Transmit(const MessagePtr &msg, const MessageInfo &msg_info) override
std::shared_ptr< Transmitter< M > > TransmitterPtr
HybridTransmitter(const RoleAttributes &attr, const ParticipantPtr &participant)
virtual ~HybridTransmitter()
std::shared_ptr< History< M > > HistoryPtr
std::shared_ptr< M > MessagePtr
std::shared_ptr< proto::CommunicationMode > CommunicationModePtr
bool AcquireMessage(std::shared_ptr< M > &msg)
std::shared_ptr< Participant > ParticipantPtr
Relation
Describe relation between nodes, writers/readers...