28 const std::vector<std::string>& white_channels,
29 const std::vector<std::string>& black_channels)
31 all_channels_(all_channels),
32 white_channels_(white_channels),
33 black_channels_(black_channels) {
38 const std::vector<std::string>& white_channels,
39 const std::vector<std::string>& black_channels,
42 all_channels_(all_channels),
43 white_channels_(white_channels),
44 black_channels_(black_channels),
50 for (
const auto& channel_name : white_channels_) {
51 if (std::find(black_channels_.begin(), black_channels_.end(),
52 channel_name) != black_channels_.end()) {
53 AERROR <<
"find channel in both of white list and black list, channel: "
59 auto get_patterns_func = [](
const std::vector<std::string>& channels,
60 std::vector<std::regex>* channel_patterns) {
61 for (
const auto& channel_name : channels) {
63 std::string name =
"";
65 for (
auto& c : channel_name) {
72 channel_patterns->emplace_back(name);
73 }
catch (std::regex_error& e) {
78 get_patterns_func(white_channels_, &white_channel_patterns_);
79 get_patterns_func(black_channels_, &black_channel_patterns_);
82 if (!writer_->Open(output_)) {
83 AERROR <<
"Datafile open file error.";
86 std::string node_name =
"cyber_recorder_record_" + std::to_string(getpid());
88 if (node_ ==
nullptr) {
89 AERROR <<
"create node failed, node: " << node_name;
92 if (!InitReadersImpl()) {
93 AERROR <<
" _init_readers error.";
100 std::make_shared<std::thread>([
this]() { this->ShowProgress(); });
101 if (display_thread_ ==
nullptr) {
102 AERROR <<
"init display thread error.";
109 if (!is_started_ || is_stopping_) {
113 if (!FreeReadersImpl()) {
114 AERROR <<
" _free_readers error.";
119 if (display_thread_ && display_thread_->joinable()) {
120 display_thread_->join();
121 display_thread_ =
nullptr;
124 is_stopping_ =
false;
128void Recorder::TopologyCallback(
const ChangeMsg& change_message) {
129 ADEBUG <<
"ChangeMsg in Topology Callback:" << std::endl
130 << change_message.ShortDebugString();
132 ADEBUG <<
"Change message role type is not ROLE_WRITER.";
135 FindNewChannel(change_message.
role_attr());
139 if (!role_attr.has_channel_name() || role_attr.
channel_name().empty()) {
140 AWARN <<
"change message not has a channel name or has an empty one.";
143 if (!role_attr.has_message_type() || role_attr.
message_type().empty()) {
144 AWARN <<
"Change message not has a message type or has an empty one.";
147 if (!role_attr.has_proto_desc() || role_attr.
proto_desc().empty()) {
148 AWARN <<
"Change message not has a proto desc or has an empty one.";
152 if (channel_reader_map_.find(role_attr.
channel_name()) !=
153 channel_reader_map_.end()) {
157 auto channel_match_func = [&role_attr](
158 const std::vector<std::string> channels,
159 std::vector<std::regex>& patterns) ->
bool {
160 if (std::find(channels.begin(), channels.end(), role_attr.
channel_name()) !=
164 for (
const auto& pattern : patterns) {
165 if (std::regex_match(role_attr.
channel_name(), pattern)) {
173 if (!all_channels_ &&
174 !channel_match_func(white_channels_, white_channel_patterns_)) {
176 <<
"' was found, but not in record list.";
180 if (channel_match_func(black_channels_, black_channel_patterns_)) {
182 <<
"' was found, but it appears in the blacklist.";
193bool Recorder::InitReadersImpl() {
194 std::shared_ptr<ChannelManager> channel_manager =
195 TopologyManager::Instance()->channel_manager();
198 std::vector<proto::RoleAttributes> role_attr_vec;
199 channel_manager->GetWriters(&role_attr_vec);
200 for (
auto role_attr : role_attr_vec) {
201 FindNewChannel(role_attr);
205 change_conn_ = channel_manager->AddChangeListener(
206 std::bind(&Recorder::TopologyCallback,
this, std::placeholders::_1));
208 AERROR <<
"change connection is not connected";
214bool Recorder::FreeReadersImpl() {
215 std::shared_ptr<ChannelManager> channel_manager =
216 TopologyManager::Instance()->channel_manager();
218 channel_manager->RemoveChangeListener(change_conn_);
223bool Recorder::InitReaderImpl(
const std::string& channel_name,
224 const std::string& message_type) {
226 std::weak_ptr<Recorder> weak_this = shared_from_this();
227 std::shared_ptr<ReaderBase> reader =
nullptr;
228 auto callback = [weak_this, channel_name](
229 const std::shared_ptr<RawMessage>& raw_message) {
230 auto share_this = weak_this.lock();
234 share_this->ReaderCallback(raw_message, channel_name);
237 config.channel_name = channel_name;
238 config.pending_queue_size =
239 gflags::Int32FromEnv(
"CYBER_PENDING_QUEUE_SIZE", 50);
240 reader = node_->CreateReader<
RawMessage>(config, callback);
241 if (reader ==
nullptr) {
242 AERROR <<
"Create reader failed.";
245 channel_reader_map_[channel_name] = reader;
247 }
catch (
const std::bad_weak_ptr& e) {
253void Recorder::ReaderCallback(
const std::shared_ptr<RawMessage>& message,
254 const std::string& channel_name) {
255 if (!is_started_ || is_stopping_) {
256 AERROR <<
"record procedure is not started or stopping.";
260 if (message ==
nullptr) {
261 AERROR <<
"message is nullptr, channel: " << channel_name;
266 if (!writer_->WriteMessage(channel_name, message, message_time_)) {
267 AERROR <<
"write data fail, channel: " << channel_name;
274void Recorder::ShowProgress() {
275 while (is_started_ && !is_stopping_) {
276 std::cout <<
"\r[RUNNING] Record Time: " << std::setprecision(3)
277 << message_time_ / 1000000000
278 <<
" Progress: " << channel_reader_map_.size() <<
" channels, "
279 << message_count_ <<
" messages";
281 std::this_thread::sleep_for(std::chrono::milliseconds(100));
283 std::cout << std::endl;
uint64_t ToNanosecond() const
Convert time to nanoseconds.
static Time Now()
Get the current time.
Recorder(const std::string &output, bool all_channels, const std::vector< std::string > &white_channels, const std::vector< std::string > &black_channels)
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space)
optional RoleAttributes role_attr
optional RoleType role_type
optional bytes proto_desc
optional string channel_name
optional string message_type