Apollo 10.0
自动驾驶开放平台
shm_dispatcher.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_TRANSPORT_DISPATCHER_SHM_DISPATCHER_H_
18#define CYBER_TRANSPORT_DISPATCHER_SHM_DISPATCHER_H_
19
20#include <cstring>
21#include <memory>
22#include <string>
23#include <thread>
24#include <unordered_map>
25
28#include "cyber/common/log.h"
29#include "cyber/common/macros.h"
34#include "cyber/time/time.h"
39
40namespace apollo {
41namespace cyber {
42namespace transport {
43
44class ShmDispatcher;
49
// Dispatcher specialization whose listeners are fed from ReadableBlock
// payloads (see the adapter lambdas in the AddListener templates defined
// further down in this header).
// NOTE(review): this listing skips original lines 93 and 98. The
// `arena_msg_listeners_` member used by AddArenaListener below is not declared
// in the lines shown here - confirm against the real header.
50class ShmDispatcher : public Dispatcher {
51 public:
52 // key: channel_id
53 using SegmentContainer = std::unordered_map<uint64_t, SegmentPtr>;
54
55 virtual ~ShmDispatcher();
56
57 void Shutdown() override;
58
  // Registers `listener` for the channel identified by
  // self_attr.channel_id(). The listener is wrapped in an adapter that turns
  // a ReadableBlock into a MessageT before invoking it.
59 template <typename MessageT>
60 void AddListener(const RoleAttributes& self_attr,
61 const MessageListener<MessageT>& listener);
62
  // Same as above, but forwards `opposite_attr` to the underlying
  // registration call.
63 template <typename MessageT>
64 void AddListener(const RoleAttributes& self_attr,
65 const RoleAttributes& opposite_attr,
66 const MessageListener<MessageT>& listener);
67
  // Connects `listener` to the per-channel handler stored in
  // arena_msg_listeners_, creating the handler on first use. Does nothing
  // once is_shutdown_ is set.
68 template <typename MessageT>
69 void AddArenaListener(const RoleAttributes& self_attr,
70 const MessageListener<MessageT>& listener);
71
  // NOTE(review): the definition below never reads `opposite_attr`; it
  // behaves exactly like the two-argument overload.
72 template <typename MessageT>
73 void AddArenaListener(const RoleAttributes& self_attr,
74 const RoleAttributes& opposite_attr,
75 const MessageListener<MessageT>& listener);
76
77 private:
  // The helpers below are only declared here; their definitions are not part
  // of this header listing.
78 void AddSegment(const RoleAttributes& self_attr);
79 void ReadMessage(uint64_t channel_id, uint32_t block_index);
80 void OnMessage(uint64_t channel_id, const std::shared_ptr<ReadableBlock>& rb,
81 const MessageInfo& msg_info);
82 void ReadArenaMessage(uint64_t channel_id, uint32_t arena_block_index);
83 void OnArenaMessage(uint64_t channel_id,
84 const std::shared_ptr<ReadableBlock>& rb,
85 const MessageInfo& msg_info);
86 void ThreadFunc();
87 bool Init();
88
89 uint64_t host_id_;
  // Keyed by channel_id (see SegmentContainer above).
90 SegmentContainer segments_;
  // presumably channel_id -> last block index handled; verify in the .cc
91 std::unordered_map<uint64_t, uint32_t> previous_indexes_;
  // presumably the arena-path counterpart of previous_indexes_; verify
92 std::unordered_map<uint64_t, uint32_t> arena_previous_indexes_;
  // presumably guards segments_ - TODO confirm against the .cc
94 AtomicRWLock segments_lock_;
  // presumably runs ThreadFunc() - TODO confirm against the .cc
95 std::thread thread_;
96 NotifierPtr notifier_;
97
99};
100
101template <typename MessageT>
103 const RoleAttributes& self_attr,
104 const MessageListener<MessageT>& listener) {
105 if (is_shutdown_.load()) {
106 return;
107 }
108 uint64_t channel_id = self_attr.channel_id();
109
110 std::shared_ptr<ListenerHandler<MessageT>> handler;
111 ListenerHandlerBasePtr* handler_base = nullptr;
112 if (arena_msg_listeners_.Get(channel_id, &handler_base)) {
113 handler =
114 std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
115 if (handler == nullptr) {
116 AERROR << "please ensure that readers with the same channel["
117 << self_attr.channel_name()
118 << "] in the same process have the same message type";
119 return;
120 }
121 } else {
122 ADEBUG << "new reader for channel:"
123 << GlobalData::GetChannelById(channel_id);
124 handler.reset(new ListenerHandler<MessageT>());
125 arena_msg_listeners_.Set(channel_id, handler);
126 }
127 handler->Connect(self_attr.id(), listener);
128}
129
130template <typename MessageT>
132 const RoleAttributes& self_attr, const RoleAttributes& opposite_attr,
133 const MessageListener<MessageT>& listener) {
134 if (is_shutdown_.load()) {
135 return;
136 }
137 uint64_t channel_id = self_attr.channel_id();
138 std::shared_ptr<ListenerHandler<MessageT>> handler;
139 ListenerHandlerBasePtr* handler_base = nullptr;
140 if (arena_msg_listeners_.Get(channel_id, &handler_base)) {
141 handler =
142 std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
143 if (handler == nullptr) {
144 AERROR << "please ensuore that readers with the same channel["
145 << self_attr.channel_name()
146 << "] in the same process have the same message type";
147 return;
148 }
149 } else {
150 ADEBUG << "new reader for channel:"
151 << GlobalData::GetChannelById(channel_id);
152 handler.reset(new ListenerHandler<MessageT>());
153 arena_msg_listeners_.Set(channel_id, handler);
154 }
155 handler->Connect(self_attr.id(), listener);
156}
157
158template <typename MessageT>
160 const MessageListener<MessageT>& listener) {
161 // FIXME: make it more clean
162 if (cyber::common::GlobalData::Instance()->IsChannelEnableArenaShm(
163 self_attr.channel_id()) &&
164 self_attr.message_type() != message::MessageType<message::RawMessage>() &&
165 self_attr.message_type() !=
166 message::MessageType<message::PyMessageWrap>()) {
167 auto listener_adapter = [listener, self_attr](
168 const std::shared_ptr<ReadableBlock>& rb,
169 const MessageInfo& msg_info) {
170 auto msg = std::make_shared<MessageT>();
171 // TODO(ALL): read config from msg_info
172 auto arena_manager = ProtobufArenaManager::Instance();
173 auto msg_wrapper = arena_manager->CreateMessageWrapper();
174 memcpy(msg_wrapper->GetData(), rb->buf, 1024);
175 MessageT* msg_p;
176 if (!message::ParseFromArenaMessageWrapper(msg_wrapper.get(), msg.get(),
177 &msg_p)) {
178 AERROR << "ParseFromArenaMessageWrapper failed";
179 }
180 // msg->CopyFrom(*msg_p);
181 // msg = arena_manager->LoadMessage<MessageT>(msg_wrapper.get())
182 auto segment = arena_manager->GetSegment(self_attr.channel_id());
183 auto msg_addr = reinterpret_cast<uint64_t>(msg_p);
184 msg.reset(reinterpret_cast<MessageT*>(msg_addr),
185 [arena_manager, segment, msg_wrapper](MessageT* p) {
186 // fprintf(stderr, "msg deleter invoked\n");
187 // auto related_blocks =
188 // arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
189 // for (auto block_index : related_blocks) {
190 // // segment->ReleaseBlockForReadByIndex(block_index);
191 // segment->RemoveBlockReadLock(block_index);
192 // }
193 });
194 auto related_blocks_for_lock =
195 arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
196 for (int i = 0; i < related_blocks_for_lock.size(); ++i) {
197 auto block_index = related_blocks_for_lock[i];
198 if (!segment->AddBlockReadLock(block_index)) {
199 AWARN << "failed to acquire block for read, channel: "
200 << self_attr.channel_id() << " index: " << block_index;
201 for (int j = 0; j < i; ++j) {
202 // restore the lock
203 segment->RemoveBlockReadLock(related_blocks_for_lock[j]);
204 }
205 return;
206 }
207 }
208
209 auto send_time = msg_info.send_time();
210
211 statistics::Statistics::Instance()->AddRecvCount(self_attr,
212 msg_info.seq_num());
213 statistics::Statistics::Instance()->SetTotalMsgsStatus(
214 self_attr, msg_info.seq_num());
215
216 auto recv_time = Time::Now().ToNanosecond();
217
218 // sampling in microsecond
219 auto tran_diff = (recv_time - send_time) / 1000;
220 if (tran_diff > 0) {
221 // sample transport latency in microsecond
222 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
223 self_attr, tran_diff);
224 }
225 statistics::Statistics::Instance()->SetProcStatus(self_attr,
226 recv_time / 1000);
227 listener(msg, msg_info);
228 auto related_blocks =
229 arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
230 for (auto block_index : related_blocks) {
231 // segment->ReleaseBlockForReadByIndex(block_index);
232 segment->RemoveBlockReadLock(block_index);
233 }
234 };
235
236 AddArenaListener<ReadableBlock>(self_attr, listener_adapter);
237 } else {
238 auto listener_adapter = [listener, self_attr](
239 const std::shared_ptr<ReadableBlock>& rb,
240 const MessageInfo& msg_info) {
241 auto msg = std::make_shared<MessageT>();
242 // TODO(ALL): read config from msg_info
244 rb->buf, static_cast<int>(rb->block->msg_size()), msg.get()));
245
246 auto send_time = msg_info.send_time();
247
248 statistics::Statistics::Instance()->AddRecvCount(self_attr,
249 msg_info.seq_num());
250 statistics::Statistics::Instance()->SetTotalMsgsStatus(
251 self_attr, msg_info.seq_num());
252
253 auto recv_time = Time::Now().ToNanosecond();
254
255 // sampling in microsecond
256 auto tran_diff = (recv_time - send_time) / 1000;
257 if (tran_diff > 0) {
258 // sample transport latency in microsecond
259 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
260 self_attr, tran_diff);
261 }
262 statistics::Statistics::Instance()->SetProcStatus(self_attr,
263 recv_time / 1000);
264 listener(msg, msg_info);
265 };
266
267 Dispatcher::AddListener<ReadableBlock>(self_attr, listener_adapter);
268 }
269 AddSegment(self_attr);
270}
271
272template <typename MessageT>
274 const RoleAttributes& opposite_attr,
275 const MessageListener<MessageT>& listener) {
276 // FIXME: make it more clean
277 if (cyber::common::GlobalData::Instance()->IsChannelEnableArenaShm(
278 self_attr.channel_id()) &&
279 self_attr.message_type() != message::MessageType<message::RawMessage>() &&
280 self_attr.message_type() !=
281 message::MessageType<message::PyMessageWrap>()) {
282 auto listener_adapter = [listener, self_attr](
283 const std::shared_ptr<ReadableBlock>& rb,
284 const MessageInfo& msg_info) {
285 auto msg = std::make_shared<MessageT>();
286 auto arena_manager = ProtobufArenaManager::Instance();
287 auto msg_wrapper = arena_manager->CreateMessageWrapper();
288 memcpy(msg_wrapper->GetData(), rb->buf, 1024);
289 MessageT* msg_p;
290 if (!message::ParseFromArenaMessageWrapper(msg_wrapper.get(), msg.get(),
291 &msg_p)) {
292 AERROR << "ParseFromArenaMessageWrapper failed";
293 }
294 // msg->CopyFrom(*msg_p);
295 // msg = arena_manager->LoadMessage<MessageT>(msg_wrapper.get())
296 auto segment = arena_manager->GetSegment(self_attr.channel_id());
297 auto msg_addr = reinterpret_cast<uint64_t>(msg_p);
298 msg.reset(reinterpret_cast<MessageT*>(msg_addr),
299 [arena_manager, segment, msg_wrapper](MessageT* p) {
300 // fprintf(stderr, "msg deleter invoked\n");
301 // auto related_blocks =
302 // arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
303 // for (auto block_index : related_blocks) {
304 // // segment->ReleaseBlockForReadByIndex(block_index);
305 // segment->RemoveBlockReadLock(block_index);
306 // }
307 });
308 auto related_blocks_for_lock =
309 arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
310 for (int i = 0; i < related_blocks_for_lock.size(); ++i) {
311 auto block_index = related_blocks_for_lock[i];
312 if (!segment->AddBlockReadLock(block_index)) {
313 AWARN << "failed to acquire block for read, channel: "
314 << self_attr.channel_id() << " index: " << block_index;
315 for (int j = 0; j < i; ++j) {
316 // restore the lock
317 segment->RemoveBlockReadLock(related_blocks_for_lock[j]);
318 }
319 return;
320 }
321 }
322
323 auto send_time = msg_info.send_time();
324
325 statistics::Statistics::Instance()->AddRecvCount(self_attr,
326 msg_info.seq_num());
327 statistics::Statistics::Instance()->SetTotalMsgsStatus(
328 self_attr, msg_info.seq_num());
329
330 auto recv_time = Time::Now().ToNanosecond();
331
332 // sampling in microsecond
333 auto tran_diff = (recv_time - send_time) / 1000;
334 if (tran_diff > 0) {
335 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
336 self_attr, tran_diff);
337 }
338 statistics::Statistics::Instance()->SetProcStatus(self_attr,
339 recv_time / 1000);
340
341 listener(msg, msg_info);
342 auto related_blocks =
343 arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
344 for (auto block_index : related_blocks) {
345 // segment->ReleaseBlockForReadByIndex(block_index);
346 segment->RemoveBlockReadLock(block_index);
347 }
348 };
349
350 AddArenaListener<ReadableBlock>(self_attr, opposite_attr, listener_adapter);
351 } else {
352 auto listener_adapter = [listener, self_attr](
353 const std::shared_ptr<ReadableBlock>& rb,
354 const MessageInfo& msg_info) {
355 auto msg = std::make_shared<MessageT>();
357 rb->buf, static_cast<int>(rb->block->msg_size()), msg.get()));
358
359 auto send_time = msg_info.send_time();
360 auto msg_seq_num = msg_info.seq_num();
361
362 statistics::Statistics::Instance()->AddRecvCount(self_attr, msg_seq_num);
363 statistics::Statistics::Instance()->SetTotalMsgsStatus(self_attr,
364 msg_seq_num);
365
366 auto recv_time = Time::Now().ToNanosecond();
367
368 // sampling in microsecond
369 auto tran_diff = (recv_time - send_time) / 1000;
370 if (tran_diff > 0) {
371 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
372 self_attr, tran_diff);
373 }
374 statistics::Statistics::Instance()->SetProcStatus(self_attr,
375 recv_time / 1000);
376
377 listener(msg, msg_info);
378 };
379
380 Dispatcher::AddListener<ReadableBlock>(self_attr, opposite_attr,
381 listener_adapter);
382 }
383 AddSegment(self_attr);
384}
385
386} // namespace transport
387} // namespace cyber
388} // namespace apollo
389
390#endif // CYBER_TRANSPORT_DISPATCHER_SHM_DISPATCHER_H_
uint64_t ToNanosecond() const
convert time to nanosecond.
Definition time.cc:83
static Time Now()
get the current time.
Definition time.cc:57
An implementation of a lock-free, fixed-size hash map
static std::string GetChannelById(uint64_t id)
void AddArenaListener(const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
void AddListener(const RoleAttributes &self_attr, const MessageListener< MessageT > &listener)
std::unordered_map< uint64_t, SegmentPtr > SegmentContainer
#define DECLARE_SINGLETON(classname)
Definition macros.h:52
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
#define RETURN_IF(condition)
Definition log.h:106
#define AWARN
Definition log.h:43
std::enable_if< HasParseFromArenaMessageWrapper< T >::value, bool >::type ParseFromArenaMessageWrapper(ArenaMessageWrapper *wrapper, T *message, T **message_ptr)
std::enable_if< HasParseFromArray< T >::value, bool >::type ParseFromArray(const void *data, int size, T *message)
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr
std::function< void(const std::shared_ptr< MessageT > &, const MessageInfo &)> MessageListener
Definition dispatcher.h:53
class register implement
Definition arena_queue.h:37