31 const std::set<std::string>& channels)
32 : begin_time_(begin_time),
41 uint64_t begin_time, uint64_t end_time,
42 const std::set<std::string>& channels)
43 : begin_time_(begin_time),
52 if (begin_time_ > end_time_) {
53 AERROR <<
"Begin time must be earlier than end time"
54 <<
", begin_time=" << begin_time_ <<
", end_time=" << end_time_;
63 if (msg_buffer_.empty() && !FillBuffer()) {
66 auto& msg = msg_buffer_.begin()->second;
67 if (channels_.empty() || channels_.count(msg->channel_name) == 1) {
71 msg_buffer_.erase(msg_buffer_.begin());
85void RecordViewer::Init() {
87 for (
auto& reader : readers_) {
88 auto all_channel = reader->GetChannelList();
89 std::set_intersection(all_channel.begin(), all_channel.end(),
90 channels_.begin(), channels_.end(),
91 std::inserter(channel_list_, channel_list_.end()));
93 readers_finished_.resize(readers_.size(),
false);
96 std::sort(readers_.begin(), readers_.end(),
98 const auto& lhs_header = lhs->GetHeader();
99 const auto& rhs_header = rhs->GetHeader();
100 if (lhs_header.begin_time() == rhs_header.begin_time()) {
101 return lhs_header.end_time() < rhs_header.end_time();
103 return lhs_header.begin_time() < rhs_header.begin_time();
107void RecordViewer::Reset() {
108 for (
auto& reader : readers_) {
111 std::fill(readers_finished_.begin(), readers_finished_.end(),
false);
112 curr_begin_time_ = begin_time_;
116void RecordViewer::UpdateTime() {
117 uint64_t min_begin_time = std::numeric_limits<uint64_t>::max();
118 uint64_t max_end_time = 0;
120 for (
auto& reader : readers_) {
121 if (!reader->IsValid()) {
124 const auto& header = reader->GetHeader();
125 if (min_begin_time > header.begin_time()) {
126 min_begin_time = header.begin_time();
128 if (max_end_time < header.end_time()) {
129 max_end_time = header.end_time();
133 if (begin_time_ < min_begin_time) {
134 begin_time_ = min_begin_time;
137 if (end_time_ > max_end_time) {
138 end_time_ = max_end_time;
141 curr_begin_time_ = begin_time_;
144bool RecordViewer::FillBuffer() {
145 while (curr_begin_time_ <= end_time_ && msg_buffer_.size() < kBufferMinSize) {
146 uint64_t this_begin_time = curr_begin_time_;
147 uint64_t this_end_time = this_begin_time + kStepTimeNanoSec;
148 if (this_end_time > end_time_) {
149 this_end_time = end_time_;
152 for (
size_t i = 0; i < readers_.size(); ++i) {
153 if (!readers_finished_[i] &&
154 readers_[i]->GetHeader().end_time() < this_begin_time) {
155 readers_finished_[i] =
true;
156 readers_[i]->Reset();
160 for (
size_t i = 0; i < readers_.size(); ++i) {
161 if (readers_finished_[i]) {
164 auto& reader = readers_[i];
166 auto record_msg = std::make_shared<RecordMessage>();
167 if (!reader->ReadMessage(record_msg.get(), this_begin_time,
171 msg_buffer_.emplace(std::make_pair(record_msg->time, record_msg));
176 curr_begin_time_ = this_end_time + 1;
179 return !msg_buffer_.empty();
183 : end_(end), viewer_(viewer) {
188 if (!viewer_->
IsValid() || !viewer_->Update(&message_instance_)) {
197 return index_ == other.index_ && viewer_ == other.viewer_;
201 return !(*
this == rhs);
206 if (!viewer_->Update(&message_instance_)) {
213 return &message_instance_;
217 return message_instance_;
bool operator==(Iterator const &other) const
Overloading operator ==.
Iterator & operator++()
Overloading operator ++.
bool operator!=(const Iterator &rhs) const
Overloading operator !=.
pointer operator->()
Overloading operator ->.
reference operator*()
Overloading operator *.
void set_curr_itr(const Iterator &curr_itr)
Iterator begin()
Get the begin iterator.
bool IsValid() const
Is this record reader is valid.
RecordViewer(const RecordReaderPtr &reader, uint64_t begin_time=0, uint64_t end_time=std::numeric_limits< uint64_t >::max(), const std::set< std::string > &channels={})
The constructor with single reader.
std::shared_ptr< RecordReader > RecordReaderPtr
Iterator curr_itr()
Get current iterator.
Iterator end()
Get the end iterator.
bool Init(const char *binary_name, const std::string &dag_info)
Basic data struct of record message.