Apollo 11.0
自动驾驶开放平台
msg_buffer.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16#pragma once
17
18#include <limits>
19#include <memory>
20#include <string>
21#include <utility>
22#include <vector>
23
24#include <boost/circular_buffer.hpp>
25#include "gflags/gflags.h"
26
27#include "cyber/cyber.h"
28
29namespace apollo {
30namespace perception {
31namespace onboard {
32
33DECLARE_int32(obs_msg_buffer_size);
34DECLARE_double(obs_buffer_match_precision);
35
36template <class T>
37class MsgBuffer {
38 public:
39 typedef std::shared_ptr<T const> ConstPtr;
40 typedef std::pair<double, ConstPtr> ObjectPair;
41
42 public:
43 MsgBuffer() : buffer_queue_(FLAGS_obs_msg_buffer_size) {}
44 ~MsgBuffer() = default;
45
46 MsgBuffer(const MsgBuffer&) = delete;
47 MsgBuffer operator=(const MsgBuffer&) = delete;
48
49 void Init(const std::string& channel, const std::string& name);
50
51 // get nearest message
52 int LookupNearest(double timestamp, ConstPtr* msg);
53 // get latest message
55 // get messages in (timestamp-period, timestamp+period)
56 int LookupPeriod(double timestamp, double period,
57 std::vector<ObjectPair>* msgs);
58
59 private:
60 void MsgCallback(const ConstPtr& msg);
61
62 private:
63 std::string node_name_;
64 std::unique_ptr<cyber::Node> node_;
65 std::shared_ptr<cyber::Reader<T>> msg_subscriber_;
66 std::mutex buffer_mutex_;
67
68 bool init_ = false;
69 boost::circular_buffer<ObjectPair> buffer_queue_;
70};
71
72template <class T>
73void MsgBuffer<T>::Init(const std::string& channel, const std::string& name) {
74 int index = static_cast<int>(name.find_last_of('/'));
75 if (index != -1) {
76 node_name_ = name.substr(index + 1) + "_subscriber";
77 } else {
78 node_name_ = name + "_subscriber";
79 }
80 node_.reset(apollo::cyber::CreateNode(node_name_).release());
81
82 std::function<void(const ConstPtr&)> register_call =
83 std::bind(&MsgBuffer<T>::MsgCallback, this, std::placeholders::_1);
84 msg_subscriber_ = node_->CreateReader<T>(channel, register_call);
85
86 std::lock_guard<std::mutex> lock(buffer_mutex_);
87 buffer_queue_.set_capacity(FLAGS_obs_msg_buffer_size);
88 init_ = true;
89}
90
91template <class T>
92void MsgBuffer<T>::MsgCallback(const ConstPtr& msg) {
93 std::lock_guard<std::mutex> lock(buffer_mutex_);
94 double timestamp = msg->measurement_time();
95 buffer_queue_.push_back(std::make_pair(timestamp, msg));
96}
97
98template <class T>
99int MsgBuffer<T>::LookupNearest(double timestamp, ConstPtr* msg) {
100 std::lock_guard<std::mutex> lock(buffer_mutex_);
101 if (!init_) {
102 AERROR << "msg buffer is uninitialized.";
103 return false;
104 }
105 if (buffer_queue_.empty()) {
106 AERROR << "msg buffer is empty.";
107 return false;
108 }
109 if (buffer_queue_.front().first - FLAGS_obs_buffer_match_precision >
110 timestamp) {
111 AERROR << "Your timestamp (" << timestamp
112 << ") is earlier than the oldest timestamp ("
113 << buffer_queue_.front().first << ").";
114 return false;
115 }
116 if (buffer_queue_.back().first + FLAGS_obs_buffer_match_precision <
117 timestamp) {
118 AERROR << "Your timestamp (" << timestamp
119 << ") is newer than the latest timestamp ("
120 << buffer_queue_.back().first << ").";
121 return false;
122 }
123
124 // loop to find nearest
125 double distance = std::numeric_limits<double>::max();
126 int idx = static_cast<int>(buffer_queue_.size()) - 1;
127 for (; idx >= 0; --idx) {
128 double temp_distance = fabs(timestamp - buffer_queue_[idx].first);
129 if (temp_distance >= distance) {
130 break;
131 }
132 distance = temp_distance;
133 }
134 *msg = buffer_queue_[idx + 1].second;
135
136 return true;
137}
138
139template <class T>
141 std::lock_guard<std::mutex> lock(buffer_mutex_);
142 if (!init_) {
143 AERROR << "Message buffer is uninitialized.";
144 return false;
145 }
146 if (buffer_queue_.empty()) {
147 AERROR << "Message buffer is empty.";
148 return false;
149 }
150 *msg = buffer_queue_.back().second;
151 return true;
152}
153
154template <class T>
155int MsgBuffer<T>::LookupPeriod(const double timestamp, const double period,
156 std::vector<ObjectPair>* msgs) {
157 std::lock_guard<std::mutex> lock(buffer_mutex_);
158 if (!init_) {
159 AERROR << "Message buffer is uninitialized.";
160 return false;
161 }
162 if (buffer_queue_.empty()) {
163 AERROR << "Message buffer is empty.";
164 return false;
165 }
166 if (buffer_queue_.front().first - FLAGS_obs_buffer_match_precision >
167 timestamp) {
168 AERROR << "Your timestamp (" << timestamp << ") is earlier than the oldest "
169 << "timestamp (" << buffer_queue_.front().first << ").";
170 return false;
171 }
172 if (buffer_queue_.back().first + FLAGS_obs_buffer_match_precision <
173 timestamp) {
174 AERROR << "Your timestamp (" << timestamp << ") is newer than the latest "
175 << "timestamp (" << buffer_queue_.back().first << ").";
176 return false;
177 }
178
179 const double lower_timestamp = timestamp - period;
180 const double upper_timestamp = timestamp + period;
181 for (const auto& obj_pair : buffer_queue_) {
182 if (obj_pair.first < lower_timestamp) {
183 continue;
184 }
185 if (obj_pair.first > upper_timestamp) {
186 break;
187 }
188 msgs->push_back(obj_pair);
189 }
190
191 return true;
192}
193
194} // namespace onboard
195} // namespace perception
196} // namespace apollo
MsgBuffer operator=(const MsgBuffer &)=delete
std::pair< double, ConstPtr > ObjectPair
Definition msg_buffer.h:40
int LookupPeriod(double timestamp, double period, std::vector< ObjectPair > *msgs)
Definition msg_buffer.h:155
int LookupNearest(double timestamp, ConstPtr *msg)
Definition msg_buffer.h:99
void Init(const std::string &channel, const std::string &name)
Definition msg_buffer.h:73
MsgBuffer(const MsgBuffer &)=delete
std::shared_ptr< T const > ConstPtr
Definition msg_buffer.h:39
#define AERROR
Definition log.h:44
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space)
Definition cyber.cc:33
DECLARE_double(obs_buffer_match_precision)
DECLARE_int32(obs_msg_buffer_size)
class register implement
Definition arena_queue.h:37