17#ifndef CYBER_BLOCKER_BLOCKER_H_
18#define CYBER_BLOCKER_BLOCKER_H_
26#include <unordered_map>
72 using CallbackMap = std::unordered_map<std::string, Callback>;
73 using Iterator =
typename std::list<std::shared_ptr<T>>::const_iterator;
88 bool Unsubscribe(
const std::string& callback_id)
override;
103 void Reset()
override;
110 mutable std::mutex msg_mutex_;
113 mutable std::mutex cb_mutex_;
123 published_msg_queue_.clear();
124 observed_msg_queue_.clear();
125 published_callbacks_.clear();
130 Publish(std::make_shared<MessageType>(msg));
142 std::lock_guard<std::mutex> lock(msg_mutex_);
143 observed_msg_queue_.clear();
144 published_msg_queue_.clear();
147 std::lock_guard<std::mutex> lock(cb_mutex_);
148 published_callbacks_.clear();
154 std::lock_guard<std::mutex> lock(msg_mutex_);
155 observed_msg_queue_.clear();
160 std::lock_guard<std::mutex> lock(msg_mutex_);
161 published_msg_queue_.clear();
166 std::lock_guard<std::mutex> lock(msg_mutex_);
167 observed_msg_queue_ = published_msg_queue_;
172 std::lock_guard<std::mutex> lock(msg_mutex_);
173 return observed_msg_queue_.empty();
178 std::lock_guard<std::mutex> lock(msg_mutex_);
179 return published_msg_queue_.empty();
185 std::lock_guard<std::mutex> lock(cb_mutex_);
186 if (published_callbacks_.find(callback_id) != published_callbacks_.end()) {
189 published_callbacks_[callback_id] = callback;
195 std::lock_guard<std::mutex> lock(cb_mutex_);
196 return published_callbacks_.erase(callback_id) != 0;
201 std::lock_guard<std::mutex> lock(msg_mutex_);
202 if (observed_msg_queue_.empty()) {
205 return *observed_msg_queue_.front();
210 std::lock_guard<std::mutex> lock(msg_mutex_);
211 if (observed_msg_queue_.empty()) {
214 return observed_msg_queue_.front();
219 std::lock_guard<std::mutex> lock(msg_mutex_);
220 if (observed_msg_queue_.empty()) {
223 return observed_msg_queue_.back();
228 std::lock_guard<std::mutex> lock(msg_mutex_);
229 if (published_msg_queue_.empty()) {
232 return published_msg_queue_.front();
237 return observed_msg_queue_.begin();
242 return observed_msg_queue_.end();
247 return attr_.capacity;
252 std::lock_guard<std::mutex> lock(msg_mutex_);
253 attr_.capacity = capacity;
254 while (published_msg_queue_.size() > capacity) {
255 published_msg_queue_.pop_back();
261 return attr_.channel_name;
266 if (attr_.capacity == 0) {
269 std::lock_guard<std::mutex> lock(msg_mutex_);
270 published_msg_queue_.push_front(msg);
271 while (published_msg_queue_.size() > attr_.capacity) {
272 published_msg_queue_.pop_back();
277void Blocker<T>::Notify(
const MessagePtr& msg) {
278 std::lock_guard<std::mutex> lock(cb_mutex_);
279 for (
const auto& item : published_callbacks_) {
virtual ~BlockerBase()=default
virtual const std::string & channel_name() const =0
virtual bool IsPublishedEmpty() const =0
virtual size_t capacity() const =0
virtual void ClearPublished()=0
virtual bool Unsubscribe(const std::string &callback_id)=0
virtual void set_capacity(size_t capacity)=0
virtual void ClearObserved()=0
virtual bool IsObservedEmpty() const =0
Blocker(const BlockerAttr &attr)
size_t capacity() const override
void ClearObserved() override
typename std::list< std::shared_ptr< T > >::const_iterator Iterator
bool Unsubscribe(const std::string &callback_id) override
std::list< MessagePtr > MessageQueue
void ClearPublished() override
Iterator ObservedEnd() const
bool IsObservedEmpty() const override
void Publish(const MessageType &msg)
bool Subscribe(const std::string &callback_id, const Callback &callback)
const MessagePtr GetOldestObservedPtr() const
const MessagePtr GetLatestPublishedPtr() const
const MessageType & GetLatestObserved() const
std::function< void(const MessagePtr &)> Callback
const MessagePtr GetLatestObservedPtr() const
bool IsPublishedEmpty() const override
std::unordered_map< std::string, Callback > CallbackMap
std::shared_ptr< T > MessagePtr
const std::string & channel_name() const override
void set_capacity(size_t capacity) override
Iterator ObservedBegin() const
BlockerAttr(const BlockerAttr &attr)
BlockerAttr(size_t cap, const std::string &channel)
BlockerAttr(const std::string &channel)