Apollo 10.0
自动驾驶开放平台
shm_transmitter.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_TRANSPORT_TRANSMITTER_SHM_TRANSMITTER_H_
18#define CYBER_TRANSPORT_TRANSMITTER_SHM_TRANSMITTER_H_
19
20#include <cstring>
21#include <iostream>
22#include <memory>
23#include <string>
24#include <type_traits>
25
27#include "cyber/common/log.h"
28#include "cyber/common/util.h"
37
38namespace apollo {
39namespace cyber {
40namespace transport {
41
/// Compile-time predicate: `value` is true when `T`, after `std::decay`
/// (references and cv-qualifiers removed, arrays/functions turned into
/// pointers), is exactly the type `U`.
template <typename T, typename U>
struct type_check : std::is_same<typename std::decay<T>::type, U>::type {};
44
45template <typename M>
46class ShmTransmitter : public Transmitter<M> {
47 public:
48 using MessagePtr = std::shared_ptr<M>;
49
50 explicit ShmTransmitter(const RoleAttributes& attr);
51 virtual ~ShmTransmitter();
52
53 void Enable() override;
54 void Disable() override;
55
56 void Enable(const RoleAttributes& opposite_attr);
57 void Disable(const RoleAttributes& opposite_attr);
58
59 bool Transmit(const MessagePtr& msg, const MessageInfo& msg_info) override;
60
61 bool AcquireMessage(std::shared_ptr<M>& msg);
62
63 private:
64 bool Transmit(const M& msg, const MessageInfo& msg_info);
65
66 SegmentPtr segment_;
67 uint64_t channel_id_;
68 uint64_t host_id_;
69 NotifierPtr notifier_;
70 std::atomic<int> serialized_receiver_count_;
71 std::atomic<int> arena_receiver_count_;
72 bool arena_transmit_;
73};
74
75template <typename M>
76bool ShmTransmitter<M>::AcquireMessage(std::shared_ptr<M>& msg) {
77 if (this->enabled_) {
78 auto msg_o = msg.get();
79 auto arena_manager = ProtobufArenaManager::Instance();
80 if (!arena_manager->Enable() ||
81 !arena_manager->EnableSegment(this->attr_.channel_id())) {
82 ADEBUG << "arena manager enable failed.";
83 return false;
84 }
85 arena_manager->AcquireArenaMessage(channel_id_, msg);
86 if (msg.get() != msg_o) {
87 return true;
88 } else {
89 return false;
90 }
91 }
92 return false;
93}
94
// Constructor definition (out of class). Forwards `attr` to Transmitter<M>,
// caches attr.channel_id(), starts with a null segment and notifier and both
// receiver counters at zero, then derives host_id_ by hashing attr.host_ip().
// arena_transmit_ requires at least that the channel has arena shared memory
// enabled in GlobalData.
// NOTE(review): this extract is missing the declarator line and the remaining
// operands of the arena_transmit_ expression (listing lines 96 and 106-107);
// confirm against the full header before changing this block.
95template <typename M>
97 : Transmitter<M>(attr),
98 segment_(nullptr),
99 channel_id_(attr.channel_id()),
100 notifier_(nullptr),
101 serialized_receiver_count_(0),
102 arena_receiver_count_(0) {
103 host_id_ = common::Hash(attr.host_ip());
104 arena_transmit_ = common::GlobalData::Instance()->IsChannelEnableArenaShm(
105 this->attr_.channel_id()) &&
108}
109
110template <typename M>
114
115template <typename M>
116void ShmTransmitter<M>::Enable(const RoleAttributes& opposite_attr) {
117 if (arena_transmit_) {
118 if (opposite_attr.message_type() ==
119 message::MessageType<message::RawMessage>() ||
120 opposite_attr.message_type() ==
121 message::MessageType<message::PyMessageWrap>()) {
122 serialized_receiver_count_.fetch_add(1);
123 } else {
124 arena_receiver_count_.fetch_add(1);
125 }
126 } else {
127 serialized_receiver_count_.fetch_add(1);
128 }
129 if (!this->enabled_) {
130 this->Enable();
131 }
132}
133
134template <typename M>
135void ShmTransmitter<M>::Disable(const RoleAttributes& opposite_attr) {
136 if (this->enabled_) {
137 if (arena_transmit_) {
138 if (opposite_attr.message_type() ==
139 message::MessageType<message::RawMessage>() ||
140 opposite_attr.message_type() ==
141 message::MessageType<message::PyMessageWrap>()) {
142 serialized_receiver_count_.fetch_sub(1);
143 } else {
144 arena_receiver_count_.fetch_sub(1);
145 }
146 if (serialized_receiver_count_.load() <= 0 &&
147 arena_receiver_count_.load() <= 0) {
148 this->Disable();
149 }
150 } else {
151 serialized_receiver_count_.fetch_sub(1);
152 if (serialized_receiver_count_.load() <= 0) {
153 this->Disable();
154 }
155 }
156 }
157}
158
// Presumably the definition of ShmTransmitter<M>::Enable() (no arguments).
// Returns immediately when already enabled. Refuses, with an error log, when
// both receiver counters are zero. In arena mode the arena manager and this
// channel's arena segment must enable successfully. On success the segment is
// created for channel_id_ and enabled_ is set.
// NOTE(review): the declarator line and one statement between CreateSegment
// and `enabled_ = true` (listing lines 160 and 181) are absent from this
// extract; confirm against the full header before changing this block.
159template <typename M>
161 if (this->enabled_) {
162 return;
163 }
164
// At least one receiver must have been registered through Enable(attr).
165 if (serialized_receiver_count_.load() == 0 &&
166 arena_receiver_count_.load() == 0) {
167 AERROR << "please enable shm transmitter by passing role attr.";
168 return;
169 }
170
171 if (arena_transmit_) {
172 auto arena_manager = ProtobufArenaManager::Instance();
173 if (!arena_manager->Enable() ||
174 !arena_manager->EnableSegment(this->attr_.channel_id())) {
175 AERROR << "arena manager enable failed.";
176 return;
177 }
178 }
179
180 segment_ = SegmentFactory::CreateSegment(channel_id_);
182 this->enabled_ = true;
183}
184
// Presumably the definition of ShmTransmitter<M>::Disable() (no arguments).
// When enabled: drops the segment and the notifier and clears enabled_.
// The receiver counters are not modified here.
// NOTE(review): the declarator line (listing line 186) is absent from this
// extract.
185template <typename M>
187 if (this->enabled_) {
188 segment_ = nullptr;
189 notifier_ = nullptr;
190 this->enabled_ = false;
191 }
192}
193
// Presumably the public Transmit(const MessagePtr&, const MessageInfo&)
// overload: dereferences `msg` and forwards to Transmit(const M&, ...).
// NOTE(review): `msg` is dereferenced without a null check.
// NOTE(review): the first declarator line (listing line 195) is absent from
// this extract.
194template <typename M>
196 const MessageInfo& msg_info) {
197 return Transmit(*msg, msg_info);
198}
199
200template <typename M>
201bool ShmTransmitter<M>::Transmit(const M& msg, const MessageInfo& msg_info) {
202 if (!this->enabled_) {
203 ADEBUG << "not enable.";
204 return false;
205 }
206
207 ReadableInfo readable_info;
208 WritableBlock arena_wb;
209 WritableBlock wb;
210
211 readable_info.set_host_id(host_id_);
212 readable_info.set_channel_id(channel_id_);
213 readable_info.set_arena_block_index(-1);
214 readable_info.set_block_index(-1);
215
216 if (arena_transmit_) {
217 // std::size_t msg_size = sizeof(message::ArenaMessageWrapper);
218 std::size_t msg_size = 1024;
219 if (!segment_->AcquireArenaBlockToWrite(msg_size, &arena_wb)) {
220 AERROR << "acquire block failed.";
221 return false;
222 }
223
224 ADEBUG << "arena block index: " << arena_wb.index;
225 auto arena_manager = ProtobufArenaManager::Instance();
226 auto msg_wrapper = arena_manager->CreateMessageWrapper();
227 arena_manager->SetMessageChannelId(msg_wrapper.get(), channel_id_);
228 M* msg_p;
229 // arena_manager->CreateMessage(msg_wrapper.get(), msg);
230 if (!message::SerializeToArenaMessageWrapper(msg, msg_wrapper.get(),
231 &msg_p)) {
232 AERROR << "serialize to arena message wrapper failed.";
233 segment_->ReleaseArenaWrittenBlock(arena_wb);
234 return false;
235 }
236 auto segment = arena_manager->GetSegment(channel_id_);
237 // auto msg_n =
238 // std::shared_ptr<M>(
239 // msg_p, [arena_manager, segment, msg_wrapper](M* p) {
240 // auto related_blocks =
241 // arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
242 // for (auto block_index : related_blocks) {
243 // // segment->ReleaseBlockForWriteByIndex(block_index);
244 // segment->RemoveBlockWriteLock(block_index);
245 // }
246 // });
247 // for (auto block_index :
248 // arena_manager->GetMessageRelatedBlocks(msg_wrapper.get())) {
249 // segment->LockBlockForWriteByIndex(block_index);
250 // }
251 memcpy(arena_wb.buf, msg_wrapper->GetData(), msg_size);
252 arena_wb.block->set_msg_size(msg_size);
253
254 char* msg_info_addr = reinterpret_cast<char*>(arena_wb.buf) + msg_size;
255 if (!msg_info.SerializeTo(msg_info_addr, MessageInfo::kSize)) {
256 AERROR << "serialize message info failed.";
257 segment_->ReleaseArenaWrittenBlock(arena_wb);
258 return false;
259 }
260 arena_wb.block->set_msg_info_size(MessageInfo::kSize);
261 readable_info.set_arena_block_index(arena_wb.index);
262 if (serialized_receiver_count_.load() > 0) {
263 std::size_t msg_size = message::ByteSize(msg);
264 if (!segment_->AcquireBlockToWrite(msg_size, &wb)) {
265 AERROR << "acquire block failed.";
266 return false;
267 }
268
269 ADEBUG << "block index: " << wb.index;
270 if (!message::SerializeToArray(msg, wb.buf, static_cast<int>(msg_size))) {
271 AERROR << "serialize to array failed.";
272 segment_->ReleaseWrittenBlock(wb);
273 return false;
274 }
275 wb.block->set_msg_size(msg_size);
276
277 char* msg_info_addr = reinterpret_cast<char*>(wb.buf) + msg_size;
278 if (!msg_info.SerializeTo(msg_info_addr, MessageInfo::kSize)) {
279 AERROR << "serialize message info failed.";
280 segment_->ReleaseWrittenBlock(wb);
281 return false;
282 }
283 wb.block->set_msg_info_size(MessageInfo::kSize);
284 segment_->ReleaseWrittenBlock(wb);
285 segment_->ReleaseArenaWrittenBlock(arena_wb);
286 readable_info.set_block_index(wb.index);
287 } else {
288 segment_->ReleaseArenaWrittenBlock(arena_wb);
289 }
290 } else {
291 std::size_t msg_size = message::ByteSize(msg);
292 if (!segment_->AcquireBlockToWrite(msg_size, &wb)) {
293 AERROR << "acquire block failed.";
294 return false;
295 }
296
297 ADEBUG << "block index: " << wb.index;
298 if (!message::SerializeToArray(msg, wb.buf, static_cast<int>(msg_size))) {
299 AERROR << "serialize to array failed.";
300 segment_->ReleaseWrittenBlock(wb);
301 return false;
302 }
303 wb.block->set_msg_size(msg_size);
304
305 char* msg_info_addr = reinterpret_cast<char*>(wb.buf) + msg_size;
306 if (!msg_info.SerializeTo(msg_info_addr, MessageInfo::kSize)) {
307 AERROR << "serialize message info failed.";
308 segment_->ReleaseWrittenBlock(wb);
309 return false;
310 }
311 wb.block->set_msg_info_size(MessageInfo::kSize);
312 segment_->ReleaseWrittenBlock(wb);
313 readable_info.set_block_index(wb.index);
314 }
315
316 ADEBUG << "Writing sharedmem message: "
318 << " to normal block: " << readable_info.block_index()
319 << " to arena block: " << readable_info.arena_block_index();
320 return notifier_->Notify(readable_info);
321}
322
323} // namespace transport
324} // namespace cyber
325} // namespace apollo
326
327#endif // CYBER_TRANSPORT_TRANSMITTER_SHM_TRANSMITTER_H_
static std::string GetChannelById(uint64_t id)
static const std::size_t kSize
bool SerializeTo(std::string *dst) const
static SegmentPtr CreateSegment(uint64_t channel_id)
ShmTransmitter(const RoleAttributes &attr)
bool Transmit(const MessagePtr &msg, const MessageInfo &msg_info) override
bool AcquireMessage(std::shared_ptr< M > &msg)
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
std::size_t Hash(const std::string &key)
Definition util.h:27
std::enable_if< HasSerializeToArray< T >::value, bool >::type SerializeToArray(const T &message, void *data, int size)
std::enable_if< HasByteSize< T >::value, int >::type ByteSize(const T &message)
std::enable_if< HasSerializeToArenaMessageWrapper< T >::value, bool >::type SerializeToArenaMessageWrapper(const T &message, ArenaMessageWrapper *wrapper, T **message_ptr)
std::shared_ptr< Segment > SegmentPtr
Definition segment.h:34
class register implement
Definition arena_queue.h:37