Apollo 10.0
自动驾驶开放平台
hybrid_receiver.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_TRANSPORT_RECEIVER_HYBRID_RECEIVER_H_
18#define CYBER_TRANSPORT_RECEIVER_HYBRID_RECEIVER_H_
19
#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
27
29#include "cyber/common/log.h"
30#include "cyber/common/types.h"
31#include "cyber/proto/role_attributes.pb.h"
33#include "cyber/task/task.h"
34#include "cyber/time/time.h"
39
40namespace apollo {
41namespace cyber {
42namespace transport {
43
47
48template <typename M>
49class HybridReceiver : public Receiver<M> {
50 public:
51 using HistoryPtr = std::shared_ptr<History<M>>;
52 using ReceiverPtr = std::shared_ptr<Receiver<M>>;
54 std::unordered_map<OptionalMode, ReceiverPtr, std::hash<int>>;
56 std::unordered_map<OptionalMode,
57 std::unordered_map<uint64_t, RoleAttributes>,
58 std::hash<int>>;
59 using CommunicationModePtr = std::shared_ptr<proto::CommunicationMode>;
61 std::unordered_map<Relation, OptionalMode, std::hash<int>>;
62
64 const typename Receiver<M>::MessageListener& msg_listener,
65 const ParticipantPtr& participant);
66 virtual ~HybridReceiver();
67
68 void Enable() override;
69 void Disable() override;
70
71 void Enable(const RoleAttributes& opposite_attr) override;
72 void Disable(const RoleAttributes& opposite_attr) override;
73
74 private:
75 void InitMode();
76 void ObtainConfig();
77 void InitHistory();
78 void InitReceivers();
79 void ClearReceivers();
80 void InitTransmitters();
81 void ClearTransmitters();
82 void ReceiveHistoryMsg(const RoleAttributes& opposite_attr);
83 void ThreadFunc(const RoleAttributes& opposite_attr);
84 Relation GetRelation(const RoleAttributes& opposite_attr);
85
86 HistoryPtr history_;
87 ReceiverContainer receivers_;
88 TransmitterContainer transmitters_;
89 std::mutex mutex_;
90
92 MappingTable mapping_table_;
93
94 ParticipantPtr participant_;
95};
96
97template <typename M>
99 const RoleAttributes& attr,
100 const typename Receiver<M>::MessageListener& msg_listener,
101 const ParticipantPtr& participant)
102 : Receiver<M>(attr, msg_listener),
103 history_(nullptr),
104 participant_(participant) {
105 InitMode();
106 ObtainConfig();
107 InitHistory();
108 InitReceivers();
109 InitTransmitters();
110}
111
112template <typename M>
114 ClearTransmitters();
115 ClearReceivers();
116}
117
118template <typename M>
120 std::lock_guard<std::mutex> lock(mutex_);
121 for (auto& item : receivers_) {
122 item.second->Enable();
123 }
124}
125
126template <typename M>
128 std::lock_guard<std::mutex> lock(mutex_);
129 for (auto& item : receivers_) {
130 item.second->Disable();
131 }
132}
133
134template <typename M>
135void HybridReceiver<M>::Enable(const RoleAttributes& opposite_attr) {
136 auto relation = GetRelation(opposite_attr);
137 RETURN_IF(relation == NO_RELATION);
138
139 uint64_t id = opposite_attr.id();
140 std::lock_guard<std::mutex> lock(mutex_);
141 if (transmitters_[mapping_table_[relation]].count(id) == 0) {
142 transmitters_[mapping_table_[relation]].insert(
143 std::make_pair(id, opposite_attr));
144 receivers_[mapping_table_[relation]]->Enable(opposite_attr);
145 ReceiveHistoryMsg(opposite_attr);
146 }
147}
148
149template <typename M>
150void HybridReceiver<M>::Disable(const RoleAttributes& opposite_attr) {
151 auto relation = GetRelation(opposite_attr);
152 RETURN_IF(relation == NO_RELATION);
153
154 uint64_t id = opposite_attr.id();
155 std::lock_guard<std::mutex> lock(mutex_);
156 if (transmitters_[mapping_table_[relation]].count(id) > 0) {
157 transmitters_[mapping_table_[relation]].erase(id);
158 receivers_[mapping_table_[relation]]->Disable(opposite_attr);
159 }
160}
161
162template <typename M>
164 mode_ = std::make_shared<proto::CommunicationMode>();
165 mapping_table_[SAME_PROC] = mode_->same_proc();
166 mapping_table_[DIFF_PROC] = mode_->diff_proc();
167 mapping_table_[DIFF_HOST] = mode_->diff_host();
168}
169
170template <typename M>
171void HybridReceiver<M>::ObtainConfig() {
172 auto& global_conf = common::GlobalData::Instance()->Config();
173 if (!global_conf.has_transport_conf()) {
174 return;
175 }
176 if (!global_conf.transport_conf().has_communication_mode()) {
177 return;
178 }
179 mode_->CopyFrom(global_conf.transport_conf().communication_mode());
180
181 mapping_table_[SAME_PROC] = mode_->same_proc();
182 mapping_table_[DIFF_PROC] = mode_->diff_proc();
183 mapping_table_[DIFF_HOST] = mode_->diff_host();
184}
185
186template <typename M>
187void HybridReceiver<M>::InitHistory() {
188 HistoryAttributes history_attr(this->attr_.qos_profile().history(),
189 this->attr_.qos_profile().depth());
190 history_ = std::make_shared<History<M>>(history_attr);
191 if (this->attr_.qos_profile().durability() ==
192 QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
193 history_->Enable();
194 }
195}
196
197template <typename M>
198void HybridReceiver<M>::InitReceivers() {
199 std::set<OptionalMode> modes;
200 modes.insert(mode_->same_proc());
201 modes.insert(mode_->diff_proc());
202 modes.insert(mode_->diff_host());
203 auto listener = std::bind(&HybridReceiver<M>::OnNewMessage, this,
204 std::placeholders::_1, std::placeholders::_2);
205 for (auto& mode : modes) {
206 switch (mode) {
207 case OptionalMode::INTRA:
208 receivers_[mode] =
209 std::make_shared<IntraReceiver<M>>(this->attr_, listener);
210 break;
211 case OptionalMode::SHM:
212 receivers_[mode] =
213 std::make_shared<ShmReceiver<M>>(this->attr_, listener);
214 break;
215 default:
216 receivers_[mode] =
217 std::make_shared<RtpsReceiver<M>>(this->attr_, listener);
218 break;
219 }
220 }
221}
222
223template <typename M>
224void HybridReceiver<M>::ClearReceivers() {
225 receivers_.clear();
226}
227
228template <typename M>
229void HybridReceiver<M>::InitTransmitters() {
230 std::unordered_map<uint64_t, RoleAttributes> empty;
231 for (auto& item : receivers_) {
232 transmitters_[item.first] = empty;
233 }
234}
235
236template <typename M>
237void HybridReceiver<M>::ClearTransmitters() {
238 for (auto& item : transmitters_) {
239 for (auto& upper_reach : item.second) {
240 receivers_[item.first]->Disable(upper_reach.second);
241 }
242 }
243 transmitters_.clear();
244}
245
246template <typename M>
247void HybridReceiver<M>::ReceiveHistoryMsg(const RoleAttributes& opposite_attr) {
248 // check qos
249 if (opposite_attr.qos_profile().durability() !=
250 QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
251 return;
252 }
253
254 auto attr = opposite_attr;
255 cyber::Async(&HybridReceiver<M>::ThreadFunc, this, attr);
256}
257
258template <typename M>
259void HybridReceiver<M>::ThreadFunc(const RoleAttributes& opposite_attr) {
260 std::string channel_name =
261 std::to_string(opposite_attr.id()) + std::to_string(this->attr_.id());
262 uint64_t channel_id = common::GlobalData::RegisterChannel(channel_name);
263
264 RoleAttributes attr(this->attr_);
265 attr.set_channel_name(channel_name);
266 attr.set_channel_id(channel_id);
267 attr.mutable_qos_profile()->CopyFrom(opposite_attr.qos_profile());
268
269 volatile bool is_msg_arrived = false;
270 auto listener = [&](const std::shared_ptr<M>& msg,
271 const MessageInfo& msg_info, const RoleAttributes& attr) {
272 is_msg_arrived = true;
273 this->OnNewMessage(msg, msg_info);
274 };
275
276 auto receiver = std::make_shared<RtpsReceiver<M>>(attr, listener);
277 receiver->Enable();
278
279 do {
280 if (is_msg_arrived) {
281 is_msg_arrived = false;
282 }
283 cyber::USleep(1000000);
284 } while (is_msg_arrived);
285
286 receiver->Disable();
287 ADEBUG << "recv threadfunc exit.";
288}
289
290template <typename M>
291Relation HybridReceiver<M>::GetRelation(const RoleAttributes& opposite_attr) {
292 if (opposite_attr.channel_name() != this->attr_.channel_name()) {
293 return NO_RELATION;
294 }
295
296 if (opposite_attr.host_ip() != this->attr_.host_ip()) {
297 return DIFF_HOST;
298 }
299
300 if (opposite_attr.process_id() != this->attr_.process_id()) {
301 return DIFF_PROC;
302 }
303
304 return SAME_PROC;
305}
306
307} // namespace transport
308} // namespace cyber
309} // namespace apollo
310
311#endif // CYBER_TRANSPORT_RECEIVER_HYBRID_RECEIVER_H_
static uint64_t RegisterChannel(const std::string &channel)
std::shared_ptr< Receiver< M > > ReceiverPtr
std::shared_ptr< History< M > > HistoryPtr
std::unordered_map< OptionalMode, ReceiverPtr, std::hash< int > > ReceiverContainer
HybridReceiver(const RoleAttributes &attr, const typename Receiver< M >::MessageListener &msg_listener, const ParticipantPtr &participant)
std::unordered_map< OptionalMode, std::unordered_map< uint64_t, RoleAttributes >, std::hash< int > > TransmitterContainer
std::unordered_map< Relation, OptionalMode, std::hash< int > > MappingTable
std::shared_ptr< proto::CommunicationMode > CommunicationModePtr
void OnNewMessage(const MessagePtr &msg, const MessageInfo &msg_info)
Definition receiver.h:61
std::function< void(const MessagePtr &, const MessageInfo &, const RoleAttributes &)> MessageListener
Definition receiver.h:36
#define ADEBUG
Definition log.h:41
#define RETURN_IF(condition)
Definition log.h:106
std::shared_ptr< Participant > ParticipantPtr
Relation
Describe relation between nodes, writers/readers...
Definition types.h:36
@ NO_RELATION
Definition types.h:37
class register implement
Definition arena_queue.h:37