Apollo 10.0
自动驾驶开放平台
shm_dispatcher.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
20#include "cyber/common/util.h"
23
24namespace apollo {
25namespace cyber {
26namespace transport {
27
28using common::GlobalData;
29
30ShmDispatcher::ShmDispatcher() : host_id_(0) { Init(); }
31
32ShmDispatcher::~ShmDispatcher() { Shutdown(); }
33
34void ShmDispatcher::Shutdown() {
35 if (is_shutdown_.exchange(true)) {
36 return;
37 }
38
39 if (thread_.joinable()) {
40 thread_.join();
41 }
42
43 {
44 ReadLockGuard<AtomicRWLock> lock(segments_lock_);
45 segments_.clear();
46 }
47}
48
49void ShmDispatcher::AddSegment(const RoleAttributes& self_attr) {
50 uint64_t channel_id = self_attr.channel_id();
51 WriteLockGuard<AtomicRWLock> lock(segments_lock_);
52 if (segments_.count(channel_id) > 0) {
53 return;
54 }
55 auto segment = SegmentFactory::CreateSegment(channel_id);
56 segments_[channel_id] = segment;
57 previous_indexes_[channel_id] = UINT32_MAX;
58 arena_previous_indexes_[channel_id] = UINT32_MAX;
59}
60
61void ShmDispatcher::ReadMessage(uint64_t channel_id, uint32_t block_index) {
62 ADEBUG << "Reading sharedmem message: "
63 << GlobalData::GetChannelById(channel_id)
64 << " from block: " << block_index;
65 auto rb = std::make_shared<ReadableBlock>();
66 rb->index = block_index;
67 if (!segments_[channel_id]->AcquireBlockToRead(rb.get())) {
68 AWARN << "fail to acquire block, channel: "
69 << GlobalData::GetChannelById(channel_id)
70 << " index: " << block_index;
71 return;
72 }
73
74 MessageInfo msg_info;
75 const char* msg_info_addr =
76 reinterpret_cast<char*>(rb->buf) + rb->block->msg_size();
77
78 if (msg_info.DeserializeFrom(msg_info_addr, rb->block->msg_info_size())) {
79 OnMessage(channel_id, rb, msg_info);
80 } else {
81 AERROR << "error msg info of channel:"
82 << GlobalData::GetChannelById(channel_id);
83 }
84 segments_[channel_id]->ReleaseReadBlock(*rb);
85}
86
87void ShmDispatcher::ReadArenaMessage(uint64_t channel_id,
88 uint32_t arena_block_index) {
89 ADEBUG << "Reading sharedmem arena message: "
90 << GlobalData::GetChannelById(channel_id)
91 << " from block: " << arena_block_index;
92 auto rb = std::make_shared<ReadableBlock>();
93 rb->index = arena_block_index;
94 if (!segments_[channel_id]->AcquireArenaBlockToRead(rb.get())) {
95 AWARN << "fail to acquire block, channel: "
96 << GlobalData::GetChannelById(channel_id)
97 << " index: " << arena_block_index;
98 return;
99 }
100
101 MessageInfo msg_info;
102 const char* msg_info_addr =
103 reinterpret_cast<char*>(rb->buf) + rb->block->msg_size();
104 if (msg_info.DeserializeFrom(msg_info_addr, rb->block->msg_info_size())) {
105 OnArenaMessage(channel_id, rb, msg_info);
106 } else {
107 AERROR << "error msg info of channel:"
108 << GlobalData::GetChannelById(channel_id);
109 }
110 segments_[channel_id]->ReleaseArenaReadBlock(*rb);
111}
112
113void ShmDispatcher::OnMessage(uint64_t channel_id,
114 const std::shared_ptr<ReadableBlock>& rb,
115 const MessageInfo& msg_info) {
116 if (is_shutdown_.load()) {
117 return;
118 }
119 ListenerHandlerBasePtr* handler_base = nullptr;
120 if (msg_listeners_.Get(channel_id, &handler_base)) {
121 auto handler = std::dynamic_pointer_cast<ListenerHandler<ReadableBlock>>(
122 *handler_base);
123 handler->Run(rb, msg_info);
124 } else {
125 if (!arena_msg_listeners_.Get(channel_id, &handler_base)) {
126 AERROR << "Cannot find " << GlobalData::GetChannelById(channel_id)
127 << "'s handler.";
128 }
129 }
130}
131
132void ShmDispatcher::OnArenaMessage(uint64_t channel_id,
133 const std::shared_ptr<ReadableBlock>& rb,
134 const MessageInfo& msg_info) {
135 if (is_shutdown_.load()) {
136 return;
137 }
138 ListenerHandlerBasePtr* handler_base = nullptr;
139 if (arena_msg_listeners_.Get(channel_id, &handler_base)) {
140 auto handler = std::dynamic_pointer_cast<ListenerHandler<ReadableBlock>>(
141 *handler_base);
142 handler->Run(rb, msg_info);
143 } else {
144 if (!msg_listeners_.Get(channel_id, &handler_base)) {
145 AERROR << "Cannot find " << GlobalData::GetChannelById(channel_id)
146 << "'s handler.";
147 }
148 }
149}
150
151void ShmDispatcher::ThreadFunc() {
152 ReadableInfo readable_info;
153 while (!is_shutdown_.load()) {
154 if (!notifier_->Listen(100, &readable_info)) {
155 ADEBUG << "listen failed.";
156 continue;
157 }
158
159 if (readable_info.host_id() != host_id_) {
160 ADEBUG << "shm readable info from other host.";
161 continue;
162 }
163
164 uint64_t channel_id = readable_info.channel_id();
165 int32_t block_index = readable_info.block_index();
166 int32_t arena_block_index = readable_info.arena_block_index();
167
168 {
169 ReadLockGuard<AtomicRWLock> lock(segments_lock_);
170 if (segments_.count(channel_id) == 0) {
171 continue;
172 }
173
174 if (block_index != -1) {
175 // check block index
176 if (previous_indexes_.count(channel_id) == 0) {
177 previous_indexes_[channel_id] = UINT32_MAX;
178 }
179 uint32_t& previous_index = previous_indexes_[channel_id];
180 if (block_index != 0 && previous_index != UINT32_MAX) {
181 if (block_index == previous_index) {
182 ADEBUG << "Receive SAME index " << block_index << " of channel "
183 << channel_id;
184 } else if (block_index < previous_index) {
185 ADEBUG << "Receive PREVIOUS message. last: " << previous_index
186 << ", now: " << block_index;
187 } else if (block_index - previous_index > 1) {
188 ADEBUG << "Receive JUMP message. last: " << previous_index
189 << ", now: " << block_index;
190 }
191 }
192 previous_index = block_index;
193 ReadMessage(channel_id, block_index);
194 }
195
196 if (arena_block_index != -1) {
197 if (arena_previous_indexes_.count(channel_id) == 0) {
198 arena_previous_indexes_[channel_id] = UINT32_MAX;
199 }
200 uint32_t& arena_previous_index = arena_previous_indexes_[channel_id];
201 if (arena_block_index != 0 && arena_previous_index != UINT32_MAX) {
202 if (arena_block_index == arena_previous_index) {
203 ADEBUG << "Receive SAME index " << arena_block_index
204 << " of channel " << channel_id;
205 } else if (arena_block_index < arena_previous_index) {
206 ADEBUG << "Receive PREVIOUS message. last: " << arena_previous_index
207 << ", now: " << arena_block_index;
208 } else if (arena_block_index - arena_previous_index > 1) {
209 ADEBUG << "Receive JUMP message. last: " << arena_previous_index
210 << ", now: " << arena_block_index;
211 }
212 }
213 arena_previous_index = arena_block_index;
214 ReadArenaMessage(channel_id, arena_block_index);
215 }
216 }
217 }
218}
219
220bool ShmDispatcher::Init() {
221 host_id_ = common::Hash(GlobalData::Instance()->HostIp());
222 notifier_ = NotifierFactory::CreateNotifier();
223 thread_ = std::thread(&ShmDispatcher::ThreadFunc, this);
224 scheduler::Instance()->SetInnerThreadAttr("shm_disp", &thread_);
225 // statistics::Statistics::Instance()->CreateSpan("protobuf_parse_time");
226 return true;
227}
228
229} // namespace transport
230} // namespace cyber
231} // namespace apollo
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
#define AWARN
Definition log.h:43
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr
bool Init(const char *binary_name, const std::string &dag_info)
Definition init.cc:98
class register implement
Definition arena_queue.h:37