39 {
40
41 if (begin_time_ >= end_time_) {
42 AERROR <<
"begin time larger or equal than end time, begin_time_: "
43 << begin_time_ << "end_time_: " << end_time_;
44 return false;
45 }
46 for (const auto& channel_name : white_channels_) {
47 if (std::find(black_channels_.begin(), black_channels_.end(),
48 channel_name) != black_channels_.end()) {
49 AERROR <<
"find channel in both of white list and black list, channel: "
50 << channel_name;
51 return false;
52 }
53 }
54
55 AINFO <<
"split record file started.";
56
57
58 if (!reader_.
Open(input_file_)) {
59 AERROR <<
"open input file failed, file: " << input_file_;
60 return false;
61 }
63 if (begin_time_ > header.end_time() || end_time_ < header.begin_time()) {
64 AERROR <<
"time range " << begin_time_ <<
" to " << end_time_
65 << " is not include in this record file.";
66 return false;
67 }
68
69
71 if (!writer_.
Open(output_file_)) {
72 AERROR <<
"open output file failed. file: " << output_file_;
73 return false;
74 }
76 AERROR <<
"write header to output file failed. file: " << output_file_;
77 return false;
78 }
79
80
81 bool skip_next_chunk_body(false);
84 Section section;
86 AERROR <<
"read section failed.";
87 return false;
88 }
89 if (section.type == SectionType::SECTION_INDEX) {
90 break;
91 }
92 switch (section.type) {
93 case SectionType::SECTION_CHANNEL: {
94 Channel chan;
95 if (!reader_.
ReadSection<Channel>(section.size, &chan)) {
96 AERROR <<
"read channel section fail.";
97 return false;
98 }
99 if (white_channels_.empty() ||
100 std::find(white_channels_.begin(), white_channels_.end(),
101 chan.name()) != white_channels_.end()) {
102 if (std::find(black_channels_.begin(), black_channels_.end(),
103 chan.name()) == black_channels_.end()) {
105 }
106 }
107 break;
108 }
109 case SectionType::SECTION_CHUNK_HEADER: {
110 ChunkHeader chdr;
111 if (!reader_.
ReadSection<ChunkHeader>(section.size, &chdr)) {
112 AERROR <<
"read chunk header section fail.";
113 return false;
114 }
115 if (begin_time_ > chdr.end_time() || end_time_ < chdr.begin_time()) {
116 skip_next_chunk_body = true;
117 }
118 break;
119 }
120 case SectionType::SECTION_CHUNK_BODY: {
121 if (skip_next_chunk_body) {
123 skip_next_chunk_body = false;
124 break;
125 }
126 ChunkBody cbd;
127 if (!reader_.
ReadSection<ChunkBody>(section.size, &cbd)) {
128 AERROR <<
"read chunk body section fail.";
129 return false;
130 }
131 for (int idx = 0; idx < cbd.messages_size(); ++idx) {
132 if (!white_channels_.empty() &&
133 std::find(white_channels_.begin(), white_channels_.end(),
134 cbd.messages(idx).channel_name()) ==
135 white_channels_.end()) {
136 continue;
137 }
138 if (std::find(black_channels_.begin(), black_channels_.end(),
139 cbd.messages(idx).channel_name()) !=
140 black_channels_.end()) {
141 continue;
142 }
143 if (cbd.messages(idx).time() < begin_time_ ||
144 cbd.messages(idx).time() > end_time_) {
145 continue;
146 }
148 AERROR <<
"add new message failed.";
149 return false;
150 }
151 }
152 break;
153 }
154 default: {
155 AERROR <<
"this section should not be here, section type: "
156 << section.type;
157 break;
158 }
159 }
160 }
161 AINFO <<
"split record file done.";
162 return true;
163}
// Signatures of the reader/writer members referenced by the record-splitting
// routine above. Only the declarations are visible here, so the notes below
// describe how the visible caller uses them; anything the caller does not
// show is marked as an assumption to confirm.

// Read-only accessor returning a reference to a proto::Header.
// NOTE(review): presumably the source of the `header` whose begin_time() /
// end_time() are compared against the requested time range; the call site is
// not visible here — confirm.
const proto::Header & GetHeader() const
// Takes a Section through a pointer and reports a bool status. The caller
// inspects `section.type` and `section.size` afterwards.
// NOTE(review): presumably a false return is what triggers the
// "read section failed." error path — the call itself is not visible; confirm.
bool ReadSection(Section *section)
// Takes a 64-bit signed size and reports a bool status.
// NOTE(review): presumably used to step over a chunk body that lies outside
// the requested time range — the call site is not visible; confirm.
bool SkipSection(int64_t size)
// Opens the file at `path`; overrides a base-class virtual. The caller treats
// a false return as failure, logs an error and aborts.
// NOTE(review): two identical declarations are listed; presumably one belongs
// to the reader (input file) and one to the writer (output file) — confirm.
bool Open(const std::string &path) override
// See the note on the previous declaration (the caller invokes Open on both
// `reader_` and `writer_`).
bool Open(const std::string &path) override
// Takes a proto::Header by const reference and reports a bool status.
// NOTE(review): presumably a false return leads to the
// "write header to output file failed." error path — call not visible; confirm.
bool WriteHeader(const proto::Header &header)
// Takes a proto::Channel by const reference and reports a bool status.
// NOTE(review): presumably called for channels that pass the white/black list
// filter — call not visible; confirm.
bool WriteChannel(const proto::Channel &channel)
// Takes a proto::SingleMessage by const reference and reports a bool status.
// NOTE(review): presumably called for each message that passes the channel
// and time filters, with a false return leading to "add new message failed."
// — call not visible; confirm.
bool WriteMessage(const proto::SingleMessage &message)