Apollo 10.0
自动驾驶开放平台
apollo::cyber::transport::ShmDispatcher类 参考

#include <shm_dispatcher.h>

类 apollo::cyber::transport::ShmDispatcher 继承关系图:
apollo::cyber::transport::ShmDispatcher 的协作图:

Public 类型

using SegmentContainer = std::unordered_map< uint64_t, SegmentPtr >
 

Public 成员函数

virtual ~ShmDispatcher ()
 
void Shutdown () override
 
template<typename MessageT >
void AddListener (const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
 
template<typename MessageT >
void AddListener (const RoleAttributes &self_attr, const RoleAttributes &opposite_attr, const MessageListener< MessageT > &listener)
 
template<typename MessageT >
void AddArenaListener (const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
 
template<typename MessageT >
void AddArenaListener (const RoleAttributes &self_attr, const RoleAttributes &opposite_attr, const MessageListener< MessageT > &listener)
 
- Public 成员函数 继承自 apollo::cyber::transport::Dispatcher
 Dispatcher ()
 
virtual ~Dispatcher ()
 
template<typename MessageT >
void AddListener (const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
 
template<typename MessageT >
void AddListener (const RoleAttributes &self_attr, const RoleAttributes &opposite_attr, const MessageListener< MessageT > &listener)
 
template<typename MessageT >
void RemoveListener (const RoleAttributes &self_attr)
 
template<typename MessageT >
void RemoveListener (const RoleAttributes &self_attr, const RoleAttributes &opposite_attr)
 
bool HasChannel (uint64_t channel_id)
 

额外继承的成员函数

- Protected 属性 继承自 apollo::cyber::transport::Dispatcher
std::atomic< bool > is_shutdown_
 
AtomicHashMap< uint64_t, ListenerHandlerBasePtr > msg_listeners_
 
base::AtomicRWLock rw_lock_
 

详细描述

在文件 shm_dispatcher.h 第 50 行定义.

成员类型定义说明

◆ SegmentContainer

在文件 shm_dispatcher.h 第 53 行定义.

构造及析构函数说明

◆ ~ShmDispatcher()

apollo::cyber::transport::ShmDispatcher::~ShmDispatcher ( )
virtual

在文件 shm_dispatcher.cc 第 32 行定义.

成员函数说明

◆ AddArenaListener() [1/2]

template<typename MessageT >
void apollo::cyber::transport::ShmDispatcher::AddArenaListener ( const RoleAttributes &  self_attr,
const MessageListener< MessageT > &  listener 
)

在文件 shm_dispatcher.h 第 102 行定义.

104 {
105 if (is_shutdown_.load()) {
106 return;
107 }
108 uint64_t channel_id = self_attr.channel_id();
109
110 std::shared_ptr<ListenerHandler<MessageT>> handler;
111 ListenerHandlerBasePtr* handler_base = nullptr;
112 if (arena_msg_listeners_.Get(channel_id, &handler_base)) {
113 handler =
114 std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
115 if (handler == nullptr) {
116 AERROR << "please ensure that readers with the same channel["
117 << self_attr.channel_name()
118 << "] in the same process have the same message type";
119 return;
120 }
121 } else {
122 ADEBUG << "new reader for channel:"
123 << GlobalData::GetChannelById(channel_id);
124 handler.reset(new ListenerHandler<MessageT>());
125 arena_msg_listeners_.Set(channel_id, handler);
126 }
127 handler->Connect(self_attr.id(), listener);
128}
static std::string GetChannelById(uint64_t id)
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr

◆ AddArenaListener() [2/2]

template<typename MessageT >
void apollo::cyber::transport::ShmDispatcher::AddArenaListener ( const RoleAttributes &  self_attr,
const RoleAttributes &  opposite_attr,
const MessageListener< MessageT > &  listener 
)

在文件 shm_dispatcher.h 第 131 行定义.

133 {
134 if (is_shutdown_.load()) {
135 return;
136 }
137 uint64_t channel_id = self_attr.channel_id();
138 std::shared_ptr<ListenerHandler<MessageT>> handler;
139 ListenerHandlerBasePtr* handler_base = nullptr;
140 if (arena_msg_listeners_.Get(channel_id, &handler_base)) {
141 handler =
142 std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
143 if (handler == nullptr) {
144 AERROR << "please ensuore that readers with the same channel["
145 << self_attr.channel_name()
146 << "] in the same process have the same message type";
147 return;
148 }
149 } else {
150 ADEBUG << "new reader for channel:"
151 << GlobalData::GetChannelById(channel_id);
152 handler.reset(new ListenerHandler<MessageT>());
153 arena_msg_listeners_.Set(channel_id, handler);
154 }
155 handler->Connect(self_attr.id(), listener);
156}

◆ AddListener() [1/2]

template<typename MessageT >
void apollo::cyber::transport::ShmDispatcher::AddListener ( const RoleAttributes &  self_attr,
const MessageListener< MessageT > &  listener 
)

在文件 shm_dispatcher.h 第 159 行定义.

160 {
161 // FIXME: make it more clean
162 if (cyber::common::GlobalData::Instance()->IsChannelEnableArenaShm(
163 self_attr.channel_id()) &&
164 self_attr.message_type() != message::MessageType<message::RawMessage>() &&
165 self_attr.message_type() !=
166 message::MessageType<message::PyMessageWrap>()) {
167 auto listener_adapter = [listener, self_attr](
168 const std::shared_ptr<ReadableBlock>& rb,
169 const MessageInfo& msg_info) {
170 auto msg = std::make_shared<MessageT>();
171 // TODO(ALL): read config from msg_info
172 auto arena_manager = ProtobufArenaManager::Instance();
173 auto msg_wrapper = arena_manager->CreateMessageWrapper();
174 memcpy(msg_wrapper->GetData(), rb->buf, 1024);
175 MessageT* msg_p;
176 if (!message::ParseFromArenaMessageWrapper(msg_wrapper.get(), msg.get(),
177 &msg_p)) {
178 AERROR << "ParseFromArenaMessageWrapper failed";
179 }
180 // msg->CopyFrom(*msg_p);
181 // msg = arena_manager->LoadMessage<MessageT>(msg_wrapper.get())
182 auto segment = arena_manager->GetSegment(self_attr.channel_id());
183 auto msg_addr = reinterpret_cast<uint64_t>(msg_p);
184 msg.reset(reinterpret_cast<MessageT*>(msg_addr),
185 [arena_manager, segment, msg_wrapper](MessageT* p) {
186 // fprintf(stderr, "msg deleter invoked\n");
187 // auto related_blocks =
188 // arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
189 // for (auto block_index : related_blocks) {
190 // // segment->ReleaseBlockForReadByIndex(block_index);
191 // segment->RemoveBlockReadLock(block_index);
192 // }
193 });
194 auto related_blocks_for_lock =
195 arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
196 for (int i = 0; i < related_blocks_for_lock.size(); ++i) {
197 auto block_index = related_blocks_for_lock[i];
198 if (!segment->AddBlockReadLock(block_index)) {
199 AWARN << "failed to acquire block for read, channel: "
200 << self_attr.channel_id() << " index: " << block_index;
201 for (int j = 0; j < i; ++j) {
202 // restore the lock
203 segment->RemoveBlockReadLock(related_blocks_for_lock[j]);
204 }
205 return;
206 }
207 }
208
209 auto send_time = msg_info.send_time();
210
211 statistics::Statistics::Instance()->AddRecvCount(self_attr,
212 msg_info.seq_num());
213 statistics::Statistics::Instance()->SetTotalMsgsStatus(
214 self_attr, msg_info.seq_num());
215
216 auto recv_time = Time::Now().ToNanosecond();
217
218 // sampling in microsecond
219 auto tran_diff = (recv_time - send_time) / 1000;
220 if (tran_diff > 0) {
221 // sample transport latency in microsecond
222 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
223 self_attr, tran_diff);
224 }
225 statistics::Statistics::Instance()->SetProcStatus(self_attr,
226 recv_time / 1000);
227 listener(msg, msg_info);
228 auto related_blocks =
229 arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
230 for (auto block_index : related_blocks) {
231 // segment->ReleaseBlockForReadByIndex(block_index);
232 segment->RemoveBlockReadLock(block_index);
233 }
234 };
235
236 AddArenaListener<ReadableBlock>(self_attr, listener_adapter);
237 } else {
238 auto listener_adapter = [listener, self_attr](
239 const std::shared_ptr<ReadableBlock>& rb,
240 const MessageInfo& msg_info) {
241 auto msg = std::make_shared<MessageT>();
242 // TODO(ALL): read config from msg_info
243 RETURN_IF(!message::ParseFromArray(
244 rb->buf, static_cast<int>(rb->block->msg_size()), msg.get()));
245
246 auto send_time = msg_info.send_time();
247
248 statistics::Statistics::Instance()->AddRecvCount(self_attr,
249 msg_info.seq_num());
250 statistics::Statistics::Instance()->SetTotalMsgsStatus(
251 self_attr, msg_info.seq_num());
252
253 auto recv_time = Time::Now().ToNanosecond();
254
255 // sampling in microsecond
256 auto tran_diff = (recv_time - send_time) / 1000;
257 if (tran_diff > 0) {
258 // sample transport latency in microsecond
259 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
260 self_attr, tran_diff);
261 }
262 statistics::Statistics::Instance()->SetProcStatus(self_attr,
263 recv_time / 1000);
264 listener(msg, msg_info);
265 };
266
267 Dispatcher::AddListener<ReadableBlock>(self_attr, listener_adapter);
268 }
269 AddSegment(self_attr);
270}
uint64_t ToNanosecond() const
convert time to nanosecond.
Definition time.cc:83
static Time Now()
get the current time.
Definition time.cc:57
#define RETURN_IF(condition)
Definition log.h:106
#define AWARN
Definition log.h:43
std::enable_if< HasParseFromArenaMessageWrapper< T >::value, bool >::type ParseFromArenaMessageWrapper(ArenaMessageWrapper *wrapper, T *message, T **message_ptr)
std::enable_if< HasParseFromArray< T >::value, bool >::type ParseFromArray(const void *data, int size, T *message)

◆ AddListener() [2/2]

template<typename MessageT >
void apollo::cyber::transport::ShmDispatcher::AddListener ( const RoleAttributes &  self_attr,
const RoleAttributes &  opposite_attr,
const MessageListener< MessageT > &  listener 
)

在文件 shm_dispatcher.h 第 273 行定义.

275 {
276 // FIXME: make it more clean
277 if (cyber::common::GlobalData::Instance()->IsChannelEnableArenaShm(
278 self_attr.channel_id()) &&
279 self_attr.message_type() != message::MessageType<message::RawMessage>() &&
280 self_attr.message_type() !=
281 message::MessageType<message::PyMessageWrap>()) {
282 auto listener_adapter = [listener, self_attr](
283 const std::shared_ptr<ReadableBlock>& rb,
284 const MessageInfo& msg_info) {
285 auto msg = std::make_shared<MessageT>();
286 auto arena_manager = ProtobufArenaManager::Instance();
287 auto msg_wrapper = arena_manager->CreateMessageWrapper();
288 memcpy(msg_wrapper->GetData(), rb->buf, 1024);
289 MessageT* msg_p;
290 if (!message::ParseFromArenaMessageWrapper(msg_wrapper.get(), msg.get(),
291 &msg_p)) {
292 AERROR << "ParseFromArenaMessageWrapper failed";
293 }
294 // msg->CopyFrom(*msg_p);
295 // msg = arena_manager->LoadMessage<MessageT>(msg_wrapper.get())
296 auto segment = arena_manager->GetSegment(self_attr.channel_id());
297 auto msg_addr = reinterpret_cast<uint64_t>(msg_p);
298 msg.reset(reinterpret_cast<MessageT*>(msg_addr),
299 [arena_manager, segment, msg_wrapper](MessageT* p) {
300 // fprintf(stderr, "msg deleter invoked\n");
301 // auto related_blocks =
302 // arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
303 // for (auto block_index : related_blocks) {
304 // // segment->ReleaseBlockForReadByIndex(block_index);
305 // segment->RemoveBlockReadLock(block_index);
306 // }
307 });
308 auto related_blocks_for_lock =
309 arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
310 for (int i = 0; i < related_blocks_for_lock.size(); ++i) {
311 auto block_index = related_blocks_for_lock[i];
312 if (!segment->AddBlockReadLock(block_index)) {
313 AWARN << "failed to acquire block for read, channel: "
314 << self_attr.channel_id() << " index: " << block_index;
315 for (int j = 0; j < i; ++j) {
316 // restore the lock
317 segment->RemoveBlockReadLock(related_blocks_for_lock[j]);
318 }
319 return;
320 }
321 }
322
323 auto send_time = msg_info.send_time();
324
325 statistics::Statistics::Instance()->AddRecvCount(self_attr,
326 msg_info.seq_num());
327 statistics::Statistics::Instance()->SetTotalMsgsStatus(
328 self_attr, msg_info.seq_num());
329
330 auto recv_time = Time::Now().ToNanosecond();
331
332 // sampling in microsecond
333 auto tran_diff = (recv_time - send_time) / 1000;
334 if (tran_diff > 0) {
335 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
336 self_attr, tran_diff);
337 }
338 statistics::Statistics::Instance()->SetProcStatus(self_attr,
339 recv_time / 1000);
340
341 listener(msg, msg_info);
342 auto related_blocks =
343 arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
344 for (auto block_index : related_blocks) {
345 // segment->ReleaseBlockForReadByIndex(block_index);
346 segment->RemoveBlockReadLock(block_index);
347 }
348 };
349
350 AddArenaListener<ReadableBlock>(self_attr, opposite_attr, listener_adapter);
351 } else {
352 auto listener_adapter = [listener, self_attr](
353 const std::shared_ptr<ReadableBlock>& rb,
354 const MessageInfo& msg_info) {
355 auto msg = std::make_shared<MessageT>();
356 RETURN_IF(!message::ParseFromArray(
357 rb->buf, static_cast<int>(rb->block->msg_size()), msg.get()));
358
359 auto send_time = msg_info.send_time();
360 auto msg_seq_num = msg_info.seq_num();
361
362 statistics::Statistics::Instance()->AddRecvCount(self_attr, msg_seq_num);
363 statistics::Statistics::Instance()->SetTotalMsgsStatus(self_attr,
364 msg_seq_num);
365
366 auto recv_time = Time::Now().ToNanosecond();
367
368 // sampling in microsecond
369 auto tran_diff = (recv_time - send_time) / 1000;
370 if (tran_diff > 0) {
371 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
372 self_attr, tran_diff);
373 }
374 statistics::Statistics::Instance()->SetProcStatus(self_attr,
375 recv_time / 1000);
376
377 listener(msg, msg_info);
378 };
379
380 Dispatcher::AddListener<ReadableBlock>(self_attr, opposite_attr,
381 listener_adapter);
382 }
383 AddSegment(self_attr);
384}

◆ Shutdown()

void apollo::cyber::transport::ShmDispatcher::Shutdown ( )
override virtual

重载 apollo::cyber::transport::Dispatcher.

在文件 shm_dispatcher.cc34 行定义.

34 {
35 if (is_shutdown_.exchange(true)) {
36 return;
37 }
38
39 if (thread_.joinable()) {
40 thread_.join();
41 }
42
43 {
44 ReadLockGuard<AtomicRWLock> lock(segments_lock_);
45 segments_.clear();
46 }
47}

该类的文档由以下文件生成: