34using base::AtomicRWLock;
35using base::ReadLockGuard;
36using base::WriteLockGuard;
40 AERROR <<
"Poller init failed!";
48 if (is_shutdown_.exchange(
true)) {
55 if (is_shutdown_.load()) {
60 AERROR <<
"input is invalid";
65 ctrl_param.
fd = req.
fd;
66 ctrl_param.event.data.fd = req.
fd;
67 ctrl_param.event.events = req.
events;
71 if (requests_.count(req.
fd) == 0) {
72 ctrl_param.operation = EPOLL_CTL_ADD;
73 requests_[req.
fd] = std::make_shared<PollRequest>();
75 ctrl_param.operation = EPOLL_CTL_MOD;
77 *requests_[req.
fd] = req;
78 ctrl_params_[ctrl_param.fd] = ctrl_param;
86 if (is_shutdown_.load()) {
91 AERROR <<
"input is invalid";
97 auto size = requests_.erase(req.
fd);
99 AERROR <<
"unregister failed, can't find fd: " << req.
fd;
105 ctrl_param.
fd = req.
fd;
106 ctrl_params_[ctrl_param.
fd] = ctrl_param;
114 epoll_fd_ = epoll_create(kPollSize);
116 AERROR <<
"epoll create failed, " << strerror(errno);
121 if (pipe(pipe_fd_) == -1) {
122 AERROR <<
"create pipe failed, " << strerror(errno);
125 if (fcntl(pipe_fd_[0], F_SETFL, O_NONBLOCK) == -1) {
126 AERROR <<
"set nonblock failed, " << strerror(errno);
129 if (fcntl(pipe_fd_[1], F_SETFL, O_NONBLOCK) == -1) {
130 AERROR <<
"set nonblock failed, " << strerror(errno);
135 auto request = std::make_shared<PollRequest>();
136 request->fd = pipe_fd_[0];
137 request->events = EPOLLIN;
138 request->timeout_ms = -1;
139 request->callback = [
this](
const PollResponse&) {
141 while (read(pipe_fd_[0], &c, 1) > 0) {
144 requests_[request->fd] = request;
146 PollCtrlParam ctrl_param{};
147 ctrl_param.operation = EPOLL_CTL_ADD;
148 ctrl_param.fd = pipe_fd_[0];
149 ctrl_param.event.data.fd = pipe_fd_[0];
150 ctrl_param.event.events = EPOLLIN;
151 ctrl_params_[ctrl_param.fd] = ctrl_param;
153 is_shutdown_.store(
false);
154 thread_ = std::thread(&Poller::ThreadFunc,
this);
159void Poller::Clear() {
160 if (thread_.joinable()) {
164 if (epoll_fd_ >= 0) {
169 if (pipe_fd_[0] >= 0) {
174 if (pipe_fd_[1] >= 0) {
180 WriteLockGuard<AtomicRWLock> lck(poll_data_lock_);
182 ctrl_params_.clear();
186void Poller::Poll(
int timeout_ms) {
187 epoll_event evt[kPollSize];
189 int ready_num = epoll_wait(epoll_fd_, evt, kPollSize, timeout_ms);
192 static_cast<int>((after_time_ns - before_time_ns) / 1000000);
193 if (interval_ms == 0) {
197 std::unordered_map<int, PollResponse> responses;
199 ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
200 for (
auto& item : requests_) {
201 auto& request = item.second;
202 if (ctrl_params_.count(request->fd) != 0) {
206 if (request->timeout_ms > 0) {
207 request->timeout_ms -= interval_ms;
208 if (request->timeout_ms < 0) {
209 request->timeout_ms = 0;
213 if (request->timeout_ms == 0) {
214 responses[item.first] = PollResponse();
215 request->timeout_ms = -1;
221 for (
int i = 0; i < ready_num; ++i) {
222 int fd = evt[i].data.fd;
223 uint32_t events = evt[i].events;
224 responses[fd] = PollResponse(events);
228 for (
auto& item : responses) {
230 auto& response = item.second;
232 ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
233 auto search = requests_.find(fd);
234 if (search != requests_.end()) {
235 search->second->timeout_ms = -1;
236 search->second->callback(response);
241 if (errno != EINTR) {
242 AERROR <<
"epoll wait failed, " << strerror(errno);
247void Poller::ThreadFunc() {
250 sigfillset(&signal_set);
251 pthread_sigmask(SIG_BLOCK, &signal_set,
nullptr);
253 while (!is_shutdown_.load()) {
255 int timeout_ms = GetTimeoutMs();
256 ADEBUG <<
"this poll timeout ms: " << timeout_ms;
261void Poller::HandleChanges() {
264 ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
265 if (ctrl_params_.empty()) {
268 local_params.swap(ctrl_params_);
271 for (
auto& pair : local_params) {
272 auto& item = pair.second;
273 ADEBUG <<
"epoll ctl, op[" << item.operation <<
"] fd[" << item.fd
274 <<
"] events[" << item.event.events <<
"]";
275 if (epoll_ctl(epoll_fd_, item.operation, item.fd, &item.event) != 0 &&
277 AERROR <<
"epoll ctl failed, " << strerror(errno);
283int Poller::GetTimeoutMs() {
284 int timeout_ms = kPollTimeoutMs;
285 ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
286 for (
auto& item : requests_) {
287 auto& req = item.second;
288 if (req->timeout_ms >= 0 && req->timeout_ms < timeout_ms) {
289 timeout_ms = req->timeout_ms;
295void Poller::Notify() {
296 std::unique_lock<std::mutex> lock(pipe_mutex_, std::try_to_lock);
297 if (!lock.owns_lock()) {
302 if (write(pipe_fd_[1], &msg, 1) < 0) {
303 AWARN <<
"notify failed, " << strerror(errno);
uint64_t ToNanosecond() const
convert time to nanosecond.
static Time Now()
get the current time.
bool Unregister(const PollRequest &req)
std::unordered_map< int, PollCtrlParam > CtrlParamMap
bool Register(const PollRequest &req)
void SetInnerThreadAttr(const std::string &name, std::thread *thr)
std::function< void(const PollResponse &)> callback