34 if (!file_reader_->Open(file)) {
35 AERROR <<
"Failed to open record file: " << file;
40 header_ = file_reader_->GetHeader();
41 if (file_reader_->ReadIndex()) {
42 index_ = file_reader_->GetIndex();
43 for (
int i = 0; i < index_.indexes_size(); ++i) {
44 auto single_idx = index_.mutable_indexes(i);
45 if (single_idx->type() != SectionType::SECTION_CHANNEL) {
48 if (!single_idx->has_channel_cache()) {
49 AERROR <<
"Single channel index does not have channel_cache.";
52 auto channel_cache = single_idx->mutable_channel_cache();
54 std::make_pair(channel_cache->name(), *channel_cache));
57 file_reader_->Reset();
61 file_reader_->Reset();
68 std::set<std::string> channel_list;
69 for (
auto& item : channel_info_) {
70 channel_list.insert(item.first);
85 while (message_index_ < chunk_->messages_size()) {
86 const auto& next_message = chunk_->messages(message_index_);
87 uint64_t time = next_message.time();
88 if (time > end_time) {
92 if (time < begin_time) {
97 message->
content = next_message.content();
102 ADEBUG <<
"Read next chunk.";
103 if (ReadNextChunk(begin_time, end_time)) {
104 ADEBUG <<
"Read chunk successfully.";
108 ADEBUG <<
"No chunk to read.";
112 bool RecordReader::ReadNextChunk(uint64_t begin_time, uint64_t end_time) {
113 bool skip_next_chunk_body =
false;
114 while (!reach_end_) {
116 if (!file_reader_->ReadSection(&section)) {
117 AERROR <<
"Failed to read section, file: " << file_reader_->GetPath();
120 switch (section.
type) {
121 case SectionType::SECTION_INDEX: {
122 file_reader_->SkipSection(section.
size);
126 case SectionType::SECTION_CHANNEL: {
127 ADEBUG <<
"Read channel section of size: " << section.
size;
129 if (!file_reader_->ReadSection<Channel>(section.
size, &channel)) {
130 AERROR <<
"Failed to read channel section.";
135 case SectionType::SECTION_CHUNK_HEADER: {
136 ADEBUG <<
"Read chunk header section of size: " << section.
size;
138 if (!file_reader_->ReadSection<ChunkHeader>(section.
size, &header)) {
139 AERROR <<
"Failed to read chunk header section.";
142 if (header.end_time() < begin_time) {
143 skip_next_chunk_body =
true;
145 if (header.begin_time() > end_time) {
150 case SectionType::SECTION_CHUNK_BODY: {
151 if (skip_next_chunk_body) {
152 file_reader_->SkipSection(section.
size);
153 skip_next_chunk_body =
false;
157 chunk_.reset(
new ChunkBody());
158 if (!file_reader_->ReadSection<ChunkBody>(section.
size, chunk_.get())) {
159 AERROR <<
"Failed to read chunk body section.";
165 AERROR <<
"Invalid section, type: " << section.
type
166 <<
", size: " << section.
size;
175 auto search = channel_info_.find(channel_name);
176 if (search == channel_info_.end()) {
179 return search->second.message_number();
183 const std::string& channel_name)
const {
184 auto search = channel_info_.find(channel_name);
185 if (search == channel_info_.end()) {
188 return search->second.message_type();
192 const std::string& channel_name)
const {
193 auto search = channel_info_.find(channel_name);
194 if (search == channel_info_.end()) {
197 return search->second.proto_desc();
void Reset()
Reset the message index of the record reader.
const std::string & GetProtoDesc(const std::string &channel_name) const override
Get proto descriptor string by channel name.
bool ReadMessage(RecordMessage *message, uint64_t begin_time=0, uint64_t end_time=std::numeric_limits< uint64_t >::max())
Read one message from reader.
RecordReader(const std::string &file)
The constructor with record file path as parameter.
uint64_t GetMessageNumber(const std::string &channel_name) const override
Get message number by channel name.
const std::string & GetMessageType(const std::string &channel_name) const override
Get message type by channel name.
std::set< std::string > GetChannelList() const override
Get channel list.
virtual ~RecordReader()
The destructor.
Basic data structure of a record message.
std::string content
The content of the message.
uint64_t time
The time (nanosecond) of the message.
std::string channel_name
The channel name of the message.