17#ifndef CYBER_DATA_CHANNEL_BUFFER_H_
18#define CYBER_DATA_CHANNEL_BUFFER_H_
42 bool Fetch(uint64_t* index, std::shared_ptr<T>& m);
46 bool FetchMulti(uint64_t fetch_size, std::vector<std::shared_ptr<T>>* vec);
49 std::shared_ptr<BufferType>
Buffer()
const {
return buffer_; }
53 std::shared_ptr<BufferType> buffer_;
58 std::shared_ptr<T>& m) {
59 std::lock_guard<std::mutex> lock(buffer_->Mutex());
60 if (buffer_->Empty()) {
65 *index = buffer_->Tail();
66 }
else if (*index == buffer_->Tail() + 1) {
68 }
else if (*index < buffer_->Head()) {
69 auto interval = buffer_->Tail() - *index;
71 <<
"read buffer overflow, drop_message[" << interval <<
"] pre_index["
72 << *index <<
"] current_index[" << buffer_->Tail() <<
"] ";
73 *index = buffer_->Tail();
75 m = buffer_->at(*index);
81 std::lock_guard<std::mutex> lock(buffer_->Mutex());
82 if (buffer_->Empty()) {
92 std::vector<std::shared_ptr<T>>* vec) {
93 std::lock_guard<std::mutex> lock(buffer_->Mutex());
94 if (buffer_->Empty()) {
98 auto num = std::min(buffer_->Size(), fetch_size);
100 for (
auto index = buffer_->Tail() - num + 1; index <= buffer_->Tail();
102 vec->emplace_back(buffer_->at(index));
static std::string GetChannelById(uint64_t id)
bool Latest(std::shared_ptr< T > &m)
std::shared_ptr< BufferType > Buffer() const
uint64_t channel_id() const
CacheBuffer< std::shared_ptr< T > > BufferType
bool Fetch(uint64_t *index, std::shared_ptr< T > &m)
bool FetchMulti(uint64_t fetch_size, std::vector< std::shared_ptr< T > > *vec)
ChannelBuffer(uint64_t channel_id, BufferType *buffer)