Apollo 10.0
自动驾驶开放平台
record_reader.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <utility>
20
21namespace apollo {
22namespace cyber {
23namespace record {
24
29
31
32RecordReader::RecordReader(const std::string& file) {
33 file_reader_.reset(new RecordFileReader());
34 if (!file_reader_->Open(file)) {
35 AERROR << "Failed to open record file: " << file;
36 return;
37 }
38 chunk_.reset(new ChunkBody());
39 is_valid_ = true;
40 header_ = file_reader_->GetHeader();
41 if (file_reader_->ReadIndex()) {
42 index_ = file_reader_->GetIndex();
43 for (int i = 0; i < index_.indexes_size(); ++i) {
44 auto single_idx = index_.mutable_indexes(i);
45 if (single_idx->type() != SectionType::SECTION_CHANNEL) {
46 continue;
47 }
48 if (!single_idx->has_channel_cache()) {
49 AERROR << "Single channel index does not have channel_cache.";
50 continue;
51 }
52 auto channel_cache = single_idx->mutable_channel_cache();
53 channel_info_.insert(
54 std::make_pair(channel_cache->name(), *channel_cache));
55 }
56 }
57 file_reader_->Reset();
58}
59
61 file_reader_->Reset();
62 reach_end_ = false;
63 message_index_ = 0;
64 chunk_.reset(new ChunkBody());
65}
66
67std::set<std::string> RecordReader::GetChannelList() const {
68 std::set<std::string> channel_list;
69 for (auto& item : channel_info_) {
70 channel_list.insert(item.first);
71 }
72 return channel_list;
73}
74
75bool RecordReader::ReadMessage(RecordMessage* message, uint64_t begin_time,
76 uint64_t end_time) {
77 if (!is_valid_) {
78 return false;
79 }
80
81 if (begin_time > header_.end_time() || end_time < header_.begin_time()) {
82 return false;
83 }
84
85 while (message_index_ < chunk_->messages_size()) {
86 const auto& next_message = chunk_->messages(message_index_);
87 uint64_t time = next_message.time();
88 if (time > end_time) {
89 return false;
90 }
91 ++message_index_;
92 if (time < begin_time) {
93 continue;
94 }
95
96 message->channel_name = next_message.channel_name();
97 message->content = next_message.content();
98 message->time = time;
99 return true;
100 }
101
102 ADEBUG << "Read next chunk.";
103 if (ReadNextChunk(begin_time, end_time)) {
104 ADEBUG << "Read chunk successfully.";
105 message_index_ = 0;
106 return ReadMessage(message, begin_time, end_time);
107 }
108 ADEBUG << "No chunk to read.";
109 return false;
110}
111
112bool RecordReader::ReadNextChunk(uint64_t begin_time, uint64_t end_time) {
113 bool skip_next_chunk_body = false;
114 while (!reach_end_) {
115 Section section;
116 if (!file_reader_->ReadSection(&section)) {
117 AERROR << "Failed to read section, file: " << file_reader_->GetPath();
118 return false;
119 }
120 switch (section.type) {
121 case SectionType::SECTION_INDEX: {
122 file_reader_->SkipSection(section.size);
123 reach_end_ = true;
124 break;
125 }
126 case SectionType::SECTION_CHANNEL: {
127 ADEBUG << "Read channel section of size: " << section.size;
128 Channel channel;
129 if (!file_reader_->ReadSection<Channel>(section.size, &channel)) {
130 AERROR << "Failed to read channel section.";
131 return false;
132 }
133 break;
134 }
135 case SectionType::SECTION_CHUNK_HEADER: {
136 ADEBUG << "Read chunk header section of size: " << section.size;
137 ChunkHeader header;
138 if (!file_reader_->ReadSection<ChunkHeader>(section.size, &header)) {
139 AERROR << "Failed to read chunk header section.";
140 return false;
141 }
142 if (header.end_time() < begin_time) {
143 skip_next_chunk_body = true;
144 }
145 if (header.begin_time() > end_time) {
146 return false;
147 }
148 break;
149 }
150 case SectionType::SECTION_CHUNK_BODY: {
151 if (skip_next_chunk_body) {
152 file_reader_->SkipSection(section.size);
153 skip_next_chunk_body = false;
154 break;
155 }
156
157 chunk_.reset(new ChunkBody());
158 if (!file_reader_->ReadSection<ChunkBody>(section.size, chunk_.get())) {
159 AERROR << "Failed to read chunk body section.";
160 return false;
161 }
162 return true;
163 }
164 default: {
165 AERROR << "Invalid section, type: " << section.type
166 << ", size: " << section.size;
167 return false;
168 }
169 }
170 }
171 return false;
172}
173
174uint64_t RecordReader::GetMessageNumber(const std::string& channel_name) const {
175 auto search = channel_info_.find(channel_name);
176 if (search == channel_info_.end()) {
177 return 0;
178 }
179 return search->second.message_number();
180}
181
183 const std::string& channel_name) const {
184 auto search = channel_info_.find(channel_name);
185 if (search == channel_info_.end()) {
186 return kEmptyString;
187 }
188 return search->second.message_type();
189}
190
191const std::string& RecordReader::GetProtoDesc(
192 const std::string& channel_name) const {
193 auto search = channel_info_.find(channel_name);
194 if (search == channel_info_.end()) {
195 return kEmptyString;
196 }
197 return search->second.proto_desc();
198}
199
200} // namespace record
201} // namespace cyber
202} // namespace apollo
void Reset()
Reset the message index of record reader.
const std::string & GetProtoDesc(const std::string &channel_name) const override
Get proto descriptor string by channel name.
bool ReadMessage(RecordMessage *message, uint64_t begin_time=0, uint64_t end_time=std::numeric_limits< uint64_t >::max())
Read one message from reader.
RecordReader(const std::string &file)
The constructor with record file path as parameter.
uint64_t GetMessageNumber(const std::string &channel_name) const override
Get message number by channel name.
const std::string & GetMessageType(const std::string &channel_name) const override
Get message type by channel name.
std::set< std::string > GetChannelList() const override
Get channel list.
virtual ~RecordReader()
The destructor.
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
class register implement
Definition arena_queue.h:37
optional uint64 begin_time
Definition record.proto:71
Basic data struct of record message.
std::string content
The content of the message.
uint64_t time
The time (nanosecond) of the message.
std::string channel_name
The channel name of the message.
proto::SectionType type
Definition section.h:25