31using proto::SingleMessage;
42 sstream_.str(std::string());
44 sstream_ <<
"." << std::setw(5) << std::setfill(
'0') << file_index_++ <<
".";
46 path_ =
file_ + sstream_.str() +
47 UnixSecondsToString(time(
nullptr),
"%Y%m%d%H%M%S");
52 if (!file_writer_->Open(path_)) {
53 AERROR <<
"Failed to open output record file: " << path_;
56 if (!file_writer_->WriteHeader(
header_)) {
57 AERROR <<
"Failed to write header: " << path_;
58 file_writer_->Close();
67 file_writer_->Close();
72bool RecordWriter::SplitOutfile() {
74 if (file_index_ > 99999) {
75 AWARN <<
"More than 99999 record files had been recored, will restart "
76 <<
"counting from 0.";
79 sstream_.str(std::string());
81 sstream_ <<
"." << std::setw(5) << std::setfill(
'0') << file_index_++ <<
".";
82 path_ =
file_ + sstream_.str() +
83 UnixSecondsToString(time(
nullptr),
"%Y%m%d%H%M%S");
84 segment_raw_size_ = 0;
85 segment_begin_time_ = 0;
86 if (!file_writer_->Open(path_)) {
87 AERROR <<
"Failed to open record file: " << path_;
90 if (!file_writer_->WriteHeader(
header_)) {
91 AERROR <<
"Failed to write header for record file: " << path_;
94 for (
const auto& i : channel_message_number_map_) {
96 channel.set_name(i.first);
97 channel.set_message_type(channel_message_type_map_[i.first]);
98 channel.set_proto_desc(channel_proto_desc_map_[i.first]);
99 if (!file_writer_->WriteChannel(channel)) {
100 AERROR <<
"Failed to write channel for record file: " << path_;
108 const std::string& message_type,
109 const std::string& proto_desc) {
110 std::lock_guard<std::mutex> lg(mutex_);
112 OnNewChannel(channel_name, message_type, proto_desc);
114 channel.set_name(channel_name);
115 channel.set_message_type(message_type);
116 channel.set_proto_desc(proto_desc);
117 if (!file_writer_->WriteChannel(channel)) {
118 AERROR <<
"Failed to write channel: " << channel_name;
122 AWARN <<
"Intercept write channel request, duplicate channel: "
129 std::lock_guard<std::mutex> lg(mutex_);
131 if (!file_writer_->WriteMessage(message)) {
132 AERROR <<
"Write message is failed.";
136 segment_raw_size_ += message.
content().size();
137 if (segment_begin_time_ == 0) {
138 segment_begin_time_ = message.
time();
140 if (segment_begin_time_ > message.
time()) {
141 segment_begin_time_ = message.
time();
148 file_writer_backup_.swap(file_writer_);
149 file_writer_backup_->Close();
150 if (!SplitOutfile()) {
151 AERROR <<
"Split out file is failed.";
160 AWARN <<
"Please call this interface before opening file.";
163 header_.set_segment_raw_size(size_kilobytes * 1024UL);
169 AWARN <<
"Please call this interface before opening file.";
172 header_.set_segment_interval(time_sec * 1000000000UL);
177 return channel_message_number_map_.find(channel_name) ==
178 channel_message_number_map_.end();
181void RecordWriter::OnNewChannel(
const std::string& channel_name,
182 const std::string& message_type,
183 const std::string& proto_desc) {
184 channel_message_number_map_[channel_name] = 0;
185 channel_message_type_map_[channel_name] = message_type;
186 channel_proto_desc_map_[channel_name] = proto_desc;
189void RecordWriter::OnNewMessage(
const std::string& channel_name) {
190 auto iter = channel_message_number_map_.find(channel_name);
191 if (iter != channel_message_number_map_.end()) {
197 auto search = channel_message_number_map_.find(channel_name);
198 if (search != channel_message_number_map_.end()) {
199 return search->second;
205 const std::string& channel_name)
const {
206 auto search = channel_message_type_map_.find(channel_name);
207 if (search != channel_message_type_map_.end()) {
208 return search->second;
214 const std::string& channel_name)
const {
215 auto search = channel_proto_desc_map_.find(channel_name);
216 if (search != channel_proto_desc_map_.end()) {
217 return search->second;
223 std::set<std::string> channel_list;
224 for (
const auto& item : channel_message_number_map_) {
225 channel_list.insert(item.first);
uint64_t GetMessageNumber(const std::string &channel_name) const override
Get message number by channel name.
const std::string & GetProtoDesc(const std::string &channel_name) const override
Get proto descriptor string by channel name.
bool SetIntervalOfFileSegmentation(uint64_t time_sec)
Set max interval (Second) to segment record file.
bool SetSizeOfFileSegmentation(uint64_t size_kilobytes)
Set max size (KB) to segment record file
bool Open(const std::string &file)
Open a record to write.
RecordWriter()
The default constructor.
bool WriteMessage(const std::string &channel_name, const MessageT &message, const uint64_t time_nanosec, const std::string &proto_desc="")
Write a message to record.
virtual ~RecordWriter()
Virtual Destructor.
bool IsNewChannel(const std::string &channel_name) const
Is a new channel recording or not.
const std::string & GetMessageType(const std::string &channel_name) const override
Get message type by channel name.
void Close()
Clean the record.
std::set< std::string > GetChannelList() const override
Get channel list.
bool WriteChannel(const std::string &channel_name, const std::string &message_type, const std::string &proto_desc)
Write a channel to record.
std::string UnixSecondsToString(uint64_t unix_seconds, const std::string &format_str="%Y-%m-%d-%H:%M:%S")
optional string channel_name