Apollo 10.0
自动驾驶开放平台
record_writer.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_RECORD_RECORD_WRITER_H_
18#define CYBER_RECORD_RECORD_WRITER_H_
19
20#include <cstdint>
21#include <memory>
22#include <mutex>
23#include <set>
24#include <sstream>
25#include <string>
26#include <unordered_map>
27
28#include "cyber/proto/record.pb.h"
29
30#include "cyber/common/log.h"
36
37namespace apollo {
38namespace cyber {
39namespace record {
40
44class RecordWriter : public RecordBase {
45 public:
46 using MessageNumberMap = std::unordered_map<std::string, uint64_t>;
47 using MessageTypeMap = std::unordered_map<std::string, std::string>;
48 using MessageProtoDescMap = std::unordered_map<std::string, std::string>;
49 using FileWriterPtr = std::unique_ptr<RecordFileWriter>;
50
55
61 explicit RecordWriter(const proto::Header& header);
62
66 virtual ~RecordWriter();
67
75 bool Open(const std::string& file);
76
80 void Close();
81
91 bool WriteChannel(const std::string& channel_name,
92 const std::string& message_type,
93 const std::string& proto_desc);
94
106 template <typename MessageT>
107 bool WriteMessage(const std::string& channel_name, const MessageT& message,
108 const uint64_t time_nanosec,
109 const std::string& proto_desc = "");
110
118 bool SetSizeOfFileSegmentation(uint64_t size_kilobytes);
119
127 bool SetIntervalOfFileSegmentation(uint64_t time_sec);
128
136 uint64_t GetMessageNumber(const std::string& channel_name) const override;
137
145 const std::string& GetMessageType(
146 const std::string& channel_name) const override;
147
155 const std::string& GetProtoDesc(
156 const std::string& channel_name) const override;
157
163 std::set<std::string> GetChannelList() const override;
164
170 bool IsNewChannel(const std::string& channel_name) const;
171
172 private:
173 bool WriteMessage(const proto::SingleMessage& single_msg);
174 bool SplitOutfile();
175 void OnNewChannel(const std::string& channel_name,
176 const std::string& message_type,
177 const std::string& proto_desc);
178 void OnNewMessage(const std::string& channel_name);
179
180 std::string path_;
181 uint64_t segment_raw_size_ = 0;
182 uint64_t segment_begin_time_ = 0;
183 uint32_t file_index_ = 0;
184 MessageNumberMap channel_message_number_map_;
185 MessageTypeMap channel_message_type_map_;
186 MessageProtoDescMap channel_proto_desc_map_;
187 FileWriterPtr file_writer_ = nullptr;
188 FileWriterPtr file_writer_backup_ = nullptr;
189 std::mutex mutex_;
190 std::stringstream sstream_;
191};
192
193template <>
194inline bool RecordWriter::WriteMessage(const std::string& channel_name,
195 const std::string& message,
196 const uint64_t time_nanosec,
197 const std::string& proto_desc) {
198 proto::SingleMessage single_msg;
199 single_msg.set_channel_name(channel_name);
200 single_msg.set_content(message);
201 single_msg.set_time(time_nanosec);
202 return WriteMessage(single_msg);
203}
204
205template <>
207 const std::string& channel_name,
208 const std::shared_ptr<message::RawMessage>& message,
209 const uint64_t time_nanosec, const std::string& proto_desc) {
210 if (message == nullptr) {
211 AERROR << "nullptr error, channel: " << channel_name;
212 return false;
213 }
214 return WriteMessage(channel_name, message->message, time_nanosec);
215}
216
217template <typename MessageT>
218bool RecordWriter::WriteMessage(const std::string& channel_name,
219 const MessageT& message,
220 const uint64_t time_nanosec,
221 const std::string& proto_desc) {
222 const std::string& message_type = GetMessageType(channel_name);
223 if (message_type.empty()) {
224 if (!WriteChannel(channel_name, message::GetMessageName<MessageT>(),
225 proto_desc)) {
226 AERROR << "Failed to write meta data to channel [" << channel_name
227 << "].";
228 return false;
229 }
230 } else {
231 if (MessageT::descriptor()->full_name() != message_type) {
232 AERROR << "Message type is invalid, expect: " << message_type
233 << ", actual: " << message::GetMessageName<MessageT>();
234 return false;
235 }
236 }
237 std::string content("");
238 if (!message.SerializeToString(&content)) {
239 AERROR << "Failed to serialize message, channel: " << channel_name;
240 return false;
241 }
242 return WriteMessage(channel_name, content, time_nanosec);
243}
244
245} // namespace record
246} // namespace cyber
247} // namespace apollo
248
249#endif // CYBER_RECORD_RECORD_WRITER_H_
Base class for record reader and writer.
Definition record_base.h:35
std::unordered_map< std::string, std::string > MessageProtoDescMap
std::unordered_map< std::string, std::string > MessageTypeMap
std::unordered_map< std::string, uint64_t > MessageNumberMap
uint64_t GetMessageNumber(const std::string &channel_name) const override
Get message number by channel name.
const std::string & GetProtoDesc(const std::string &channel_name) const override
Get proto descriptor string by channel name.
bool SetIntervalOfFileSegmentation(uint64_t time_sec)
Set max interval (Second) to segment record file.
bool SetSizeOfFileSegmentation(uint64_t size_kilobytes)
Set max size (KB) to segment record file.
bool Open(const std::string &file)
Open a record to write.
RecordWriter()
The default constructor.
std::unique_ptr< RecordFileWriter > FileWriterPtr
bool WriteMessage(const std::string &channel_name, const MessageT &message, const uint64_t time_nanosec, const std::string &proto_desc="")
Write a message to record.
virtual ~RecordWriter()
Virtual Destructor.
bool IsNewChannel(const std::string &channel_name) const
Is a new channel recording or not.
const std::string & GetMessageType(const std::string &channel_name) const override
Get message type by channel name.
std::set< std::string > GetChannelList() const override
Get channel list.
bool WriteChannel(const std::string &channel_name, const std::string &message_type, const std::string &proto_desc)
Write a channel to record.
#define AERROR
Definition log.h:44
class register implement
Definition arena_queue.h:37