59 template <
typename MessageT>
63 template <
typename MessageT>
68 template <
typename MessageT>
72 template <
typename MessageT>
// Declaration only; the definition is not part of this excerpt.
// Takes a 64-bit channel id and a 32-bit block index.
// NOTE(review): presumably reads the block at `block_index` for the given
// channel and hands the result on to OnMessage -- confirm against the
// definition.
79 void ReadMessage(uint64_t channel_id, uint32_t block_index);
80 void OnMessage(uint64_t channel_id,
const std::shared_ptr<ReadableBlock>& rb,
// Declaration only; the definition is not part of this excerpt.
// Takes a 64-bit channel id and a 32-bit arena block index.
// NOTE(review): looks like the arena counterpart of ReadMessage, feeding
// OnArenaMessage -- confirm against the definition.
82 void ReadArenaMessage(uint64_t channel_id, uint32_t arena_block_index);
83 void OnArenaMessage(uint64_t channel_id,
84 const std::shared_ptr<ReadableBlock>& rb,
// Two lookup tables mapping a 64-bit key to a 32-bit value.
// NOTE(review): presumably keyed by channel id and holding the most recently
// handled block index for that channel (regular path) -- confirm at the use
// sites, which are not visible here.
91 std::unordered_map<uint64_t, uint32_t> previous_indexes_;
// NOTE(review): presumably the same bookkeeping for the arena path -- confirm.
92 std::unordered_map<uint64_t, uint32_t> arena_previous_indexes_;
162 if (cyber::common::GlobalData::Instance()->IsChannelEnableArenaShm(
164 self_attr.
message_type() != message::MessageType<message::RawMessage>() &&
166 message::MessageType<message::PyMessageWrap>()) {
167 auto listener_adapter = [listener, self_attr](
168 const std::shared_ptr<ReadableBlock>& rb,
170 auto msg = std::make_shared<MessageT>();
172 auto arena_manager = ProtobufArenaManager::Instance();
173 auto msg_wrapper = arena_manager->CreateMessageWrapper();
174 memcpy(msg_wrapper->GetData(), rb->buf, 1024);
178 AERROR <<
"ParseFromArenaMessageWrapper failed";
182 auto segment = arena_manager->GetSegment(self_attr.
channel_id());
183 auto msg_addr =
reinterpret_cast<uint64_t
>(msg_p);
184 msg.reset(
reinterpret_cast<MessageT*
>(msg_addr),
185 [arena_manager, segment, msg_wrapper](MessageT* p) {
194 auto related_blocks_for_lock =
195 arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
196 for (
int i = 0; i < related_blocks_for_lock.size(); ++i) {
197 auto block_index = related_blocks_for_lock[i];
198 if (!segment->AddBlockReadLock(block_index)) {
199 AWARN <<
"failed to acquire block for read, channel: "
200 << self_attr.
channel_id() <<
" index: " << block_index;
201 for (
int j = 0; j < i; ++j) {
203 segment->RemoveBlockReadLock(related_blocks_for_lock[j]);
209 auto send_time = msg_info.send_time();
211 statistics::Statistics::Instance()->AddRecvCount(self_attr,
213 statistics::Statistics::Instance()->SetTotalMsgsStatus(
214 self_attr, msg_info.seq_num());
219 auto tran_diff = (recv_time - send_time) / 1000;
222 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
223 self_attr, tran_diff);
225 statistics::Statistics::Instance()->SetProcStatus(self_attr,
227 listener(msg, msg_info);
228 auto related_blocks =
229 arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
230 for (
auto block_index : related_blocks) {
232 segment->RemoveBlockReadLock(block_index);
236 AddArenaListener<ReadableBlock>(self_attr, listener_adapter);
238 auto listener_adapter = [listener, self_attr](
239 const std::shared_ptr<ReadableBlock>& rb,
241 auto msg = std::make_shared<MessageT>();
244 rb->buf,
static_cast<int>(rb->block->msg_size()), msg.get()));
246 auto send_time = msg_info.send_time();
248 statistics::Statistics::Instance()->AddRecvCount(self_attr,
250 statistics::Statistics::Instance()->SetTotalMsgsStatus(
251 self_attr, msg_info.seq_num());
256 auto tran_diff = (recv_time - send_time) / 1000;
259 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
260 self_attr, tran_diff);
262 statistics::Statistics::Instance()->SetProcStatus(self_attr,
264 listener(msg, msg_info);
267 Dispatcher::AddListener<ReadableBlock>(self_attr, listener_adapter);
269 AddSegment(self_attr);
277 if (cyber::common::GlobalData::Instance()->IsChannelEnableArenaShm(
279 self_attr.
message_type() != message::MessageType<message::RawMessage>() &&
281 message::MessageType<message::PyMessageWrap>()) {
282 auto listener_adapter = [listener, self_attr](
283 const std::shared_ptr<ReadableBlock>& rb,
285 auto msg = std::make_shared<MessageT>();
286 auto arena_manager = ProtobufArenaManager::Instance();
287 auto msg_wrapper = arena_manager->CreateMessageWrapper();
288 memcpy(msg_wrapper->GetData(), rb->buf, 1024);
292 AERROR <<
"ParseFromArenaMessageWrapper failed";
296 auto segment = arena_manager->GetSegment(self_attr.
channel_id());
297 auto msg_addr =
reinterpret_cast<uint64_t
>(msg_p);
298 msg.reset(
reinterpret_cast<MessageT*
>(msg_addr),
299 [arena_manager, segment, msg_wrapper](MessageT* p) {
308 auto related_blocks_for_lock =
309 arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
310 for (
int i = 0; i < related_blocks_for_lock.size(); ++i) {
311 auto block_index = related_blocks_for_lock[i];
312 if (!segment->AddBlockReadLock(block_index)) {
313 AWARN <<
"failed to acquire block for read, channel: "
314 << self_attr.
channel_id() <<
" index: " << block_index;
315 for (
int j = 0; j < i; ++j) {
317 segment->RemoveBlockReadLock(related_blocks_for_lock[j]);
323 auto send_time = msg_info.send_time();
325 statistics::Statistics::Instance()->AddRecvCount(self_attr,
327 statistics::Statistics::Instance()->SetTotalMsgsStatus(
328 self_attr, msg_info.seq_num());
333 auto tran_diff = (recv_time - send_time) / 1000;
335 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
336 self_attr, tran_diff);
338 statistics::Statistics::Instance()->SetProcStatus(self_attr,
341 listener(msg, msg_info);
342 auto related_blocks =
343 arena_manager->GetMessageRelatedBlocks(msg_wrapper.get());
344 for (
auto block_index : related_blocks) {
346 segment->RemoveBlockReadLock(block_index);
350 AddArenaListener<ReadableBlock>(self_attr, opposite_attr, listener_adapter);
352 auto listener_adapter = [listener, self_attr](
353 const std::shared_ptr<ReadableBlock>& rb,
355 auto msg = std::make_shared<MessageT>();
357 rb->buf,
static_cast<int>(rb->block->msg_size()), msg.get()));
359 auto send_time = msg_info.send_time();
360 auto msg_seq_num = msg_info.seq_num();
362 statistics::Statistics::Instance()->AddRecvCount(self_attr, msg_seq_num);
363 statistics::Statistics::Instance()->SetTotalMsgsStatus(self_attr,
369 auto tran_diff = (recv_time - send_time) / 1000;
371 statistics::Statistics::Instance()->SamplingTranLatency<uint64_t>(
372 self_attr, tran_diff);
374 statistics::Statistics::Instance()->SetProcStatus(self_attr,
377 listener(msg, msg_info);
380 Dispatcher::AddListener<ReadableBlock>(self_attr, opposite_attr,
383 AddSegment(self_attr);