17#ifndef CYBER_RECORD_FILE_RECORD_FILE_WRITER_H_
18#define CYBER_RECORD_FILE_RECORD_FILE_WRITER_H_
20#include <condition_variable>
26#include <unordered_map>
29#include "google/protobuf/io/zero_copy_stream_impl.h"
30#include "google/protobuf/message.h"
31#include "google/protobuf/text_format.h"
54 std::lock_guard<std::mutex> lock(
mutex_);
74 std::unique_ptr<proto::ChunkBody>
body_ =
nullptr;
81 bool Open(
const std::string& path)
override;
82 void Close()
override;
92 bool WriteSection(
const T& message);
95 std::atomic_bool is_writing_;
96 std::atomic_bool in_writing_{
false};
97 std::unique_ptr<Chunk> chunk_active_ =
nullptr;
98 std::unique_ptr<Chunk> chunk_flush_ =
nullptr;
99 std::shared_ptr<std::thread> flush_thread_ =
nullptr;
100 std::mutex flush_mutex_;
101 std::condition_variable flush_cv_;
102 std::unordered_map<std::string, uint64_t> channel_message_number_map_;
106bool RecordFileWriter::WriteSection(
const T& message) {
108 if (std::is_same<T, proto::ChunkHeader>::value) {
110 }
else if (std::is_same<T, proto::ChunkBody>::value) {
112 }
else if (std::is_same<T, proto::Channel>::value) {
114 }
else if (std::is_same<T, proto::Header>::value) {
117 AERROR <<
"Jump to position #0 failed";
120 }
else if (std::is_same<T, proto::Index>::value) {
123 AERROR <<
"Do not support this template typename.";
128 memset(§ion, 0,
sizeof(section));
129 section = {type,
static_cast<int64_t
>(message.ByteSizeLong())};
130 ssize_t count = write(
fd_, §ion,
sizeof(section));
132 AERROR <<
"Write fd failed, fd: " <<
fd_ <<
", errno: " << errno;
135 if (count !=
sizeof(section)) {
137 <<
", expect count: " <<
sizeof(section)
138 <<
", actual count: " << count;
142 google::protobuf::io::FileOutputStream raw_output(
fd_);
143 message.SerializeToZeroCopyStream(&raw_output);
149 AERROR <<
"Write fd failed, fd: " <<
fd_ <<
", errno: " << errno;
152 if (
static_cast<size_t>(count) !=
HEADER_LENGTH - message.ByteSizeLong()) {
154 <<
", expect count: " <<
sizeof(section)
155 <<
", actual count: " << count;
bool SetPosition(int64_t position)
int64_t CurrentPosition()
bool Open(const std::string &path) override
bool WriteHeader(const proto::Header &header)
uint64_t GetMessageNumber(const std::string &channel_name) const
bool WriteChannel(const proto::Channel &channel)
bool WriteMessage(const proto::SingleMessage &message)
virtual ~RecordFileWriter()
void add(const proto::SingleMessage &message)
std::unique_ptr< proto::ChunkBody > body_
proto::ChunkHeader header_