Apollo 10.0
自动驾驶开放平台
subscriber_listener.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2024 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include "cyber/common/log.h"
21
22namespace apollo {
23namespace cyber {
24namespace transport {
25namespace dispatcher {
26
// Constructor initializer list: stores the supplied callback in callback_; the
// constructor body is empty. The signature line is missing from this listing.
28 : callback_(callback) {}
29
31
// Tail of the on_data_available() signature. The line that opens the
// definition and the declaration of `m` are not present in this listing.
//
// Drains the samples available on `reader` and forwards each valid one to
// callback_ together with the channel id and the filled-in msg_info_.
33 eprosima::fastdds::dds::DataReader* reader) {
// NOTE(review): RETURN_IF_NULL is a macro from log.h; presumably it returns
// from this function when its argument is null -- confirm.
34 RETURN_IF_NULL(reader);
35 RETURN_IF_NULL(callback_);
36
37 // The channel id is the hash of the reader's topic name.
38 auto channel_id = common::Hash(reader->get_topicdescription()->get_name());
39 eprosima::fastdds::dds::SampleInfo m_info;
41
// Keep taking samples into `m` for as long as take_next_sample() reports
// RETCODE_OK.
42 while (reader->take_next_sample(reinterpret_cast<void*>(&m), &m_info) ==
43 eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK) {
44 if (m_info.valid_data) {
// View the related sample identity's writer GUID as raw bytes: the sender id
// is set from the start of that storage, the spare id from ID_SIZE bytes in.
45 char* ptr = reinterpret_cast<char*>(
46 &m_info.related_sample_identity.writer_guid());
47 Identity sender_id(false);
48 sender_id.set_data(ptr);
49 msg_info_.set_sender_id(sender_id);
50
51 Identity spare_id(false);
52 spare_id.set_data(ptr + ID_SIZE);
53 msg_info_.set_spare_id(spare_id);
54
// Copy the sequence number and timestamp carried by the sample.
55 msg_info_.set_seq_num(m.seq());
56 msg_info_.set_send_time(m.timestamp());
57 // Invoke the callback with a newly allocated copy of the payload string.
58 callback_(std::make_shared<std::string>(m.data()), channel_id, msg_info_);
59 } else {
// The sample carries no valid data: log an error that names the topic.
60 AERROR << "Remote writer for topic "
61 << reader->get_topicdescription()->get_name() << " is dead";
62 }
63 }
64}
65
// Tail of the on_subscription_matched() signature; the line that opens the
// definition is not present in this listing. Both arguments are explicitly
// discarded and the body performs no other work.
67 eprosima::fastdds::dds::DataReader* reader,
68 const eprosima::fastdds::dds::SubscriptionMatchedStatus& info) {
69 (void)reader;
70 (void)info;
71}
72
73} // namespace dispatcher
74} // namespace transport
75} // namespace cyber
76} // namespace apollo
void set_data(const char *data)
Definition identity.h:45
void set_seq_num(uint64_t seq_num)
void set_spare_id(const Identity &spare_id)
void set_send_time(uint64_t send_time)
void set_sender_id(const Identity &sender_id)
This class represents the structure UnderlayMessage defined by the user in the IDL file.
void seq(uint64_t _seq)
This function sets a value in member seq
void timestamp(uint64_t _timestamp)
This function sets a value in member timestamp
void data(const std::string &_data)
This function copies the value in member data
void on_subscription_matched(eprosima::fastdds::dds::DataReader *reader, const eprosima::fastdds::dds::SubscriptionMatchedStatus &info) override
void on_data_available(eprosima::fastdds::dds::DataReader *reader) override
SubscriberListener(const rtps::subsciber_callback &callback)
#define RETURN_IF_NULL(ptr)
Definition log.h:90
#define AERROR
Definition log.h:44
std::size_t Hash(const std::string &key)
Definition util.h:27
std::function< void(const std::shared_ptr< std::string > &msg_str, uint64_t channel_id, const MessageInfo &msg_info)> subsciber_callback
Definition common_type.h:30
constexpr uint8_t ID_SIZE
Definition identity.h:28
Class registration implementation.
Definition arena_queue.h:37