Apollo 10.0
自动驾驶开放平台
recorder.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <algorithm>
20
22
23namespace apollo {
24namespace cyber {
25namespace record {
26
27Recorder::Recorder(const std::string& output, bool all_channels,
28 const std::vector<std::string>& white_channels,
29 const std::vector<std::string>& black_channels)
30 : output_(output),
31 all_channels_(all_channels),
32 white_channels_(white_channels),
33 black_channels_(black_channels) {
34 header_ = HeaderBuilder::GetHeader();
35}
36
37Recorder::Recorder(const std::string& output, bool all_channels,
38 const std::vector<std::string>& white_channels,
39 const std::vector<std::string>& black_channels,
40 const proto::Header& header)
41 : output_(output),
42 all_channels_(all_channels),
43 white_channels_(white_channels),
44 black_channels_(black_channels),
45 header_(header) {}
46
48
50 for (const auto& channel_name : white_channels_) {
51 if (std::find(black_channels_.begin(), black_channels_.end(),
52 channel_name) != black_channels_.end()) {
53 AERROR << "find channel in both of white list and black list, channel: "
54 << channel_name;
55 return false;
56 }
57 }
58
59 auto get_patterns_func = [](const std::vector<std::string>& channels,
60 std::vector<std::regex>* channel_patterns) {
61 for (const auto& channel_name : channels) {
62 try {
63 std::string name = "";
64 /* replace escape character: \ to \\ */
65 for (auto& c : channel_name) {
66 if (c != '\\') {
67 name += c;
68 } else {
69 name += "\\";
70 }
71 }
72 channel_patterns->emplace_back(name);
73 } catch (std::regex_error& e) {
74 // ignored if channel name is not a regex string.
75 }
76 }
77 };
78 get_patterns_func(white_channels_, &white_channel_patterns_);
79 get_patterns_func(black_channels_, &black_channel_patterns_);
80
81 writer_.reset(new RecordWriter(header_));
82 if (!writer_->Open(output_)) {
83 AERROR << "Datafile open file error.";
84 return false;
85 }
86 std::string node_name = "cyber_recorder_record_" + std::to_string(getpid());
87 node_ = ::apollo::cyber::CreateNode(node_name);
88 if (node_ == nullptr) {
89 AERROR << "create node failed, node: " << node_name;
90 return false;
91 }
92 if (!InitReadersImpl()) {
93 AERROR << " _init_readers error.";
94 return false;
95 }
96 message_count_ = 0;
97 message_time_ = 0;
98 is_started_ = true;
99 display_thread_ =
100 std::make_shared<std::thread>([this]() { this->ShowProgress(); });
101 if (display_thread_ == nullptr) {
102 AERROR << "init display thread error.";
103 return false;
104 }
105 return true;
106}
107
109 if (!is_started_ || is_stopping_) {
110 return false;
111 }
112 is_stopping_ = true;
113 if (!FreeReadersImpl()) {
114 AERROR << " _free_readers error.";
115 return false;
116 }
117 writer_->Close();
118 node_.reset();
119 if (display_thread_ && display_thread_->joinable()) {
120 display_thread_->join();
121 display_thread_ = nullptr;
122 }
123 is_started_ = false;
124 is_stopping_ = false;
125 return true;
126}
127
128void Recorder::TopologyCallback(const ChangeMsg& change_message) {
129 ADEBUG << "ChangeMsg in Topology Callback:" << std::endl
130 << change_message.ShortDebugString();
131 if (change_message.role_type() != apollo::cyber::proto::ROLE_WRITER) {
132 ADEBUG << "Change message role type is not ROLE_WRITER.";
133 return;
134 }
135 FindNewChannel(change_message.role_attr());
136}
137
138void Recorder::FindNewChannel(const RoleAttributes& role_attr) {
139 if (!role_attr.has_channel_name() || role_attr.channel_name().empty()) {
140 AWARN << "change message not has a channel name or has an empty one.";
141 return;
142 }
143 if (!role_attr.has_message_type() || role_attr.message_type().empty()) {
144 AWARN << "Change message not has a message type or has an empty one.";
145 return;
146 }
147 if (!role_attr.has_proto_desc() || role_attr.proto_desc().empty()) {
148 AWARN << "Change message not has a proto desc or has an empty one.";
149 return;
150 }
151
152 if (channel_reader_map_.find(role_attr.channel_name()) !=
153 channel_reader_map_.end()) {
154 return;
155 }
156
157 auto channel_match_func = [&role_attr](
158 const std::vector<std::string> channels,
159 std::vector<std::regex>& patterns) -> bool {
160 if (std::find(channels.begin(), channels.end(), role_attr.channel_name()) !=
161 channels.end()) {
162 return true;
163 }
164 for (const auto& pattern : patterns) {
165 if (std::regex_match(role_attr.channel_name(), pattern)) {
166 return true;
167 }
168 }
169 return false;
170 };
171
172 // white channel matching
173 if (!all_channels_ &&
174 !channel_match_func(white_channels_, white_channel_patterns_)) {
175 ADEBUG << "New channel '" << role_attr.channel_name()
176 << "' was found, but not in record list.";
177 return;
178 }
179 // black channel matching
180 if (channel_match_func(black_channels_, black_channel_patterns_)) {
181 ADEBUG << "New channel '" << role_attr.channel_name()
182 << "' was found, but it appears in the blacklist.";
183 return;
184 }
185
186 if (!writer_->WriteChannel(role_attr.channel_name(), role_attr.message_type(),
187 role_attr.proto_desc())) {
188 AERROR << "write channel fail, channel:" << role_attr.channel_name();
189 }
190 InitReaderImpl(role_attr.channel_name(), role_attr.message_type());
191}
192
193bool Recorder::InitReadersImpl() {
194 std::shared_ptr<ChannelManager> channel_manager =
195 TopologyManager::Instance()->channel_manager();
196
197 // get historical writers
198 std::vector<proto::RoleAttributes> role_attr_vec;
199 channel_manager->GetWriters(&role_attr_vec);
200 for (auto role_attr : role_attr_vec) {
201 FindNewChannel(role_attr);
202 }
203
204 // listen new writers in future
205 change_conn_ = channel_manager->AddChangeListener(
206 std::bind(&Recorder::TopologyCallback, this, std::placeholders::_1));
207 if (!change_conn_.IsConnected()) {
208 AERROR << "change connection is not connected";
209 return false;
210 }
211 return true;
212}
213
214bool Recorder::FreeReadersImpl() {
215 std::shared_ptr<ChannelManager> channel_manager =
216 TopologyManager::Instance()->channel_manager();
217
218 channel_manager->RemoveChangeListener(change_conn_);
219
220 return true;
221}
222
223bool Recorder::InitReaderImpl(const std::string& channel_name,
224 const std::string& message_type) {
225 try {
226 std::weak_ptr<Recorder> weak_this = shared_from_this();
227 std::shared_ptr<ReaderBase> reader = nullptr;
228 auto callback = [weak_this, channel_name](
229 const std::shared_ptr<RawMessage>& raw_message) {
230 auto share_this = weak_this.lock();
231 if (!share_this) {
232 return;
233 }
234 share_this->ReaderCallback(raw_message, channel_name);
235 };
236 ReaderConfig config;
237 config.channel_name = channel_name;
238 config.pending_queue_size =
239 gflags::Int32FromEnv("CYBER_PENDING_QUEUE_SIZE", 50);
240 reader = node_->CreateReader<RawMessage>(config, callback);
241 if (reader == nullptr) {
242 AERROR << "Create reader failed.";
243 return false;
244 }
245 channel_reader_map_[channel_name] = reader;
246 return true;
247 } catch (const std::bad_weak_ptr& e) {
248 AERROR << e.what();
249 return false;
250 }
251}
252
253void Recorder::ReaderCallback(const std::shared_ptr<RawMessage>& message,
254 const std::string& channel_name) {
255 if (!is_started_ || is_stopping_) {
256 AERROR << "record procedure is not started or stopping.";
257 return;
258 }
259
260 if (message == nullptr) {
261 AERROR << "message is nullptr, channel: " << channel_name;
262 return;
263 }
264
265 message_time_ = Time::Now().ToNanosecond();
266 if (!writer_->WriteMessage(channel_name, message, message_time_)) {
267 AERROR << "write data fail, channel: " << channel_name;
268 return;
269 }
270
271 message_count_++;
272}
273
274void Recorder::ShowProgress() {
275 while (is_started_ && !is_stopping_) {
276 std::cout << "\r[RUNNING] Record Time: " << std::setprecision(3)
277 << message_time_ / 1000000000
278 << " Progress: " << channel_reader_map_.size() << " channels, "
279 << message_count_ << " messages";
280 std::cout.flush();
281 std::this_thread::sleep_for(std::chrono::milliseconds(100));
282 }
283 std::cout << std::endl;
284}
285
286} // namespace record
287} // namespace cyber
288} // namespace apollo
uint64_t ToNanosecond() const
convert time to nanosecond.
Definition time.cc:83
static Time Now()
get the current time.
Definition time.cc:57
static proto::Header GetHeader()
Build a default record header.
Recorder(const std::string &output, bool all_channels, const std::vector< std::string > &white_channels, const std::vector< std::string > &black_channels)
Definition recorder.cc:27
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
#define AWARN
Definition log.h:43
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space)
Definition cyber.cc:33
class register implement
Definition arena_queue.h:37
optional RoleAttributes role_attr