28using common::GlobalData;
// Constructor: initializes host_id_ to 0 and then calls Init().
// Init() is declared to return bool; that result is discarded here.
// NOTE(review): the leading "30" looks like a listing line number fused onto
// the code by extraction — confirm against the original file.
30ShmDispatcher::ShmDispatcher() : host_id_(0) {
Init(); }
// Destructor: delegates all teardown to Shutdown().
32ShmDispatcher::~ShmDispatcher() { Shutdown(); }
// Shuts the dispatcher down.
// is_shutdown_ is set to true with exchange(); the first branch is taken when
// the flag was already true, i.e. on a repeated call.
// NOTE(review): the bodies of both branches and the closing braces are not
// visible in this listing — presumably an early return followed by joining
// thread_; confirm against the original file.
34void ShmDispatcher::Shutdown() {
35 if (is_shutdown_.exchange(
true)) {
39 if (thread_.joinable()) {
52 if (segments_.count(channel_id) > 0) {
55 auto segment = SegmentFactory::CreateSegment(channel_id);
56 segments_[channel_id] = segment;
57 previous_indexes_[channel_id] = UINT32_MAX;
58 arena_previous_indexes_[channel_id] = UINT32_MAX;
// Reads one block of the segment registered for `channel_id` and hands it to
// OnMessage().
//
// Steps visible here:
//   1. Create a ReadableBlock, set its index to `block_index`, and try to
//      acquire it with AcquireBlockToRead(); a failure is logged with AWARN.
//   2. Compute the address of the serialized MessageInfo as
//      rb->buf + rb->block->msg_size().
//   3. Deserialize rb->block->msg_info_size() bytes from that address; on
//      success call OnMessage(channel_id, rb, msg_info).
//   4. Release the block with ReleaseReadBlock().
//
// NOTE(review): several lines appear to be missing from this listing: the
// declaration of msg_info, whatever leaves the function after a failed
// acquire, and the branch that separates OnMessage() from the AERROR log
// (presumably an else) — confirm against the original file.
// NOTE(review): segments_ is indexed with operator[] and the result is
// dereferenced without a check; this presumably relies on the caller having
// verified that channel_id is present — confirm.
61void ShmDispatcher::ReadMessage(uint64_t channel_id, uint32_t block_index) {
62 ADEBUG <<
"Reading sharedmem message: "
63 << GlobalData::GetChannelById(channel_id)
64 <<
" from block: " << block_index;
65 auto rb = std::make_shared<ReadableBlock>();
66 rb->index = block_index;
67 if (!segments_[channel_id]->AcquireBlockToRead(rb.get())) {
68 AWARN <<
"fail to acquire block, channel: "
69 << GlobalData::GetChannelById(channel_id)
70 <<
" index: " << block_index;
75 const char* msg_info_addr =
76 reinterpret_cast<char*
>(rb->buf) + rb->block->msg_size();
78 if (msg_info.DeserializeFrom(msg_info_addr, rb->block->msg_info_size())) {
79 OnMessage(channel_id, rb, msg_info);
81 AERROR <<
"error msg info of channel:"
82 << GlobalData::GetChannelById(channel_id);
84 segments_[channel_id]->ReleaseReadBlock(*rb);
// Arena counterpart of the regular block read: reads one arena block of the
// segment registered for `channel_id` and hands it to OnArenaMessage().
//
// Steps visible here:
//   1. Create a ReadableBlock, set its index to `arena_block_index`, and try
//      to acquire it with AcquireArenaBlockToRead(); a failure is logged with
//      AWARN.
//   2. Compute the address of the serialized MessageInfo as
//      rb->buf + rb->block->msg_size().
//   3. Deserialize rb->block->msg_info_size() bytes from that address; on
//      success call OnArenaMessage(channel_id, rb, msg_info).
//   4. Release the block with ReleaseArenaReadBlock().
//
// NOTE(review): whatever leaves the function after a failed acquire, and the
// branch that separates OnArenaMessage() from the AERROR log (presumably an
// else), are not visible in this listing — confirm against the original file.
// NOTE(review): segments_ is indexed with operator[] and the result is
// dereferenced without a check; this presumably relies on the caller having
// verified that channel_id is present — confirm.
87void ShmDispatcher::ReadArenaMessage(uint64_t channel_id,
88 uint32_t arena_block_index) {
89 ADEBUG <<
"Reading sharedmem arena message: "
90 << GlobalData::GetChannelById(channel_id)
91 <<
" from block: " << arena_block_index;
92 auto rb = std::make_shared<ReadableBlock>();
93 rb->index = arena_block_index;
94 if (!segments_[channel_id]->AcquireArenaBlockToRead(rb.get())) {
95 AWARN <<
"fail to acquire block, channel: "
96 << GlobalData::GetChannelById(channel_id)
97 <<
" index: " << arena_block_index;
101 MessageInfo msg_info;
102 const char* msg_info_addr =
103 reinterpret_cast<char*
>(rb->buf) + rb->block->msg_size();
104 if (msg_info.DeserializeFrom(msg_info_addr, rb->block->msg_info_size())) {
105 OnArenaMessage(channel_id, rb, msg_info);
107 AERROR <<
"error msg info of channel:"
108 << GlobalData::GetChannelById(channel_id);
110 segments_[channel_id]->ReleaseArenaReadBlock(*rb);
// Delivers a block read from shared memory to the listener handler registered
// for `channel_id`.
//
// Visible behavior: checks is_shutdown_ first; looks the channel up in
// msg_listeners_ and, when found, casts the handler to
// ListenerHandler<ReadableBlock> and calls Run(rb, msg_info). An AERROR
// starting with "Cannot find " is logged when arena_msg_listeners_ has no
// entry for the channel either.
//
// NOTE(review): the body of the shutdown check, the declaration of
// handler_base, the argument of the dynamic_pointer_cast, and how the second
// lookup is nested relative to the first are not visible in this listing —
// confirm against the original file.
// NOTE(review): no null check on the result of dynamic_pointer_cast is
// visible before handler->Run() — confirm the cast cannot fail.
113void ShmDispatcher::OnMessage(uint64_t channel_id,
114 const std::shared_ptr<ReadableBlock>& rb,
115 const MessageInfo& msg_info) {
116 if (is_shutdown_.load()) {
120 if (msg_listeners_.Get(channel_id, &handler_base)) {
121 auto handler = std::dynamic_pointer_cast<ListenerHandler<ReadableBlock>>(
123 handler->Run(rb, msg_info);
125 if (!arena_msg_listeners_.Get(channel_id, &handler_base)) {
126 AERROR <<
"Cannot find " << GlobalData::GetChannelById(channel_id)
// Delivers an arena block read from shared memory to the listener handler
// registered for `channel_id`.
//
// Visible behavior: checks is_shutdown_ first; looks the channel up in
// arena_msg_listeners_ and, when found, casts the handler to
// ListenerHandler<ReadableBlock> and calls Run(rb, msg_info). An AERROR
// starting with "Cannot find " is logged when msg_listeners_ has no entry for
// the channel either.
//
// NOTE(review): the body of the shutdown check, the declaration of
// handler_base, the argument of the dynamic_pointer_cast, and how the second
// lookup is nested relative to the first are not visible in this listing —
// confirm against the original file.
// NOTE(review): no null check on the result of dynamic_pointer_cast is
// visible before handler->Run() — confirm the cast cannot fail.
132void ShmDispatcher::OnArenaMessage(uint64_t channel_id,
133 const std::shared_ptr<ReadableBlock>& rb,
134 const MessageInfo& msg_info) {
135 if (is_shutdown_.load()) {
139 if (arena_msg_listeners_.Get(channel_id, &handler_base)) {
140 auto handler = std::dynamic_pointer_cast<ListenerHandler<ReadableBlock>>(
142 handler->Run(rb, msg_info);
144 if (!msg_listeners_.Get(channel_id, &handler_base)) {
145 AERROR <<
"Cannot find " << GlobalData::GetChannelById(channel_id)
// Body of the dispatcher thread: loops until is_shutdown_ becomes true,
// waiting on notifier_ for ReadableInfo notifications and reading the blocks
// they refer to.
//
// Per iteration, as far as this listing shows:
//   - notifier_->Listen(100, &readable_info); a failure is logged at debug
//     level. (The 100 is presumably a timeout in milliseconds — confirm.)
//   - Notifications whose host_id() differs from host_id_ are logged as
//     coming from another host.
//   - channel_id, block_index and arena_block_index are taken from the
//     notification; a ReadLockGuard on segments_lock_ is created and
//     segments_ is checked for channel_id.
//   - If block_index != -1: the per-channel entry in previous_indexes_ is
//     created with UINT32_MAX when absent; when block_index != 0 and a
//     previous index is known, SAME / PREVIOUS / JUMP conditions are logged
//     at debug level; the previous index is then updated and ReadMessage()
//     is called.
//   - If arena_block_index != -1: the same logic runs against
//     arena_previous_indexes_ and ReadArenaMessage().
//
// NOTE(review): the statements that skip the rest of an iteration
// (presumably `continue`) and several closing braces are not visible in this
// listing, so the exact nesting and the scope of the lock guard cannot be
// confirmed from here.
// NOTE(review): block_index / arena_block_index are int32_t while the stored
// previous indexes are uint32_t, so the ==, < and subtraction below mix
// signed and unsigned operands; only the value -1 is excluded beforehand.
// Confirm that the indexes can never be otherwise negative.
// NOTE(review): previous_indexes_[...] and arena_previous_indexes_[...] can
// insert entries while only a read lock guard on segments_lock_ is visible;
// confirm these maps are not accessed concurrently from another thread.
151void ShmDispatcher::ThreadFunc() {
152 ReadableInfo readable_info;
153 while (!is_shutdown_.load()) {
154 if (!notifier_->Listen(100, &readable_info)) {
155 ADEBUG <<
"listen failed.";
159 if (readable_info.host_id() != host_id_) {
160 ADEBUG <<
"shm readable info from other host.";
164 uint64_t channel_id = readable_info.channel_id();
165 int32_t block_index = readable_info.block_index();
166 int32_t arena_block_index = readable_info.arena_block_index();
169 ReadLockGuard<AtomicRWLock> lock(segments_lock_);
170 if (segments_.count(channel_id) == 0) {
// Regular block path: -1 means the notification carries no regular block.
174 if (block_index != -1) {
176 if (previous_indexes_.count(channel_id) == 0) {
177 previous_indexes_[channel_id] = UINT32_MAX;
179 uint32_t& previous_index = previous_indexes_[channel_id];
180 if (block_index != 0 && previous_index != UINT32_MAX) {
181 if (block_index == previous_index) {
182 ADEBUG <<
"Receive SAME index " << block_index <<
" of channel "
184 }
else if (block_index < previous_index) {
185 ADEBUG <<
"Receive PREVIOUS message. last: " << previous_index
186 <<
", now: " << block_index;
187 }
else if (block_index - previous_index > 1) {
188 ADEBUG <<
"Receive JUMP message. last: " << previous_index
189 <<
", now: " << block_index;
192 previous_index = block_index;
193 ReadMessage(channel_id, block_index);
// Arena block path: -1 means the notification carries no arena block.
196 if (arena_block_index != -1) {
197 if (arena_previous_indexes_.count(channel_id) == 0) {
198 arena_previous_indexes_[channel_id] = UINT32_MAX;
200 uint32_t& arena_previous_index = arena_previous_indexes_[channel_id];
201 if (arena_block_index != 0 && arena_previous_index != UINT32_MAX) {
202 if (arena_block_index == arena_previous_index) {
203 ADEBUG <<
"Receive SAME index " << arena_block_index
204 <<
" of channel " << channel_id;
205 }
else if (arena_block_index < arena_previous_index) {
206 ADEBUG <<
"Receive PREVIOUS message. last: " << arena_previous_index
207 <<
", now: " << arena_block_index;
208 }
else if (arena_block_index - arena_previous_index > 1) {
209 ADEBUG <<
"Receive JUMP message. last: " << arena_previous_index
210 <<
", now: " << arena_block_index;
213 arena_previous_index = arena_block_index;
214 ReadArenaMessage(channel_id, arena_block_index);
// Initializes the dispatcher:
//   - host_id_ is set to the hash of the host IP reported by GlobalData;
//   - notifier_ is created through NotifierFactory::CreateNotifier();
//   - thread_ is started running ThreadFunc() on this object;
//   - the thread is passed to the scheduler's SetInnerThreadAttr() under the
//     name "shm_disp".
// NOTE(review): the function is declared to return bool, but the return
// statement and closing brace are not visible in this listing — confirm
// against the original file.
// NOTE(review): no check that notifier_ is non-null is visible before the
// thread that uses it is started — confirm CreateNotifier() cannot fail.
220bool ShmDispatcher::Init() {
221 host_id_ = common::Hash(GlobalData::Instance()->HostIp());
222 notifier_ = NotifierFactory::CreateNotifier();
223 thread_ = std::thread(&ShmDispatcher::ThreadFunc,
this);
224 scheduler::Instance()->SetInnerThreadAttr(
"shm_disp", &thread_);
std::shared_ptr< ListenerHandlerBase > ListenerHandlerBasePtr
bool Init(const char *binary_name, const std::string &dag_info)
optional uint64 channel_id