Apollo 10.0
自动驾驶开放平台
hybrid_transmitter.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_TRANSPORT_TRANSMITTER_HYBRID_TRANSMITTER_H_
18#define CYBER_TRANSPORT_TRANSMITTER_HYBRID_TRANSMITTER_H_
19
20#include <chrono>
21#include <map>
22#include <memory>
23#include <set>
24#include <string>
25#include <unordered_map>
26#include <vector>
27
28#include "cyber/proto/role_attributes.pb.h"
29#include "cyber/proto/transport_conf.pb.h"
31#include "cyber/common/log.h"
32#include "cyber/common/types.h"
33#include "cyber/task/task.h"
41
42namespace apollo {
43namespace cyber {
44namespace transport {
45
49
50template <typename M>
51class HybridTransmitter : public Transmitter<M> {
52 public:
53 using MessagePtr = std::shared_ptr<M>;
54 using HistoryPtr = std::shared_ptr<History<M>>;
55 using TransmitterPtr = std::shared_ptr<Transmitter<M>>;
57 std::unordered_map<OptionalMode, TransmitterPtr, std::hash<int>>;
59 std::unordered_map<OptionalMode, std::set<uint64_t>, std::hash<int>>;
60 using CommunicationModePtr = std::shared_ptr<proto::CommunicationMode>;
62 std::unordered_map<Relation, OptionalMode, std::hash<int>>;
63
65 const ParticipantPtr& participant);
66 virtual ~HybridTransmitter();
67
68 void Enable() override;
69 void Disable() override;
70 void Enable(const RoleAttributes& opposite_attr) override;
71 void Disable(const RoleAttributes& opposite_attr) override;
72
73 bool Transmit(const MessagePtr& msg, const MessageInfo& msg_info) override;
74
75 bool AcquireMessage(std::shared_ptr<M>& msg);
76
77 private:
78 void InitMode();
79 void ObtainConfig();
80 void InitHistory();
81 void InitTransmitters();
82 void ClearTransmitters();
83 void InitReceivers();
84 void ClearReceivers();
85 void TransmitHistoryMsg(const RoleAttributes& opposite_attr);
86 void ThreadFunc(const RoleAttributes& opposite_attr,
87 const std::vector<typename History<M>::CachedMessage>& msgs);
88 Relation GetRelation(const RoleAttributes& opposite_attr);
89
90 HistoryPtr history_;
91 TransmitterMap transmitters_;
92 ReceiverMap receivers_;
93 std::mutex mutex_;
94
96 MappingTable mapping_table_;
97
98 ParticipantPtr participant_;
99};
100
101template <typename M>
103 const ParticipantPtr& participant)
104 : Transmitter<M>(attr),
105 history_(nullptr),
106 mode_(nullptr),
107 participant_(participant) {
108 InitMode();
109 ObtainConfig();
110 InitHistory();
111 InitTransmitters();
112 InitReceivers();
113}
114
115template <typename M>
117 ClearReceivers();
118 ClearTransmitters();
119}
120
121template <typename M>
123 std::lock_guard<std::mutex> lock(mutex_);
124 for (auto& item : transmitters_) {
125 item.second->Enable();
126 }
127}
128
129template <typename M>
131 std::lock_guard<std::mutex> lock(mutex_);
132 for (auto& item : transmitters_) {
133 item.second->Disable();
134 }
135}
136
137template <typename M>
139 auto relation = GetRelation(opposite_attr);
140 if (relation == NO_RELATION) {
141 return;
142 }
143
144 uint64_t id = opposite_attr.id();
145 std::lock_guard<std::mutex> lock(mutex_);
146 receivers_[mapping_table_[relation]].insert(id);
147 transmitters_[mapping_table_[relation]]->Enable(opposite_attr);
148 TransmitHistoryMsg(opposite_attr);
149}
150
151template <typename M>
153 auto relation = GetRelation(opposite_attr);
154 if (relation == NO_RELATION) {
155 return;
156 }
157
158 uint64_t id = opposite_attr.id();
159 std::lock_guard<std::mutex> lock(mutex_);
160 receivers_[mapping_table_[relation]].erase(id);
161 transmitters_[mapping_table_[relation]]->Disable(opposite_attr);
162}
163
164template <typename M>
166 const MessageInfo& msg_info) {
167 std::lock_guard<std::mutex> lock(mutex_);
168 history_->Add(msg, msg_info);
169 bool return_val = false;
170 for (auto& item : transmitters_) {
171 item.second->Transmit(msg, msg_info);
172 }
173 return true;
174}
175
176template <typename M>
177bool HybridTransmitter<M>::AcquireMessage(std::shared_ptr<M>& msg) {
178 bool result = false;
179 for (auto& item : transmitters_) {
180 result = item.second->AcquireMessage(msg);
181 if (result) {
182 return true;
183 }
184 }
185 return false;
186}
187
188template <typename M>
190 mode_ = std::make_shared<proto::CommunicationMode>();
191 mapping_table_[SAME_PROC] = mode_->same_proc();
192 mapping_table_[DIFF_PROC] = mode_->diff_proc();
193 mapping_table_[DIFF_HOST] = mode_->diff_host();
194}
195
196template <typename M>
197void HybridTransmitter<M>::ObtainConfig() {
198 auto& global_conf = common::GlobalData::Instance()->Config();
199 if (!global_conf.has_transport_conf()) {
200 return;
201 }
202 if (!global_conf.transport_conf().has_communication_mode()) {
203 return;
204 }
205 mode_->CopyFrom(global_conf.transport_conf().communication_mode());
206
207 mapping_table_[SAME_PROC] = mode_->same_proc();
208 mapping_table_[DIFF_PROC] = mode_->diff_proc();
209 mapping_table_[DIFF_HOST] = mode_->diff_host();
210}
211
212template <typename M>
213void HybridTransmitter<M>::InitHistory() {
214 HistoryAttributes history_attr(this->attr_.qos_profile().history(),
215 this->attr_.qos_profile().depth());
216 history_ = std::make_shared<History<M>>(history_attr);
217 if (this->attr_.qos_profile().durability() ==
218 QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
219 history_->Enable();
220 }
221}
222
223template <typename M>
224void HybridTransmitter<M>::InitTransmitters() {
225 std::set<OptionalMode> modes;
226 modes.insert(mode_->same_proc());
227 modes.insert(mode_->diff_proc());
228 modes.insert(mode_->diff_host());
229 for (auto& mode : modes) {
230 switch (mode) {
231 case OptionalMode::INTRA:
232 transmitters_[mode] =
233 std::make_shared<IntraTransmitter<M>>(this->attr_);
234 break;
235 case OptionalMode::SHM:
236 transmitters_[mode] = std::make_shared<ShmTransmitter<M>>(this->attr_);
237 break;
238 default:
239 transmitters_[mode] =
240 std::make_shared<RtpsTransmitter<M>>(this->attr_, participant_);
241 break;
242 }
243 }
244}
245
246template <typename M>
247void HybridTransmitter<M>::ClearTransmitters() {
248 for (auto& item : transmitters_) {
249 item.second->Disable();
250 }
251 transmitters_.clear();
252}
253
254template <typename M>
255void HybridTransmitter<M>::InitReceivers() {
256 std::set<uint64_t> empty;
257 for (auto& item : transmitters_) {
258 receivers_[item.first] = empty;
259 }
260}
261
262template <typename M>
263void HybridTransmitter<M>::ClearReceivers() {
264 receivers_.clear();
265}
266
267template <typename M>
268void HybridTransmitter<M>::TransmitHistoryMsg(
269 const RoleAttributes& opposite_attr) {
270 // check qos
271 if (this->attr_.qos_profile().durability() !=
272 QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
273 return;
274 }
275
276 // get unsent messages
277 std::vector<typename History<M>::CachedMessage> unsent_msgs;
278 history_->GetCachedMessage(&unsent_msgs);
279 if (unsent_msgs.empty()) {
280 return;
281 }
282
283 auto attr = opposite_attr;
284 cyber::Async(&HybridTransmitter<M>::ThreadFunc, this, attr, unsent_msgs);
285}
286
287template <typename M>
288void HybridTransmitter<M>::ThreadFunc(
289 const RoleAttributes& opposite_attr,
290 const std::vector<typename History<M>::CachedMessage>& msgs) {
291 // create transmitter to transmit msgs
292 RoleAttributes new_attr;
293 new_attr.CopyFrom(this->attr_);
294 std::string new_channel_name =
295 std::to_string(this->attr_.id()) + std::to_string(opposite_attr.id());
296 uint64_t channel_id = common::GlobalData::RegisterChannel(new_channel_name);
297 new_attr.set_channel_name(new_channel_name);
298 new_attr.set_channel_id(channel_id);
299 auto new_transmitter =
300 std::make_shared<RtpsTransmitter<M>>(new_attr, participant_);
301 new_transmitter->Enable();
302
303 for (auto& item : msgs) {
304 new_transmitter->Transmit(item.msg, item.msg_info);
305 cyber::USleep(1000);
306 }
307 new_transmitter->Disable();
308 ADEBUG << "trans threadfunc exit.";
309}
310
311template <typename M>
312Relation HybridTransmitter<M>::GetRelation(
313 const RoleAttributes& opposite_attr) {
314 if (opposite_attr.channel_name() != this->attr_.channel_name()) {
315 return NO_RELATION;
316 }
317 if (opposite_attr.host_ip() != this->attr_.host_ip()) {
318 return DIFF_HOST;
319 }
320 if (opposite_attr.process_id() != this->attr_.process_id()) {
321 return DIFF_PROC;
322 }
323
324 return SAME_PROC;
325}
326
327} // namespace transport
328} // namespace cyber
329} // namespace apollo
330
331#endif // CYBER_TRANSPORT_TRANSMITTER_HYBRID_TRANSMITTER_H_
static uint64_t RegisterChannel(const std::string &channel)
std::unordered_map< OptionalMode, std::set< uint64_t >, std::hash< int > > ReceiverMap
std::unordered_map< OptionalMode, TransmitterPtr, std::hash< int > > TransmitterMap
std::unordered_map< Relation, OptionalMode, std::hash< int > > MappingTable
bool Transmit(const MessagePtr &msg, const MessageInfo &msg_info) override
std::shared_ptr< Transmitter< M > > TransmitterPtr
HybridTransmitter(const RoleAttributes &attr, const ParticipantPtr &participant)
std::shared_ptr< History< M > > HistoryPtr
std::shared_ptr< proto::CommunicationMode > CommunicationModePtr
bool AcquireMessage(std::shared_ptr< M > &msg)
#define ADEBUG
Definition log.h:41
std::shared_ptr< Participant > ParticipantPtr
Relation
Describe relation between nodes, writers/readers...
Definition types.h:36
@ NO_RELATION
Definition types.h:37
class register implement
Definition arena_queue.h:37