Apollo 10.0
自动驾驶开放平台
record_file_writer.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <fcntl.h>
20
21#include "cyber/common/file.h"
22#include "cyber/time/time.h"
23
24namespace apollo {
25namespace cyber {
26namespace record {
27
37
38RecordFileWriter::RecordFileWriter() : is_writing_(false) {}
39
41
42bool RecordFileWriter::Open(const std::string& path) {
43 std::lock_guard<std::mutex> lock(mutex_);
44 path_ = path;
46 AWARN << "File exist and overwrite, file: " << path_;
47 }
48 fd_ = open(path_.data(), O_CREAT | O_WRONLY,
49 S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
50 if (fd_ < 0) {
51 AERROR << "Open file failed, file: " << path_ << ", fd: " << fd_
52 << ", errno: " << errno;
53 return false;
54 }
55 chunk_active_.reset(new Chunk());
56 chunk_flush_.reset(new Chunk());
57 is_writing_ = true;
58 flush_thread_ = std::make_shared<std::thread>([this]() { this->Flush(); });
59 if (flush_thread_ == nullptr) {
60 AERROR << "Init flush thread error.";
61 return false;
62 }
63 return true;
64}
65
67 if (is_writing_) {
68 // wait for the flush operation that may exist now
69 while (1) {
70 {
71 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
72 if (chunk_flush_->empty()) {
73 break;
74 }
75 }
76 std::this_thread::sleep_for(std::chrono::milliseconds(100));
77 }
78
79 // last swap
80 {
81 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
82 chunk_flush_.swap(chunk_active_);
83 flush_cv_.notify_one();
84 }
85
86 // wait for the last flush operation
87 while (1) {
88 {
89 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
90 if (chunk_flush_->empty()) {
91 break;
92 }
93 }
94 std::this_thread::sleep_for(std::chrono::milliseconds(100));
95 }
96
97 is_writing_ = false;
98 flush_cv_.notify_all();
99 if (flush_thread_ && flush_thread_->joinable()) {
100 flush_thread_->join();
101 flush_thread_ = nullptr;
102 }
103
104 if (!WriteIndex()) {
105 AERROR << "Write index section failed, file: " << path_;
106 }
107
108 header_.set_is_complete(true);
109 if (!WriteHeader(header_)) {
110 AERROR << "Overwrite header section failed, file: " << path_;
111 }
112
113 if (close(fd_) < 0) {
114 AERROR << "Close file failed, file: " << path_ << ", fd: " << fd_
115 << ", errno: " << errno;
116 }
117 }
118}
119
121 std::lock_guard<std::mutex> lock(mutex_);
122 header_ = header;
123 if (!WriteSection<Header>(header_)) {
124 AERROR << "Write header section fail";
125 return false;
126 }
127 return true;
128}
129
130bool RecordFileWriter::WriteIndex() {
131 std::lock_guard<std::mutex> lock(mutex_);
132 for (int i = 0; i < index_.indexes_size(); i++) {
133 SingleIndex* single_index = index_.mutable_indexes(i);
134 if (single_index->type() == SectionType::SECTION_CHANNEL) {
135 ChannelCache* channel_cache = single_index->mutable_channel_cache();
136 if (channel_message_number_map_.find(channel_cache->name()) !=
137 channel_message_number_map_.end()) {
138 channel_cache->set_message_number(
139 channel_message_number_map_[channel_cache->name()]);
140 }
141 }
142 }
143 header_.set_index_position(CurrentPosition());
144 if (!WriteSection<proto::Index>(index_)) {
145 AERROR << "Write section fail";
146 return false;
147 }
148 return true;
149}
150
152 std::lock_guard<std::mutex> lock(mutex_);
153 uint64_t pos = CurrentPosition();
154 if (!WriteSection<Channel>(channel)) {
155 AERROR << "Write section fail";
156 return false;
157 }
158 header_.set_channel_number(header_.channel_number() + 1);
159 SingleIndex* single_index = index_.add_indexes();
160 single_index->set_type(SectionType::SECTION_CHANNEL);
161 single_index->set_position(pos);
162 ChannelCache* channel_cache = new ChannelCache();
163 channel_cache->set_name(channel.name());
164 channel_cache->set_message_number(0);
165 channel_cache->set_message_type(channel.message_type());
166 channel_cache->set_proto_desc(channel.proto_desc());
167 single_index->set_allocated_channel_cache(channel_cache);
168 return true;
169}
170
171bool RecordFileWriter::WriteChunk(const ChunkHeader& chunk_header,
172 const ChunkBody& chunk_body) {
173 std::lock_guard<std::mutex> lock(mutex_);
174 uint64_t pos = CurrentPosition();
175 if (!WriteSection<ChunkHeader>(chunk_header)) {
176 AERROR << "Write chunk header fail";
177 return false;
178 }
179 SingleIndex* single_index = index_.add_indexes();
180 single_index->set_type(SectionType::SECTION_CHUNK_HEADER);
181 single_index->set_position(pos);
182 ChunkHeaderCache* chunk_header_cache = new ChunkHeaderCache();
183 chunk_header_cache->set_begin_time(chunk_header.begin_time());
184 chunk_header_cache->set_end_time(chunk_header.end_time());
185 chunk_header_cache->set_message_number(chunk_header.message_number());
186 chunk_header_cache->set_raw_size(chunk_header.raw_size());
187 single_index->set_allocated_chunk_header_cache(chunk_header_cache);
188
189 pos = CurrentPosition();
190 if (!WriteSection<ChunkBody>(chunk_body)) {
191 AERROR << "Write chunk body fail";
192 return false;
193 }
194 header_.set_chunk_number(header_.chunk_number() + 1);
195 if (header_.begin_time() == 0) {
196 header_.set_begin_time(chunk_header.begin_time());
197 }
198 header_.set_end_time(chunk_header.end_time());
199 header_.set_message_number(header_.message_number() +
200 chunk_header.message_number());
201 single_index = index_.add_indexes();
202 single_index->set_type(SectionType::SECTION_CHUNK_BODY);
203 single_index->set_position(pos);
204 ChunkBodyCache* chunk_body_cache = new ChunkBodyCache();
205 chunk_body_cache->set_message_number(chunk_body.messages_size());
206 single_index->set_allocated_chunk_body_cache(chunk_body_cache);
207 return true;
208}
209
211 chunk_active_->add(message);
212 auto it = channel_message_number_map_.find(message.channel_name());
213 if (it != channel_message_number_map_.end()) {
214 it->second++;
215 } else {
216 channel_message_number_map_.insert(
217 std::make_pair(message.channel_name(), 1));
218 }
219 bool need_flush = false;
220 if (header_.chunk_interval() > 0 &&
221 message.time() - chunk_active_->header_.begin_time() >
223 need_flush = true;
224 }
225 if (!in_writing_ && header_.chunk_raw_size() > 0 &&
226 chunk_active_->header_.raw_size() > header_.chunk_raw_size()) {
227 need_flush = true;
228 }
229 if (!need_flush) {
230 return true;
231 }
232 {
233 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
234 chunk_flush_.swap(chunk_active_);
235 flush_cv_.notify_one();
236 }
237 return true;
238}
239
240void RecordFileWriter::Flush() {
241 while (is_writing_) {
242 std::unique_lock<std::mutex> flush_lock(flush_mutex_);
243 flush_cv_.wait(flush_lock,
244 [this] { return !chunk_flush_->empty() || !is_writing_; });
245 if (!is_writing_) {
246 break;
247 }
248 if (chunk_flush_->empty()) {
249 continue;
250 }
251 in_writing_ = true;
252 if (!WriteChunk(chunk_flush_->header_, *(chunk_flush_->body_.get()))) {
253 AERROR << "Write chunk fail.";
254 }
255 in_writing_ = false;
256 chunk_flush_->clear();
257 }
258}
259
261 const std::string& channel_name) const {
262 auto search = channel_message_number_map_.find(channel_name);
263 if (search != channel_message_number_map_.end()) {
264 return search->second;
265 }
266 return 0;
267}
268
269} // namespace record
270} // namespace cyber
271} // namespace apollo
bool Open(const std::string &path) override
bool WriteHeader(const proto::Header &header)
uint64_t GetMessageNumber(const std::string &channel_name) const
bool WriteChannel(const proto::Channel &channel)
bool WriteMessage(const proto::SingleMessage &message)
#define AERROR
Definition log.h:44
#define AWARN
Definition log.h:43
bool PathExists(const std::string &path)
Check if the path exists.
Definition file.cc:195
class register implement
Definition arena_queue.h:37
optional string message_type
Definition record.proto:84
optional uint64 channel_number
Definition record.proto:70
optional uint64 chunk_number
Definition record.proto:69
optional uint64 chunk_interval
Definition record.proto:66
optional uint64 begin_time
Definition record.proto:71
optional uint64 message_number
Definition record.proto:73
optional uint64 chunk_raw_size
Definition record.proto:76
optional SectionType type
Definition record.proto:20