Apollo 10.0
自动驾驶开放平台
rtps_dispatcher.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_TRANSPORT_DISPATCHER_RTPS_DISPATCHER_H_
18#define CYBER_TRANSPORT_DISPATCHER_RTPS_DISPATCHER_H_
19
20#include <cstdint>
21#include <iostream>
22#include <memory>
23#include <mutex>
24#include <string>
25#include <unordered_map>
26
27#include "cyber/common/log.h"
28#include "cyber/common/macros.h"
31#include "cyber/time/time.h"
38
39namespace apollo {
40namespace cyber {
41namespace transport {
42
43class RtpsDispatcher;
45
47 std::shared_ptr<Participant>;
48
49class RtpsDispatcher : public Dispatcher {
50 public:
51 virtual ~RtpsDispatcher();
52
53 void Shutdown() override;
54
55 template <typename MessageT>
56 void AddListener(const RoleAttributes& self_attr,
57 const MessageListener<MessageT>& listener);
58
59 template <typename MessageT>
60 void AddListener(const RoleAttributes& self_attr,
61 const RoleAttributes& opposite_attr,
62 const MessageListener<MessageT>& listener);
63
64 void SetParticipant(const ParticipantPtr& participant) {
65 participant_ = participant;
66 }
67
68 private:
69 void OnMessage(uint64_t channel_id,
70 const std::shared_ptr<std::string>& msg_str,
71 const MessageInfo& msg_info);
72 void AddSubscriber(const RoleAttributes& self_attr);
73 // key: channel_id
74 std::unordered_map<uint64_t,
75 std::shared_ptr<transport::Subscriber>>
76 subs_;
77 std::mutex subs_mutex_;
78
79 ParticipantPtr participant_;
80
82};
83
84template <typename MessageT>
86 const MessageListener<MessageT>& listener) {
87 auto listener_adapter = [listener, self_attr](
88 const std::shared_ptr<std::string>& msg_str,
89 const MessageInfo& msg_info) {
90 auto msg = std::make_shared<MessageT>();
91 RETURN_IF(!message::ParseFromString(*msg_str, msg.get()));
92 uint64_t recv_time = Time::Now().ToNanosecond();
93 uint64_t send_time = msg_info.send_time();
94 if (send_time > recv_time) {
95 AWARN << "The message is received (" << recv_time
96 << ") earlier than the message is sent (" << send_time << ")";
97 } else {
98 uint64_t diff = recv_time - send_time;
99 // sample transport latency in microsecond
100 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
101 self_attr, diff);
102 }
103 statistics::Statistics::Instance()->SetProcStatus(self_attr, recv_time);
104 listener(msg, msg_info);
105 };
106
107 Dispatcher::AddListener<std::string>(self_attr, listener_adapter);
108 AddSubscriber(self_attr);
109}
110
111template <typename MessageT>
113 const RoleAttributes& opposite_attr,
114 const MessageListener<MessageT>& listener) {
115 auto listener_adapter = [listener, self_attr](
116 const std::shared_ptr<std::string>& msg_str,
117 const MessageInfo& msg_info) {
118 auto msg = std::make_shared<MessageT>();
119 RETURN_IF(!message::ParseFromString(*msg_str, msg.get()));
120 uint64_t recv_time = Time::Now().ToNanosecond();
121 uint64_t send_time = msg_info.send_time();
122 if (send_time > recv_time) {
123 AWARN << "The message is received (" << recv_time
124 << ") earlier than the message is sent (" << send_time << ")";
125 } else {
126 uint64_t diff = recv_time - send_time;
127 // sample transport latency in microsecond
128 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
129 self_attr, diff);
130 }
131 statistics::Statistics::Instance()->SetProcStatus(self_attr, recv_time);
132 listener(msg, msg_info);
133 };
134
135 Dispatcher::AddListener<std::string>(self_attr, opposite_attr,
136 listener_adapter);
137 AddSubscriber(self_attr);
138}
139
140} // namespace transport
141} // namespace cyber
142} // namespace apollo
143
144#endif // CYBER_TRANSPORT_DISPATCHER_RTPS_DISPATCHER_H_
uint64_t ToNanosecond() const
convert time to nanosecond.
Definition time.cc:83
static Time Now()
get the current time.
Definition time.cc:57
void AddListener(const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
void SetParticipant(const ParticipantPtr &participant)
#define DECLARE_SINGLETON(classname)
Definition macros.h:52
#define RETURN_IF(condition)
Definition log.h:106
#define AWARN
Definition log.h:43
std::enable_if< HasParseFromString< T >::value, bool >::type ParseFromString(const std::string &str, T *message)
std::shared_ptr< Participant > ParticipantPtr
std::function< void(const std::shared_ptr< MessageT > &, const MessageInfo &)> MessageListener
Definition dispatcher.h:53
class register implement
Definition arena_queue.h:37