Apollo 10.0
自动驾驶开放平台
record_file_writer.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_RECORD_FILE_RECORD_FILE_WRITER_H_
18#define CYBER_RECORD_FILE_RECORD_FILE_WRITER_H_
19
20#include <condition_variable>
21#include <fstream>
22#include <memory>
23#include <string>
24#include <thread>
25#include <type_traits>
26#include <unordered_map>
27#include <utility>
28
29#include "google/protobuf/io/zero_copy_stream_impl.h"
30#include "google/protobuf/message.h"
31#include "google/protobuf/text_format.h"
32
33#include "cyber/common/log.h"
36#include "cyber/time/time.h"
37
38namespace apollo {
39namespace cyber {
40namespace record {
41
42struct Chunk {
43 Chunk() { clear(); }
44
45 inline void clear() {
46 body_.reset(new proto::ChunkBody());
47 header_.set_begin_time(0);
48 header_.set_end_time(0);
49 header_.set_message_number(0);
50 header_.set_raw_size(0);
51 }
52
53 inline void add(const proto::SingleMessage& message) {
54 std::lock_guard<std::mutex> lock(mutex_);
55 proto::SingleMessage* p_message = body_->add_messages();
56 *p_message = message;
57 if (header_.begin_time() == 0) {
58 header_.set_begin_time(message.time());
59 }
60 if (header_.begin_time() > message.time()) {
61 header_.set_begin_time(message.time());
62 }
63 if (header_.end_time() < message.time()) {
64 header_.set_end_time(message.time());
65 }
66 header_.set_message_number(header_.message_number() + 1);
67 header_.set_raw_size(header_.raw_size() + message.content().size());
68 }
69
70 inline bool empty() { return header_.message_number() == 0; }
71
72 std::mutex mutex_;
74 std::unique_ptr<proto::ChunkBody> body_ = nullptr;
75};
76
// NOTE(review): the class head and the line numbered 79 are missing from this
// extract. The `override` specifiers below imply a base class declared
// elsewhere - confirm against the original header.
78 public:
80 virtual ~RecordFileWriter();
// Open() and Close() override virtuals of the base class.
81 bool Open(const std::string& path) override;
82 void Close() override;
// One writer per record element type; each returns a bool status.
83 bool WriteHeader(const proto::Header& header);
84 bool WriteChannel(const proto::Channel& channel);
85 bool WriteMessage(const proto::SingleMessage& message);
// Presumably looks the count up in channel_message_number_map_ - confirm in
// the implementation file.
86 uint64_t GetMessageNumber(const std::string& channel_name) const;
87
88 private:
// Takes a chunk's header and body as two separate messages.
89 bool WriteChunk(const proto::ChunkHeader& chunk_header,
90 const proto::ChunkBody& chunk_body);
// Generic section writer; defined as a template further down in this header.
91 template <typename T>
92 bool WriteSection(const T& message);
93 bool WriteIndex();
94 void Flush();
// NOTE(review): is_writing_ has no in-class initializer, unlike in_writing_;
// presumably it is set in the constructor - confirm.
95 std::atomic_bool is_writing_;
96 std::atomic_bool in_writing_{false};
// Two chunks: presumably one receives new messages while the other is being
// flushed by flush_thread_ - confirm in the implementation file.
97 std::unique_ptr<Chunk> chunk_active_ = nullptr;
98 std::unique_ptr<Chunk> chunk_flush_ = nullptr;
99 std::shared_ptr<std::thread> flush_thread_ = nullptr;
// Mutex / condition-variable pair, presumably coordinating with flush_thread_.
100 std::mutex flush_mutex_;
101 std::condition_variable flush_cv_;
// Maps a channel name to a 64-bit counter (see GetMessageNumber()).
102 std::unordered_map<std::string, uint64_t> channel_message_number_map_;
103};
104
105template <typename T>
106bool RecordFileWriter::WriteSection(const T& message) {
108 if (std::is_same<T, proto::ChunkHeader>::value) {
110 } else if (std::is_same<T, proto::ChunkBody>::value) {
112 } else if (std::is_same<T, proto::Channel>::value) {
114 } else if (std::is_same<T, proto::Header>::value) {
116 if (!SetPosition(0)) {
117 AERROR << "Jump to position #0 failed";
118 return false;
119 }
120 } else if (std::is_same<T, proto::Index>::value) {
122 } else {
123 AERROR << "Do not support this template typename.";
124 return false;
125 }
126 Section section;
128 memset(&section, 0, sizeof(section));
129 section = {type, static_cast<int64_t>(message.ByteSizeLong())};
130 ssize_t count = write(fd_, &section, sizeof(section));
131 if (count < 0) {
132 AERROR << "Write fd failed, fd: " << fd_ << ", errno: " << errno;
133 return false;
134 }
135 if (count != sizeof(section)) {
136 AERROR << "Write fd failed, fd: " << fd_
137 << ", expect count: " << sizeof(section)
138 << ", actual count: " << count;
139 return false;
140 }
141 {
142 google::protobuf::io::FileOutputStream raw_output(fd_);
143 message.SerializeToZeroCopyStream(&raw_output);
144 }
146 static char blank[HEADER_LENGTH] = {'0'};
147 count = write(fd_, &blank, HEADER_LENGTH - message.ByteSizeLong());
148 if (count < 0) {
149 AERROR << "Write fd failed, fd: " << fd_ << ", errno: " << errno;
150 return false;
151 }
152 if (static_cast<size_t>(count) != HEADER_LENGTH - message.ByteSizeLong()) {
153 AERROR << "Write fd failed, fd: " << fd_
154 << ", expect count: " << sizeof(section)
155 << ", actual count: " << count;
156 return false;
157 }
158 }
159 header_.set_size(CurrentPosition());
160 return true;
161}
162
163} // namespace record
164} // namespace cyber
165} // namespace apollo
166
167#endif // CYBER_RECORD_FILE_RECORD_FILE_WRITER_H_
bool Open(const std::string &path) override
bool WriteHeader(const proto::Header &header)
uint64_t GetMessageNumber(const std::string &channel_name) const
bool WriteChannel(const proto::Channel &channel)
bool WriteMessage(const proto::SingleMessage &message)
#define AERROR
Definition log.h:44
class register implement
Definition arena_queue.h:37
void add(const proto::SingleMessage &message)
std::unique_ptr< proto::ChunkBody > body_