Apollo 10.0
自动驾驶开放平台
apollo::cyber::record::RecordFileWriter类 参考

#include <record_file_writer.h>

类 apollo::cyber::record::RecordFileWriter 继承关系图:
apollo::cyber::record::RecordFileWriter 的协作图:

Public 成员函数

 RecordFileWriter ()
 
virtual ~RecordFileWriter ()
 
bool Open (const std::string &path) override
 
void Close () override
 
bool WriteHeader (const proto::Header &header)
 
bool WriteChannel (const proto::Channel &channel)
 
bool WriteMessage (const proto::SingleMessage &message)
 
uint64_t GetMessageNumber (const std::string &channel_name) const
 
- Public 成员函数 继承自 apollo::cyber::record::RecordFileBase
 RecordFileBase ()=default
 
virtual ~RecordFileBase ()=default
 
const std::string & GetPath () const
 
const proto::Header & GetHeader () const
 
const proto::Index & GetIndex () const
 
int64_t CurrentPosition ()
 
bool SetPosition (int64_t position)
 

额外继承的成员函数

- Protected 属性 继承自 apollo::cyber::record::RecordFileBase
std::mutex mutex_
 
std::string path_
 
proto::Header header_
 
proto::Index index_
 
int fd_ = -1
 

详细描述

在文件 record_file_writer.h 第 77 行定义.

构造及析构函数说明

◆ RecordFileWriter()

apollo::cyber::record::RecordFileWriter::RecordFileWriter ( )

在文件 record_file_writer.cc 第 38 行定义.

38 : is_writing_(false) {}

◆ ~RecordFileWriter()

apollo::cyber::record::RecordFileWriter::~RecordFileWriter ( )
virtual

在文件 record_file_writer.cc 第 40 行定义.

成员函数说明

◆ Close()

void apollo::cyber::record::RecordFileWriter::Close ( )
override virtual

实现了 apollo::cyber::record::RecordFileBase.

在文件 record_file_writer.cc 第 66 行定义.

66 {
67 if (is_writing_) {
68 // wait for the flush operation that may exist now
69 while (1) {
70 {
71 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
72 if (chunk_flush_->empty()) {
73 break;
74 }
75 }
76 std::this_thread::sleep_for(std::chrono::milliseconds(100));
77 }
78
79 // last swap
80 {
81 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
82 chunk_flush_.swap(chunk_active_);
83 flush_cv_.notify_one();
84 }
85
86 // wait for the last flush operation
87 while (1) {
88 {
89 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
90 if (chunk_flush_->empty()) {
91 break;
92 }
93 }
94 std::this_thread::sleep_for(std::chrono::milliseconds(100));
95 }
96
97 is_writing_ = false;
98 flush_cv_.notify_all();
99 if (flush_thread_ && flush_thread_->joinable()) {
100 flush_thread_->join();
101 flush_thread_ = nullptr;
102 }
103
104 if (!WriteIndex()) {
105 AERROR << "Write index section failed, file: " << path_;
106 }
107
108 header_.set_is_complete(true);
109 if (!WriteHeader(header_)) {
110 AERROR << "Overwrite header section failed, file: " << path_;
111 }
112
113 if (close(fd_) < 0) {
114 AERROR << "Close file failed, file: " << path_ << ", fd: " << fd_
115 << ", errno: " << errno;
116 }
117 }
118}
bool WriteHeader(const proto::Header &header)
#define AERROR
Definition log.h:44

◆ GetMessageNumber()

uint64_t apollo::cyber::record::RecordFileWriter::GetMessageNumber ( const std::string &  channel_name) const

在文件 record_file_writer.cc 第 260 行定义.

261 {
262 auto search = channel_message_number_map_.find(channel_name);
263 if (search != channel_message_number_map_.end()) {
264 return search->second;
265 }
266 return 0;
267}

◆ Open()

bool apollo::cyber::record::RecordFileWriter::Open ( const std::string &  path)
override virtual

