// NOTE(review): fragment of a method whose header is not visible in this
// excerpt (presumably Open(), given the open() call on path_ — confirm).
// The whole fragment runs while holding mutex_.
43 std::lock_guard<std::mutex> lock(
mutex_);
// Warns that an existing file at path_ is going to be overwritten.
46 AWARN <<
"File exist and overwrite, file: " <<
path_;
// Opens path_ write-only, creating it if needed with mode rw-r--r--
// (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH).
// NOTE(review): O_TRUNC is not passed, so a longer pre-existing file would
// keep its old tail bytes past whatever is written here — confirm whether
// that is intended given the "overwrite" warning above.
48 fd_ = open(
path_.data(), O_CREAT | O_WRONLY,
49 S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
52 <<
", errno: " << errno;
// Two chunk buffers: chunk_active_ and chunk_flush_ are swapped elsewhere in
// this file under flush_mutex_, and Flush() writes out chunk_flush_.
55 chunk_active_.reset(
new Chunk());
56 chunk_flush_.reset(
new Chunk());
// Starts the background thread that runs Flush().
58 flush_thread_ = std::make_shared<std::thread>([
this]() { this->Flush(); });
// NOTE(review): std::make_shared reports failure by throwing and never yields
// a null pointer, so this branch looks unreachable.
59 if (flush_thread_ ==
nullptr) {
60 AERROR <<
"Init flush thread error.";
// NOTE(review): fragment of a method whose header is not visible in this
// excerpt (presumably Close() — confirm). Lines are missing between the
// statements below, so the enclosing loops/scopes cannot be seen.
// Checks under flush_mutex_ whether chunk_flush_ is empty, with a 100 ms sleep
// nearby — presumably a poll loop waiting for a pending flush to drain.
71 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
72 if (chunk_flush_->empty()) {
76 std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Swaps the active chunk into the flush slot and wakes one waiter on flush_cv_.
81 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
82 chunk_flush_.swap(chunk_active_);
83 flush_cv_.notify_one();
// Same empty-check/sleep pattern as above, this time after the swap.
89 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
90 if (chunk_flush_->empty()) {
94 std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Wakes every waiter on flush_cv_, then joins and drops the flush thread if
// one exists and is joinable.
98 flush_cv_.notify_all();
99 if (flush_thread_ && flush_thread_->joinable()) {
100 flush_thread_->join();
101 flush_thread_ =
nullptr;
// Error logs for the index write and the header overwrite; the calls that
// produce these failures are not visible in this excerpt.
105 AERROR <<
"Write index section failed, file: " <<
path_;
110 AERROR <<
"Overwrite header section failed, file: " <<
path_;
// Closes the file descriptor; a negative return from close() is treated as an
// error and errno is logged.
113 if (close(
fd_) < 0) {
115 <<
", errno: " << errno;
// NOTE(review): fragment of a method whose header is not visible in this
// excerpt (presumably WriteHeader() — confirm).
// Holds mutex_ while writing header_ as a Header section; logs on failure.
121 std::lock_guard<std::mutex> lock(
mutex_);
123 if (!WriteSection<Header>(
header_)) {
124 AERROR <<
"Write header section fail";
// Refreshes the per-channel message counts stored in index_ and then writes
// index_ out as a proto::Index section. Runs while holding mutex_.
// NOTE(review): several lines of this body (including the definition of
// single_index and the return statements) are missing from this excerpt.
130bool RecordFileWriter::WriteIndex() {
131 std::lock_guard<std::mutex> lock(
mutex_);
// Walk every entry currently recorded in index_.
132 for (
int i = 0; i <
index_.indexes_size(); i++) {
// Only channel sections carry a channel cache with a message count to update.
134 if (single_index->
type() == SectionType::SECTION_CHANNEL) {
135 ChannelCache* channel_cache = single_index->mutable_channel_cache();
// Copy the count tracked in channel_message_number_map_ into the cache, but
// only for channels that actually have an entry in the map.
// NOTE(review): find() followed by operator[] looks the key up twice.
136 if (channel_message_number_map_.find(channel_cache->
name()) !=
137 channel_message_number_map_.end()) {
138 channel_cache->set_message_number(
139 channel_message_number_map_[channel_cache->
name()]);
// Write out the updated index; a failed write is logged.
144 if (!WriteSection<proto::Index>(
index_)) {
145 AERROR <<
"Write section fail";
// NOTE(review): fragment of a method whose header is not visible in this
// excerpt (presumably WriteChannel() — confirm). The definitions of pos,
// single_index and channel_cache are on lines missing from this excerpt.
// Holds mutex_ while writing `channel` as a Channel section; logs on failure.
152 std::lock_guard<std::mutex> lock(
mutex_);
154 if (!WriteSection<Channel>(channel)) {
155 AERROR <<
"Write section fail";
// Record the section in the index: its type and the position `pos`.
160 single_index->set_type(SectionType::SECTION_CHANNEL);
161 single_index->set_position(pos);
// Fill the channel cache from the channel; the message count starts at 0.
163 channel_cache->set_name(channel.
name());
164 channel_cache->set_message_number(0);
165 channel_cache->set_message_type(channel.
message_type());
166 channel_cache->set_proto_desc(channel.
proto_desc());
// Attach the cache to the index entry (presumably a protobuf-generated
// set_allocated_* setter, which would take ownership of the pointer — confirm).
167 single_index->set_allocated_channel_cache(channel_cache);
// Writes a chunk header section followed by a chunk body section, and appends
// an index_ entry for each. Runs while holding mutex_.
// NOTE(review): the rest of the parameter list (chunk_body is used below), the
// definition of pos, and the return statements are on lines missing from this
// excerpt.
171bool RecordFileWriter::WriteChunk(
const ChunkHeader& chunk_header,
173 std::lock_guard<std::mutex> lock(
mutex_);
// Write the header section first; a failed write is logged.
175 if (!WriteSection<ChunkHeader>(chunk_header)) {
176 AERROR <<
"Write chunk header fail";
// Index entry for the chunk header: type, position, and a cache mirroring the
// header's begin/end time, message count and raw size.
179 SingleIndex* single_index =
index_.add_indexes();
180 single_index->set_type(SectionType::SECTION_CHUNK_HEADER);
181 single_index->set_position(pos);
182 ChunkHeaderCache* chunk_header_cache =
new ChunkHeaderCache();
183 chunk_header_cache->set_begin_time(chunk_header.
begin_time());
184 chunk_header_cache->set_end_time(chunk_header.
end_time());
185 chunk_header_cache->set_message_number(chunk_header.
message_number());
186 chunk_header_cache->set_raw_size(chunk_header.
raw_size());
// Hands the heap-allocated cache to the index entry (presumably a
// protobuf-generated set_allocated_* setter that takes ownership — confirm).
187 single_index->set_allocated_chunk_header_cache(chunk_header_cache);
// Then write the body section; a failed write is logged.
190 if (!WriteSection<ChunkBody>(chunk_body)) {
191 AERROR <<
"Write chunk body fail";
// Index entry for the chunk body: type, position, and a cache holding the
// number of messages in the body.
201 single_index =
index_.add_indexes();
202 single_index->set_type(SectionType::SECTION_CHUNK_BODY);
203 single_index->set_position(pos);
204 ChunkBodyCache* chunk_body_cache =
new ChunkBodyCache();
205 chunk_body_cache->set_message_number(chunk_body.messages_size());
206 single_index->set_allocated_chunk_body_cache(chunk_body_cache);
// NOTE(review): fragment of a method whose header is not visible in this
// excerpt (presumably WriteMessage() — confirm). Several statements below are
// cut off because their continuation lines are missing.
// Append the message to the active chunk.
211 chunk_active_->add(message);
// Look up the message's channel in channel_message_number_map_; the map is
// either updated for an existing entry or gets a new entry inserted (the
// bodies of both branches are not visible here).
212 auto it = channel_message_number_map_.find(message.
channel_name());
213 if (it != channel_message_number_map_.end()) {
216 channel_message_number_map_.insert(
// need_flush starts false; the visible condition compares the time elapsed
// since the active chunk's begin_time against a threshold that is not visible.
219 bool need_flush =
false;
221 message.
time() - chunk_active_->header_.begin_time() >
// Swap the active chunk into the flush slot under flush_mutex_ and wake one
// waiter on flush_cv_.
233 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
234 chunk_flush_.swap(chunk_active_);
235 flush_cv_.notify_one();
// Loop that writes out chunk_flush_ whenever it has content, for as long as
// is_writing_ is true. Elsewhere in this file it is started on flush_thread_.
// NOTE(review): some lines of this body are missing from this excerpt.
240void RecordFileWriter::Flush() {
// NOTE(review): is_writing_ is read here without holding flush_mutex_; its
// declaration is not visible, so confirm it is safe to read this way.
241 while (is_writing_) {
242 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
// Block until there is something to flush or writing has stopped.
243 flush_cv_.wait(flush_lock,
244 [
this] {
return !chunk_flush_->empty() || !is_writing_; });
// An empty chunk_flush_ after the wait means the wake-up came from
// is_writing_ turning false (per the predicate above).
248 if (chunk_flush_->empty()) {
// Write the pending chunk's header and body; a failed write is logged.
252 if (!WriteChunk(chunk_flush_->header_, *(chunk_flush_->body_.get()))) {
253 AERROR <<
"Write chunk fail.";
// Empty the flush chunk so it can be swapped back in and reused.
256 chunk_flush_->clear();
// NOTE(review): tail of a const method signature whose start is not visible in
// this excerpt (presumably GetMessageNumber() — confirm).
// Looks channel_name up in channel_message_number_map_ and returns the stored
// count when present; the not-found path is not visible here.
261 const std::string& channel_name)
const {
262 auto search = channel_message_number_map_.find(channel_name);
263 if (search != channel_message_number_map_.end()) {
264 return search->second;
int64_t CurrentPosition()
bool Open(const std::string &path) override
bool WriteHeader(const proto::Header &header)
uint64_t GetMessageNumber(const std::string &channel_name) const
bool WriteChannel(const proto::Channel &channel)
bool WriteMessage(const proto::SingleMessage &message)
virtual ~RecordFileWriter()
bool PathExists(const std::string &path)
Check if the path exists.
optional string message_type
optional bytes proto_desc
optional SectionType type
optional string channel_name