Apollo 10.0
自动驾驶开放平台
data_visitor.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_DATA_DATA_VISITOR_H_
18#define CYBER_DATA_DATA_VISITOR_H_
19
20#include <algorithm>
21#include <functional>
22#include <memory>
23#include <vector>
24
25#include "cyber/common/log.h"
31
32namespace apollo {
33namespace cyber {
34namespace data {
35
37 VisitorConfig(uint64_t id, uint32_t size)
38 : channel_id(id), queue_size(size) {}
39 uint64_t channel_id;
40 uint32_t queue_size;
41};
42
43template <typename T>
45
46template <typename M0, typename M1 = NullType, typename M2 = NullType,
47 typename M3 = NullType>
49 public:
50 explicit DataVisitor(const std::vector<VisitorConfig>& configs)
51 : buffer_m0_(configs[0].channel_id,
52 new BufferType<M0>(configs[0].queue_size)),
53 buffer_m1_(configs[1].channel_id,
54 new BufferType<M1>(configs[1].queue_size)),
55 buffer_m2_(configs[2].channel_id,
56 new BufferType<M2>(configs[2].queue_size)),
57 buffer_m3_(configs[3].channel_id,
58 new BufferType<M3>(configs[3].queue_size)) {
64 data_fusion_ = new fusion::AllLatest<M0, M1, M2, M3>(
65 buffer_m0_, buffer_m1_, buffer_m2_, buffer_m3_);
66 }
67
69 if (data_fusion_) {
70 delete data_fusion_;
71 data_fusion_ = nullptr;
72 }
73 }
74
75 bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1, // NOLINT
76 std::shared_ptr<M2>& m2, std::shared_ptr<M3>& m3) { // NOLINT
77 if (data_fusion_->Fusion(&next_msg_index_, m0, m1, m2, m3)) {
79 return true;
80 }
81 return false;
82 }
83
84 private:
85 fusion::DataFusion<M0, M1, M2, M3>* data_fusion_ = nullptr;
86 ChannelBuffer<M0> buffer_m0_;
87 ChannelBuffer<M1> buffer_m1_;
88 ChannelBuffer<M2> buffer_m2_;
89 ChannelBuffer<M3> buffer_m3_;
90};
91
92template <typename M0, typename M1, typename M2>
93class DataVisitor<M0, M1, M2, NullType> : public DataVisitorBase {
94 public:
95 explicit DataVisitor(const std::vector<VisitorConfig>& configs)
96 : buffer_m0_(configs[0].channel_id,
97 new BufferType<M0>(configs[0].queue_size)),
98 buffer_m1_(configs[1].channel_id,
99 new BufferType<M1>(configs[1].queue_size)),
100 buffer_m2_(configs[2].channel_id,
101 new BufferType<M2>(configs[2].queue_size)) {
106 data_fusion_ =
107 new fusion::AllLatest<M0, M1, M2>(buffer_m0_, buffer_m1_, buffer_m2_);
108 }
109
111 if (data_fusion_) {
112 delete data_fusion_;
113 data_fusion_ = nullptr;
114 }
115 }
116
117 bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1, // NOLINT
118 std::shared_ptr<M2>& m2) { // NOLINT
119 if (data_fusion_->Fusion(&next_msg_index_, m0, m1, m2)) {
121 return true;
122 }
123 return false;
124 }
125
126 private:
127 fusion::DataFusion<M0, M1, M2>* data_fusion_ = nullptr;
128 ChannelBuffer<M0> buffer_m0_;
129 ChannelBuffer<M1> buffer_m1_;
130 ChannelBuffer<M2> buffer_m2_;
131};
132
133template <typename M0, typename M1>
135 public:
136 explicit DataVisitor(const std::vector<VisitorConfig>& configs)
137 : buffer_m0_(configs[0].channel_id,
138 new BufferType<M0>(configs[0].queue_size)),
139 buffer_m1_(configs[1].channel_id,
140 new BufferType<M1>(configs[1].queue_size)) {
144 data_fusion_ = new fusion::AllLatest<M0, M1>(buffer_m0_, buffer_m1_);
145 }
146
148 if (data_fusion_) {
149 delete data_fusion_;
150 data_fusion_ = nullptr;
151 }
152 }
153
154 bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1) { // NOLINT
155 if (data_fusion_->Fusion(&next_msg_index_, m0, m1)) {
157 return true;
158 }
159 return false;
160 }
161
162 private:
163 fusion::DataFusion<M0, M1>* data_fusion_ = nullptr;
164 ChannelBuffer<M0> buffer_m0_;
165 ChannelBuffer<M1> buffer_m1_;
166};
167
168template <typename M0>
170 public:
171 explicit DataVisitor(const VisitorConfig& configs)
172 : buffer_(configs.channel_id, new BufferType<M0>(configs.queue_size)) {
174 data_notifier_->AddNotifier(buffer_.channel_id(), notifier_);
175 }
176
177 DataVisitor(uint64_t channel_id, uint32_t queue_size)
178 : buffer_(channel_id, new BufferType<M0>(queue_size)) {
180 data_notifier_->AddNotifier(buffer_.channel_id(), notifier_);
181 }
182
183 bool TryFetch(std::shared_ptr<M0>& m0) { // NOLINT
184 if (buffer_.Fetch(&next_msg_index_, m0)) {
186 return true;
187 }
188 return false;
189 }
190
191 private:
192 ChannelBuffer<M0> buffer_;
193};
194
195} // namespace data
196} // namespace cyber
197} // namespace apollo
198
199#endif // CYBER_DATA_DATA_VISITOR_H_
void AddBuffer(const ChannelBuffer< T > &channel_buffer)
void AddNotifier(uint64_t channel_id, const std::shared_ptr< Notifier > &notifier)
std::shared_ptr< Notifier > notifier_
bool TryFetch(std::shared_ptr< M0 > &m0, std::shared_ptr< M1 > &m1, std::shared_ptr< M2 > &m2)
DataVisitor(const std::vector< VisitorConfig > &configs)
bool TryFetch(std::shared_ptr< M0 > &m0, std::shared_ptr< M1 > &m1)
DataVisitor(const std::vector< VisitorConfig > &configs)
bool TryFetch(std::shared_ptr< M0 > &m0, std::shared_ptr< M1 > &m1, std::shared_ptr< M2 > &m2, std::shared_ptr< M3 > &m3)
DataVisitor(const std::vector< VisitorConfig > &configs)
CacheBuffer< std::shared_ptr< T > > BufferType
class register implement
Definition arena_queue.h:37
VisitorConfig(uint64_t id, uint32_t size)