17#ifndef CYBER_TRANSPORT_TRANSMITTER_SHM_TRANSMITTER_H_
18#define CYBER_TRANSPORT_TRANSMITTER_SHM_TRANSMITTER_H_
42template <
typename T,
typename U>
43struct type_check : std::is_same<typename std::decay<T>::type, U>::type {};
70 std::atomic<int> serialized_receiver_count_;
71 std::atomic<int> arena_receiver_count_;
78 auto msg_o = msg.get();
79 auto arena_manager = ProtobufArenaManager::Instance();
80 if (!arena_manager->Enable() ||
81 !arena_manager->EnableSegment(this->attr_.channel_id())) {
82 ADEBUG <<
"arena manager enable failed.";
85 arena_manager->AcquireArenaMessage(channel_id_, msg);
86 if (msg.get() != msg_o) {
99 channel_id_(attr.channel_id()),
101 serialized_receiver_count_(0),
102 arena_receiver_count_(0) {
104 arena_transmit_ = common::GlobalData::Instance()->IsChannelEnableArenaShm(
117 if (arena_transmit_) {
119 message::MessageType<message::RawMessage>() ||
121 message::MessageType<message::PyMessageWrap>()) {
122 serialized_receiver_count_.fetch_add(1);
124 arena_receiver_count_.fetch_add(1);
127 serialized_receiver_count_.fetch_add(1);
129 if (!this->enabled_) {
136 if (this->enabled_) {
137 if (arena_transmit_) {
139 message::MessageType<message::RawMessage>() ||
141 message::MessageType<message::PyMessageWrap>()) {
142 serialized_receiver_count_.fetch_sub(1);
144 arena_receiver_count_.fetch_sub(1);
146 if (serialized_receiver_count_.load() <= 0 &&
147 arena_receiver_count_.load() <= 0) {
151 serialized_receiver_count_.fetch_sub(1);
152 if (serialized_receiver_count_.load() <= 0) {
161 if (this->enabled_) {
165 if (serialized_receiver_count_.load() == 0 &&
166 arena_receiver_count_.load() == 0) {
167 AERROR <<
"please enable shm transmitter by passing role attr.";
171 if (arena_transmit_) {
172 auto arena_manager = ProtobufArenaManager::Instance();
173 if (!arena_manager->Enable() ||
174 !arena_manager->EnableSegment(this->attr_.channel_id())) {
175 AERROR <<
"arena manager enable failed.";
182 this->enabled_ =
true;
187 if (this->enabled_) {
190 this->enabled_ =
false;
197 return Transmit(*msg, msg_info);
202 if (!this->enabled_) {
207 ReadableInfo readable_info;
208 WritableBlock arena_wb;
211 readable_info.set_host_id(host_id_);
212 readable_info.set_channel_id(channel_id_);
213 readable_info.set_arena_block_index(-1);
214 readable_info.set_block_index(-1);
216 if (arena_transmit_) {
218 std::size_t msg_size = 1024;
219 if (!segment_->AcquireArenaBlockToWrite(msg_size, &arena_wb)) {
220 AERROR <<
"acquire block failed.";
224 ADEBUG <<
"arena block index: " << arena_wb.index;
225 auto arena_manager = ProtobufArenaManager::Instance();
226 auto msg_wrapper = arena_manager->CreateMessageWrapper();
227 arena_manager->SetMessageChannelId(msg_wrapper.get(), channel_id_);
232 AERROR <<
"serialize to arena message wrapper failed.";
233 segment_->ReleaseArenaWrittenBlock(arena_wb);
236 auto segment = arena_manager->GetSegment(channel_id_);
251 memcpy(arena_wb.buf, msg_wrapper->GetData(), msg_size);
252 arena_wb.block->set_msg_size(msg_size);
254 char* msg_info_addr =
reinterpret_cast<char*
>(arena_wb.buf) + msg_size;
256 AERROR <<
"serialize message info failed.";
257 segment_->ReleaseArenaWrittenBlock(arena_wb);
261 readable_info.set_arena_block_index(arena_wb.index);
262 if (serialized_receiver_count_.load() > 0) {
264 if (!segment_->AcquireBlockToWrite(msg_size, &wb)) {
265 AERROR <<
"acquire block failed.";
269 ADEBUG <<
"block index: " << wb.index;
271 AERROR <<
"serialize to array failed.";
272 segment_->ReleaseWrittenBlock(wb);
275 wb.block->set_msg_size(msg_size);
277 char* msg_info_addr =
reinterpret_cast<char*
>(wb.buf) + msg_size;
279 AERROR <<
"serialize message info failed.";
280 segment_->ReleaseWrittenBlock(wb);
284 segment_->ReleaseWrittenBlock(wb);
285 segment_->ReleaseArenaWrittenBlock(arena_wb);
286 readable_info.set_block_index(wb.index);
288 segment_->ReleaseArenaWrittenBlock(arena_wb);
292 if (!segment_->AcquireBlockToWrite(msg_size, &wb)) {
293 AERROR <<
"acquire block failed.";
297 ADEBUG <<
"block index: " << wb.index;
299 AERROR <<
"serialize to array failed.";
300 segment_->ReleaseWrittenBlock(wb);
303 wb.block->set_msg_size(msg_size);
305 char* msg_info_addr =
reinterpret_cast<char*
>(wb.buf) + msg_size;
307 AERROR <<
"serialize message info failed.";
308 segment_->ReleaseWrittenBlock(wb);
312 segment_->ReleaseWrittenBlock(wb);
313 readable_info.set_block_index(wb.index);
316 ADEBUG <<
"Writing sharedmem message: "
318 <<
" to normal block: " << readable_info.block_index()
319 <<
" to arena block: " << readable_info.arena_block_index();
320 return notifier_->Notify(readable_info);
static std::string GetChannelById(uint64_t id)
static const std::size_t kSize
bool SerializeTo(std::string *dst) const
static NotifierPtr CreateNotifier()
static SegmentPtr CreateSegment(uint64_t channel_id)
std::shared_ptr< M > MessagePtr
ShmTransmitter(const RoleAttributes &attr)
virtual ~ShmTransmitter()
bool Transmit(const MessagePtr &msg, const MessageInfo &msg_info) override
bool AcquireMessage(std::shared_ptr< M > &msg)
std::size_t Hash(const std::string &key)
std::enable_if< HasSerializeToArray< T >::value, bool >::type SerializeToArray(const T &message, void *data, int size)
std::enable_if< HasByteSize< T >::value, int >::type ByteSize(const T &message)
std::enable_if< HasSerializeToArenaMessageWrapper< T >::value, bool >::type SerializeToArenaMessageWrapper(const T &message, ArenaMessageWrapper *wrapper, T **message_ptr)
std::shared_ptr< Segment > SegmentPtr
optional string message_type
optional uint64 channel_id