Apollo 10.0
自动驾驶开放平台
record_writer.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <iomanip>
20#include <iostream>
21
22#include "cyber/common/log.h"
24
25namespace apollo {
26namespace cyber {
27namespace record {
28
30using proto::Channel;
31using proto::SingleMessage;
32
34
36
38
39bool RecordWriter::Open(const std::string& file) {
40 file_ = file;
41 file_index_ = 0;
42 sstream_.str(std::string());
43 sstream_.clear();
44 sstream_ << "." << std::setw(5) << std::setfill('0') << file_index_++ << ".";
46 path_ = file_ + sstream_.str() +
47 UnixSecondsToString(time(nullptr), "%Y%m%d%H%M%S");
48 } else {
49 path_ = file_;
50 }
51 file_writer_.reset(new RecordFileWriter());
52 if (!file_writer_->Open(path_)) {
53 AERROR << "Failed to open output record file: " << path_;
54 return false;
55 }
56 if (!file_writer_->WriteHeader(header_)) {
57 AERROR << "Failed to write header: " << path_;
58 file_writer_->Close();
59 return false;
60 }
61 is_opened_ = true;
62 return is_opened_;
63}
64
66 if (is_opened_) {
67 file_writer_->Close();
68 is_opened_ = false;
69 }
70}
71
72bool RecordWriter::SplitOutfile() {
73 file_writer_.reset(new RecordFileWriter());
74 if (file_index_ > 99999) {
75 AWARN << "More than 99999 record files had been recored, will restart "
76 << "counting from 0.";
77 file_index_ = 0;
78 }
79 sstream_.str(std::string());
80 sstream_.clear();
81 sstream_ << "." << std::setw(5) << std::setfill('0') << file_index_++ << ".";
82 path_ = file_ + sstream_.str() +
83 UnixSecondsToString(time(nullptr), "%Y%m%d%H%M%S");
84 segment_raw_size_ = 0;
85 segment_begin_time_ = 0;
86 if (!file_writer_->Open(path_)) {
87 AERROR << "Failed to open record file: " << path_;
88 return false;
89 }
90 if (!file_writer_->WriteHeader(header_)) {
91 AERROR << "Failed to write header for record file: " << path_;
92 return false;
93 }
94 for (const auto& i : channel_message_number_map_) {
95 Channel channel;
96 channel.set_name(i.first);
97 channel.set_message_type(channel_message_type_map_[i.first]);
98 channel.set_proto_desc(channel_proto_desc_map_[i.first]);
99 if (!file_writer_->WriteChannel(channel)) {
100 AERROR << "Failed to write channel for record file: " << path_;
101 return false;
102 }
103 }
104 return true;
105}
106
107bool RecordWriter::WriteChannel(const std::string& channel_name,
108 const std::string& message_type,
109 const std::string& proto_desc) {
110 std::lock_guard<std::mutex> lg(mutex_);
111 if (IsNewChannel(channel_name)) {
112 OnNewChannel(channel_name, message_type, proto_desc);
113 Channel channel;
114 channel.set_name(channel_name);
115 channel.set_message_type(message_type);
116 channel.set_proto_desc(proto_desc);
117 if (!file_writer_->WriteChannel(channel)) {
118 AERROR << "Failed to write channel: " << channel_name;
119 return false;
120 }
121 } else {
122 AWARN << "Intercept write channel request, duplicate channel: "
123 << channel_name;
124 }
125 return true;
126}
127
128bool RecordWriter::WriteMessage(const SingleMessage& message) {
129 std::lock_guard<std::mutex> lg(mutex_);
130 OnNewMessage(message.channel_name());
131 if (!file_writer_->WriteMessage(message)) {
132 AERROR << "Write message is failed.";
133 return false;
134 }
135
136 segment_raw_size_ += message.content().size();
137 if (segment_begin_time_ == 0) {
138 segment_begin_time_ = message.time();
139 }
140 if (segment_begin_time_ > message.time()) {
141 segment_begin_time_ = message.time();
142 }
143
144 if ((header_.segment_interval() > 0 &&
145 message.time() - segment_begin_time_ > header_.segment_interval()) ||
146 (header_.segment_raw_size() > 0 &&
147 segment_raw_size_ > header_.segment_raw_size())) {
148 file_writer_backup_.swap(file_writer_);
149 file_writer_backup_->Close();
150 if (!SplitOutfile()) {
151 AERROR << "Split out file is failed.";
152 return false;
153 }
154 }
155 return true;
156}
157
158bool RecordWriter::SetSizeOfFileSegmentation(uint64_t size_kilobytes) {
159 if (is_opened_) {
160 AWARN << "Please call this interface before opening file.";
161 return false;
162 }
163 header_.set_segment_raw_size(size_kilobytes * 1024UL);
164 return true;
165}
166
168 if (is_opened_) {
169 AWARN << "Please call this interface before opening file.";
170 return false;
171 }
172 header_.set_segment_interval(time_sec * 1000000000UL);
173 return true;
174}
175
176bool RecordWriter::IsNewChannel(const std::string& channel_name) const {
177 return channel_message_number_map_.find(channel_name) ==
178 channel_message_number_map_.end();
179}
180
181void RecordWriter::OnNewChannel(const std::string& channel_name,
182 const std::string& message_type,
183 const std::string& proto_desc) {
184 channel_message_number_map_[channel_name] = 0;
185 channel_message_type_map_[channel_name] = message_type;
186 channel_proto_desc_map_[channel_name] = proto_desc;
187}
188
189void RecordWriter::OnNewMessage(const std::string& channel_name) {
190 auto iter = channel_message_number_map_.find(channel_name);
191 if (iter != channel_message_number_map_.end()) {
192 iter->second++;
193 }
194}
195
196uint64_t RecordWriter::GetMessageNumber(const std::string& channel_name) const {
197 auto search = channel_message_number_map_.find(channel_name);
198 if (search != channel_message_number_map_.end()) {
199 return search->second;
200 }
201 return 0;
202}
203
205 const std::string& channel_name) const {
206 auto search = channel_message_type_map_.find(channel_name);
207 if (search != channel_message_type_map_.end()) {
208 return search->second;
209 }
210 return kEmptyString;
211}
212
213const std::string& RecordWriter::GetProtoDesc(
214 const std::string& channel_name) const {
215 auto search = channel_proto_desc_map_.find(channel_name);
216 if (search != channel_proto_desc_map_.end()) {
217 return search->second;
218 }
219 return kEmptyString;
220}
221
222std::set<std::string> RecordWriter::GetChannelList() const {
223 std::set<std::string> channel_list;
224 for (const auto& item : channel_message_number_map_) {
225 channel_list.insert(item.first);
226 }
227 return channel_list;
228}
229
230} // namespace record
231} // namespace cyber
232} // namespace apollo
static proto::Header GetHeader()
Build a default record header.
uint64_t GetMessageNumber(const std::string &channel_name) const override
Get message number by channel name.
const std::string & GetProtoDesc(const std::string &channel_name) const override
Get proto descriptor string by channel name.
bool SetIntervalOfFileSegmentation(uint64_t time_sec)
Set max interval (Second) to segment record file.
bool SetSizeOfFileSegmentation(uint64_t size_kilobytes)
Set max size (KB) to segment record file
bool Open(const std::string &file)
Open a record to write.
RecordWriter()
The default constructor.
bool WriteMessage(const std::string &channel_name, const MessageT &message, const uint64_t time_nanosec, const std::string &proto_desc="")
Write a message to record.
virtual ~RecordWriter()
Virtual Destructor.
bool IsNewChannel(const std::string &channel_name) const
Is a new channel recording or not.
const std::string & GetMessageType(const std::string &channel_name) const override
Get message type by channel name.
std::set< std::string > GetChannelList() const override
Get channel list.
bool WriteChannel(const std::string &channel_name, const std::string &message_type, const std::string &proto_desc)
Write a channel to record.
#define AERROR
Definition log.h:44
#define AWARN
Definition log.h:43
std::string UnixSecondsToString(uint64_t unix_seconds, const std::string &format_str="%Y-%m-%d-%H:%M:%S")
class register implement
Definition arena_queue.h:37
optional uint64 segment_interval
Definition record.proto:67
optional uint64 segment_raw_size
Definition record.proto:77