17#ifndef CYBER_TRANSPORT_RECEIVER_HYBRID_RECEIVER_H_
18#define CYBER_TRANSPORT_RECEIVER_HYBRID_RECEIVER_H_
24#include <unordered_map>
31#include "cyber/proto/role_attributes.pb.h"
54 std::unordered_map<OptionalMode, ReceiverPtr, std::hash<int>>;
56 std::unordered_map<OptionalMode,
57 std::unordered_map<uint64_t, RoleAttributes>,
61 std::unordered_map<Relation, OptionalMode, std::hash<int>>;
79 void ClearReceivers();
80 void InitTransmitters();
81 void ClearTransmitters();
104 participant_(participant) {
120 std::lock_guard<std::mutex> lock(mutex_);
121 for (
auto& item : receivers_) {
122 item.second->Enable();
128 std::lock_guard<std::mutex> lock(mutex_);
129 for (
auto& item : receivers_) {
130 item.second->Disable();
136 auto relation = GetRelation(opposite_attr);
139 uint64_t
id = opposite_attr.
id();
140 std::lock_guard<std::mutex> lock(mutex_);
141 if (transmitters_[mapping_table_[relation]].count(
id) == 0) {
142 transmitters_[mapping_table_[relation]].insert(
143 std::make_pair(
id, opposite_attr));
144 receivers_[mapping_table_[relation]]->Enable(opposite_attr);
145 ReceiveHistoryMsg(opposite_attr);
151 auto relation = GetRelation(opposite_attr);
154 uint64_t
id = opposite_attr.
id();
155 std::lock_guard<std::mutex> lock(mutex_);
156 if (transmitters_[mapping_table_[relation]].count(
id) > 0) {
157 transmitters_[mapping_table_[relation]].erase(
id);
158 receivers_[mapping_table_[relation]]->Disable(opposite_attr);
164 mode_ = std::make_shared<proto::CommunicationMode>();
165 mapping_table_[
SAME_PROC] = mode_->same_proc();
166 mapping_table_[
DIFF_PROC] = mode_->diff_proc();
167 mapping_table_[
DIFF_HOST] = mode_->diff_host();
171void HybridReceiver<M>::ObtainConfig() {
172 auto& global_conf = common::GlobalData::Instance()->Config();
173 if (!global_conf.has_transport_conf()) {
176 if (!global_conf.transport_conf().has_communication_mode()) {
179 mode_->CopyFrom(global_conf.transport_conf().communication_mode());
181 mapping_table_[
SAME_PROC] = mode_->same_proc();
182 mapping_table_[
DIFF_PROC] = mode_->diff_proc();
183 mapping_table_[
DIFF_HOST] = mode_->diff_host();
187void HybridReceiver<M>::InitHistory() {
188 HistoryAttributes history_attr(this->attr_.qos_profile().history(),
189 this->attr_.qos_profile().depth());
190 history_ = std::make_shared<History<M>>(history_attr);
191 if (this->attr_.qos_profile().durability() ==
192 QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
198void HybridReceiver<M>::InitReceivers() {
199 std::set<OptionalMode> modes;
200 modes.insert(mode_->same_proc());
201 modes.insert(mode_->diff_proc());
202 modes.insert(mode_->diff_host());
204 std::placeholders::_1, std::placeholders::_2);
205 for (
auto& mode : modes) {
207 case OptionalMode::INTRA:
209 std::make_shared<IntraReceiver<M>>(this->attr_, listener);
211 case OptionalMode::SHM:
213 std::make_shared<ShmReceiver<M>>(this->attr_, listener);
217 std::make_shared<RtpsReceiver<M>>(this->attr_, listener);
224void HybridReceiver<M>::ClearReceivers() {
229void HybridReceiver<M>::InitTransmitters() {
230 std::unordered_map<uint64_t, RoleAttributes> empty;
231 for (
auto& item : receivers_) {
232 transmitters_[item.first] = empty;
237void HybridReceiver<M>::ClearTransmitters() {
238 for (
auto& item : transmitters_) {
239 for (
auto& upper_reach : item.second) {
240 receivers_[item.first]->Disable(upper_reach.second);
243 transmitters_.clear();
247void HybridReceiver<M>::ReceiveHistoryMsg(
const RoleAttributes& opposite_attr) {
249 if (opposite_attr.qos_profile().durability() !=
250 QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
254 auto attr = opposite_attr;
255 cyber::Async(&HybridReceiver<M>::ThreadFunc,
this, attr);
259void HybridReceiver<M>::ThreadFunc(
const RoleAttributes& opposite_attr) {
260 std::string channel_name =
261 std::to_string(opposite_attr.id()) + std::to_string(this->attr_.id());
264 RoleAttributes attr(this->attr_);
265 attr.set_channel_name(channel_name);
266 attr.set_channel_id(channel_id);
267 attr.mutable_qos_profile()->CopyFrom(opposite_attr.qos_profile());
269 volatile bool is_msg_arrived =
false;
270 auto listener = [&](
const std::shared_ptr<M>& msg,
271 const MessageInfo& msg_info,
const RoleAttributes& attr) {
272 is_msg_arrived =
true;
273 this->OnNewMessage(msg, msg_info);
276 auto receiver = std::make_shared<RtpsReceiver<M>>(attr, listener);
280 if (is_msg_arrived) {
281 is_msg_arrived =
false;
283 cyber::USleep(1000000);
284 }
while (is_msg_arrived);
287 ADEBUG <<
"recv threadfunc exit.";
291Relation HybridReceiver<M>::GetRelation(
const RoleAttributes& opposite_attr) {
292 if (opposite_attr.channel_name() != this->attr_.channel_name()) {
296 if (opposite_attr.host_ip() != this->attr_.host_ip()) {
300 if (opposite_attr.process_id() != this->attr_.process_id()) {
static uint64_t RegisterChannel(const std::string &channel)
std::shared_ptr< Receiver< M > > ReceiverPtr
std::shared_ptr< History< M > > HistoryPtr
std::unordered_map< OptionalMode, ReceiverPtr, std::hash< int > > ReceiverContainer
HybridReceiver(const RoleAttributes &attr, const typename Receiver< M >::MessageListener &msg_listener, const ParticipantPtr &participant)
std::unordered_map< OptionalMode, std::unordered_map< uint64_t, RoleAttributes >, std::hash< int > > TransmitterContainer
std::unordered_map< Relation, OptionalMode, std::hash< int > > MappingTable
virtual ~HybridReceiver()
std::shared_ptr< proto::CommunicationMode > CommunicationModePtr
void OnNewMessage(const MessagePtr &msg, const MessageInfo &msg_info)
std::function< void(const MessagePtr &, const MessageInfo &, const RoleAttributes &)> MessageListener
#define RETURN_IF(condition)
std::shared_ptr< Participant > ParticipantPtr
Relation
Describe relation between nodes, writers/readers...