30 channel_id_(channel_id),
33 arena_blocks_(nullptr),
34 managed_shm_(nullptr),
36 arena_block_buf_lock_(),
38 arena_block_buf_addrs_() {}
44 AERROR <<
"create shm failed, can't write now.";
54 AINFO <<
"msg_size: " << msg_size
55 <<
" larger than current shm_buffer_size: "
57 result = Recreate(msg_size);
61 AERROR <<
"segment update failed.";
65 uint32_t index = GetNextWritableBlockIndex();
66 writable_block->
index = index;
76 AERROR <<
"create shm failed, can't write now.";
84 uint32_t index = GetNextArenaWritableBlockIndex();
85 writable_block->
index = index;
92 auto index = writable_block.
index;
96 blocks_[index].ReleaseWriteLock();
100 auto index = writable_block.
index;
110 AERROR <<
"failed to open shared memory, can't read now.";
114 auto index = readable_block->
index;
116 AERROR <<
"invalid block_index[" << index <<
"].";
126 AERROR <<
"segment update failed.";
130 if (!
blocks_[index].TryLockForRead()) {
141 AERROR <<
"failed to open shared memory, can't read now.";
145 auto index = readable_block->
index;
147 AERROR <<
"invalid arena block_index[" << index <<
"].";
157 AERROR <<
"segment update failed.";
170 auto index = readable_block.
index;
178 auto index = readable_block.
index;
182 blocks_[index].ReleaseReadLock();
202 return blocks_[block_index].TryLockForWrite();
209 blocks_[block_index].ReleaseWriteLock();
217 return blocks_[block_index].TryLockForRead();
224 blocks_[block_index].ReleaseReadLock();
267 if (reference_counts == 0) {
278bool Segment::Remap() {
280 ADEBUG <<
"before reset.";
286bool Segment::Recreate(
const uint64_t& msg_size) {
295uint32_t Segment::GetNextWritableBlockIndex() {
299 if (
blocks_[try_idx].TryLockForWrite()) {
306uint32_t Segment::GetNextArenaWritableBlockIndex() {
virtual bool OpenOnly()=0
bool AcquireBlockToRead(ReadableBlock *readable_block)
bool ReleaseArenaBlockForReadByIndex(uint64_t block_index)
bool AcquireArenaBlockToWrite(std::size_t msg_size, WritableBlock *writable_block)
Segment(uint64_t channel_id)
bool LockArenaBlockForReadByIndex(uint64_t block_index)
void ReleaseReadBlock(const ReadableBlock &readable_block)
bool ReleaseBlockForReadByIndex(uint64_t block_index)
bool InitOnly(uint64_t message_size)
bool AcquireBlockToWrite(std::size_t msg_size, WritableBlock *writable_block)
bool LockArenaBlockForWriteByIndex(uint64_t block_index)
std::unordered_map< uint32_t, uint8_t * > block_buf_addrs_
bool LockBlockForWriteByIndex(uint64_t block_index)
bool ReleaseArenaBlockForWriteByIndex(uint64_t block_index)
virtual bool OpenOrCreate()=0
bool AcquireArenaBlockToRead(ReadableBlock *readable_block)
bool ReleaseBlockForWriteByIndex(uint64_t block_index)
void ReleaseWrittenBlock(const WritableBlock &writable_block)
bool LockBlockForReadByIndex(uint64_t block_index)
std::unordered_map< uint32_t, uint8_t * > arena_block_buf_addrs_
void ReleaseArenaWrittenBlock(const WritableBlock &writable_block)
void ReleaseArenaReadBlock(const ReadableBlock &readable_block)
static const uint32_t ARENA_BLOCK_NUM
const uint64_t & ceiling_msg_size()
const uint32_t & block_num()
void Update(const uint64_t &real_msg_size)
uint32_t FetchAddArenaSeq(uint32_t diff)
uint32_t reference_counts()
void set_need_remap(bool need)
void DecreaseReferenceCounts()
uint32_t FetchAddSeq(uint32_t diff)
#define RETURN_VAL_IF_NULL(ptr, val)