实现了 apollo::cyber::record::RecordFileBase.

在文件 record_file_writer.cc 第 42 行定义.

42 {
43 std::lock_guard<std::mutex> lock(mutex_);
44 path_ = path;
45 if (::apollo::cyber::common::PathExists(path_)) {
46 AWARN << "File exist and overwrite, file: " << path_;
47 }
48 fd_ = open(path_.data(), O_CREAT | O_WRONLY,
49 S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
50 if (fd_ < 0) {
51 AERROR << "Open file failed, file: " << path_ << ", fd: " << fd_
52 << ", errno: " << errno;
53 return false;
54 }
55 chunk_active_.reset(new Chunk());
56 chunk_flush_.reset(new Chunk());
57 is_writing_ = true;
58 flush_thread_ = std::make_shared<std::thread>([this]() { this->Flush(); });
59 if (flush_thread_ == nullptr) {
60 AERROR << "Init flush thread error.";
61 return false;
62 }
63 return true;
64}
#define AWARN
Definition log.h:43
bool PathExists(const std::string &path)
Check if the path exists.
Definition file.cc:195

◆ WriteChannel()

bool apollo::cyber::record::RecordFileWriter::WriteChannel ( const proto::Channel &  channel)

在文件 record_file_writer.cc 第 151 行定义.

151 {
152 std::lock_guard<std::mutex> lock(mutex_);
153 uint64_t pos = CurrentPosition();
154 if (!WriteSection<Channel>(channel)) {
155 AERROR << "Write section fail";
156 return false;
157 }
158 header_.set_channel_number(header_.channel_number() + 1);
159 SingleIndex* single_index = index_.add_indexes();
160 single_index->set_type(SectionType::SECTION_CHANNEL);
161 single_index->set_position(pos);
162 ChannelCache* channel_cache = new ChannelCache();
163 channel_cache->set_name(channel.name());
164 channel_cache->set_message_number(0);
165 channel_cache->set_message_type(channel.message_type());
166 channel_cache->set_proto_desc(channel.proto_desc());
167 single_index->set_allocated_channel_cache(channel_cache);
168 return true;
169}
optional uint64 channel_number
Definition record.proto:70

◆ WriteHeader()

bool apollo::cyber::record::RecordFileWriter::WriteHeader ( const proto::Header &  header)

在文件 record_file_writer.cc 第 120 行定义.

120 {
121 std::lock_guard<std::mutex> lock(mutex_);
122 header_ = header;
123 if (!WriteSection<Header>(header_)) {
124 AERROR << "Write header section fail";
125 return false;
126 }
127 return true;
128}

◆ WriteMessage()

bool apollo::cyber::record::RecordFileWriter::WriteMessage ( const proto::SingleMessage &  message)

在文件 record_file_writer.cc 第 210 行定义.

210 {
211 chunk_active_->add(message);
212 auto it = channel_message_number_map_.find(message.channel_name());
213 if (it != channel_message_number_map_.end()) {
214 it->second++;
215 } else {
216 channel_message_number_map_.insert(
217 std::make_pair(message.channel_name(), 1));
218 }
219 bool need_flush = false;
220 if (header_.chunk_interval() > 0 &&
221 message.time() - chunk_active_->header_.begin_time() >
222 header_.chunk_interval()) {
223 need_flush = true;
224 }
225 if (!in_writing_ && header_.chunk_raw_size() > 0 &&
226 chunk_active_->header_.raw_size() > header_.chunk_raw_size()) {
227 need_flush = true;
228 }
229 if (!need_flush) {
230 return true;
231 }
232 {
233 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
234 chunk_flush_.swap(chunk_active_);
235 flush_cv_.notify_one();
236 }
237 return true;
238}
optional uint64 chunk_interval
Definition record.proto:66
optional uint64 chunk_raw_size
Definition record.proto:76

该类的文档由以下文件生成: