Apollo 10.0
自动驾驶开放平台
record_viewer.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <algorithm>
20#include <limits>
21#include <utility>
22
23#include "cyber/common/log.h"
24
25namespace apollo {
26namespace cyber {
27namespace record {
28
29RecordViewer::RecordViewer(const RecordReaderPtr& reader, uint64_t begin_time,
30 uint64_t end_time,
31 const std::set<std::string>& channels)
32 : begin_time_(begin_time),
33 end_time_(end_time),
34 channels_(channels),
35 readers_({reader}) {
36 Init();
37 UpdateTime();
38}
39
40RecordViewer::RecordViewer(const std::vector<RecordReaderPtr>& readers,
41 uint64_t begin_time, uint64_t end_time,
42 const std::set<std::string>& channels)
43 : begin_time_(begin_time),
44 end_time_(end_time),
45 channels_(channels),
46 readers_(readers) {
47 Init();
48 UpdateTime();
49}
50
52 if (begin_time_ > end_time_) {
53 AERROR << "Begin time must be earlier than end time"
54 << ", begin_time=" << begin_time_ << ", end_time=" << end_time_;
55 return false;
56 }
57 return true;
58}
59
60bool RecordViewer::Update(RecordMessage* message) {
61 bool find = false;
62 do {
63 if (msg_buffer_.empty() && !FillBuffer()) {
64 break;
65 }
66 auto& msg = msg_buffer_.begin()->second;
67 if (channels_.empty() || channels_.count(msg->channel_name) == 1) {
68 *message = *msg;
69 find = true;
70 }
71 msg_buffer_.erase(msg_buffer_.begin());
72 } while (!find);
73
74 return find;
75}
76
78
80
82
83void RecordViewer::set_curr_itr(const Iterator& curr_itr) { itr_ = curr_itr; }
84
85void RecordViewer::Init() {
86 // Init the channel list
87 for (auto& reader : readers_) {
88 auto all_channel = reader->GetChannelList();
89 std::set_intersection(all_channel.begin(), all_channel.end(),
90 channels_.begin(), channels_.end(),
91 std::inserter(channel_list_, channel_list_.end()));
92 }
93 readers_finished_.resize(readers_.size(), false);
94
95 // Sort the readers
96 std::sort(readers_.begin(), readers_.end(),
97 [](const RecordReaderPtr& lhs, const RecordReaderPtr& rhs) {
98 const auto& lhs_header = lhs->GetHeader();
99 const auto& rhs_header = rhs->GetHeader();
100 if (lhs_header.begin_time() == rhs_header.begin_time()) {
101 return lhs_header.end_time() < rhs_header.end_time();
102 }
103 return lhs_header.begin_time() < rhs_header.begin_time();
104 });
105}
106
107void RecordViewer::Reset() {
108 for (auto& reader : readers_) {
109 reader->Reset();
110 }
111 std::fill(readers_finished_.begin(), readers_finished_.end(), false);
112 curr_begin_time_ = begin_time_;
113 msg_buffer_.clear();
114}
115
116void RecordViewer::UpdateTime() {
117 uint64_t min_begin_time = std::numeric_limits<uint64_t>::max();
118 uint64_t max_end_time = 0;
119
120 for (auto& reader : readers_) {
121 if (!reader->IsValid()) {
122 continue;
123 }
124 const auto& header = reader->GetHeader();
125 if (min_begin_time > header.begin_time()) {
126 min_begin_time = header.begin_time();
127 }
128 if (max_end_time < header.end_time()) {
129 max_end_time = header.end_time();
130 }
131 }
132
133 if (begin_time_ < min_begin_time) {
134 begin_time_ = min_begin_time;
135 }
136
137 if (end_time_ > max_end_time) {
138 end_time_ = max_end_time;
139 }
140
141 curr_begin_time_ = begin_time_;
142}
143
144bool RecordViewer::FillBuffer() {
145 while (curr_begin_time_ <= end_time_ && msg_buffer_.size() < kBufferMinSize) {
146 uint64_t this_begin_time = curr_begin_time_;
147 uint64_t this_end_time = this_begin_time + kStepTimeNanoSec;
148 if (this_end_time > end_time_) {
149 this_end_time = end_time_;
150 }
151
152 for (size_t i = 0; i < readers_.size(); ++i) {
153 if (!readers_finished_[i] &&
154 readers_[i]->GetHeader().end_time() < this_begin_time) {
155 readers_finished_[i] = true;
156 readers_[i]->Reset();
157 }
158 }
159
160 for (size_t i = 0; i < readers_.size(); ++i) {
161 if (readers_finished_[i]) {
162 continue;
163 }
164 auto& reader = readers_[i];
165 while (true) {
166 auto record_msg = std::make_shared<RecordMessage>();
167 if (!reader->ReadMessage(record_msg.get(), this_begin_time,
168 this_end_time)) {
169 break;
170 }
171 msg_buffer_.emplace(std::make_pair(record_msg->time, record_msg));
172 }
173 }
174
175 // because ReadMessage of RecordReader is closed interval, so we add 1 here
176 curr_begin_time_ = this_end_time + 1;
177 }
178
179 return !msg_buffer_.empty();
180}
181
182RecordViewer::Iterator::Iterator(RecordViewer* viewer, bool end)
183 : end_(end), viewer_(viewer) {
184 if (end_) {
185 return;
186 }
187 viewer_->Reset();
188 if (!viewer_->IsValid() || !viewer_->Update(&message_instance_)) {
189 end_ = true;
190 }
191}
192
194 if (other.end_) {
195 return end_;
196 }
197 return index_ == other.index_ && viewer_ == other.viewer_;
198}
199
201 return !(*this == rhs);
202}
203
205 index_++;
206 if (!viewer_->Update(&message_instance_)) {
207 end_ = true;
208 }
209 return *this;
210}
211
212RecordViewer::Iterator::pointer RecordViewer::Iterator::operator->() {
213 return &message_instance_;
214}
215
216RecordViewer::Iterator::reference RecordViewer::Iterator::operator*() {
217 return message_instance_;
218}
219
220} // namespace record
221} // namespace cyber
222} // namespace apollo
bool operator==(Iterator const &other) const
Overloading operator ==.
Iterator & operator++()
Overloading operator ++.
bool operator!=(const Iterator &rhs) const
Overloading operator !=.
pointer operator->()
Overloading operator ->.
reference operator*()
Overloading operator *.
void set_curr_itr(const Iterator &curr_itr)
Iterator begin()
Get the begin iterator.
bool IsValid() const
Is this record reader is valid.
RecordViewer(const RecordReaderPtr &reader, uint64_t begin_time=0, uint64_t end_time=std::numeric_limits< uint64_t >::max(), const std::set< std::string > &channels={})
The constructor with single reader.
std::shared_ptr< RecordReader > RecordReaderPtr
Iterator curr_itr()
Get current iterator.
Iterator end()
Get the end iterator.
#define AERROR
Definition log.h:44
bool Init(const char *binary_name, const std::string &dag_info)
Definition init.cc:98
class register implement
Definition arena_queue.h:37
Basic data struct of record message.