Apollo 10.0
自动驾驶开放平台
poller.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#include "cyber/io/poller.h"
18
19#include <fcntl.h>
20#include <sys/epoll.h>
21#include <unistd.h>
22
23#include <csignal>
24#include <cstring>
25
26#include "cyber/common/log.h"
28#include "cyber/time/time.h"
29
30namespace apollo {
31namespace cyber {
32namespace io {
33
34using base::AtomicRWLock;
35using base::ReadLockGuard;
36using base::WriteLockGuard;
37
38Poller::Poller() {
39 if (!Init()) {
40 AERROR << "Poller init failed!";
41 Clear();
42 }
43}
44
46
48 if (is_shutdown_.exchange(true)) {
49 return;
50 }
51 Clear();
52}
53
54bool Poller::Register(const PollRequest& req) {
55 if (is_shutdown_.load()) {
56 return false;
57 }
58
59 if (req.fd < 0 || req.callback == nullptr) {
60 AERROR << "input is invalid";
61 return false;
62 }
63
64 PollCtrlParam ctrl_param{};
65 ctrl_param.fd = req.fd;
66 ctrl_param.event.data.fd = req.fd;
67 ctrl_param.event.events = req.events;
68
69 {
70 WriteLockGuard<AtomicRWLock> lck(poll_data_lock_);
71 if (requests_.count(req.fd) == 0) {
72 ctrl_param.operation = EPOLL_CTL_ADD;
73 requests_[req.fd] = std::make_shared<PollRequest>();
74 } else {
75 ctrl_param.operation = EPOLL_CTL_MOD;
76 }
77 *requests_[req.fd] = req;
78 ctrl_params_[ctrl_param.fd] = ctrl_param;
79 }
80
81 Notify();
82 return true;
83}
84
86 if (is_shutdown_.load()) {
87 return false;
88 }
89
90 if (req.fd < 0 || req.callback == nullptr) {
91 AERROR << "input is invalid";
92 return false;
93 }
94
95 {
96 WriteLockGuard<AtomicRWLock> lck(poll_data_lock_);
97 auto size = requests_.erase(req.fd);
98 if (size == 0) {
99 AERROR << "unregister failed, can't find fd: " << req.fd;
100 return false;
101 }
102
103 PollCtrlParam ctrl_param;
104 ctrl_param.operation = EPOLL_CTL_DEL;
105 ctrl_param.fd = req.fd;
106 ctrl_params_[ctrl_param.fd] = ctrl_param;
107 }
108
109 Notify();
110 return true;
111}
112
113bool Poller::Init() {
114 epoll_fd_ = epoll_create(kPollSize);
115 if (epoll_fd_ < 0) {
116 AERROR << "epoll create failed, " << strerror(errno);
117 return false;
118 }
119
120 // create pipe, and set nonblock
121 if (pipe(pipe_fd_) == -1) {
122 AERROR << "create pipe failed, " << strerror(errno);
123 return false;
124 }
125 if (fcntl(pipe_fd_[0], F_SETFL, O_NONBLOCK) == -1) {
126 AERROR << "set nonblock failed, " << strerror(errno);
127 return false;
128 }
129 if (fcntl(pipe_fd_[1], F_SETFL, O_NONBLOCK) == -1) {
130 AERROR << "set nonblock failed, " << strerror(errno);
131 return false;
132 }
133
134 // add pipe[0] to epoll
135 auto request = std::make_shared<PollRequest>();
136 request->fd = pipe_fd_[0];
137 request->events = EPOLLIN;
138 request->timeout_ms = -1;
139 request->callback = [this](const PollResponse&) {
140 char c = 0;
141 while (read(pipe_fd_[0], &c, 1) > 0) {
142 }
143 };
144 requests_[request->fd] = request;
145
146 PollCtrlParam ctrl_param{};
147 ctrl_param.operation = EPOLL_CTL_ADD;
148 ctrl_param.fd = pipe_fd_[0];
149 ctrl_param.event.data.fd = pipe_fd_[0];
150 ctrl_param.event.events = EPOLLIN;
151 ctrl_params_[ctrl_param.fd] = ctrl_param;
152
153 is_shutdown_.store(false);
154 thread_ = std::thread(&Poller::ThreadFunc, this);
155 scheduler::Instance()->SetInnerThreadAttr("io_poller", &thread_);
156 return true;
157}
158
159void Poller::Clear() {
160 if (thread_.joinable()) {
161 thread_.join();
162 }
163
164 if (epoll_fd_ >= 0) {
165 close(epoll_fd_);
166 epoll_fd_ = -1;
167 }
168
169 if (pipe_fd_[0] >= 0) {
170 close(pipe_fd_[0]);
171 pipe_fd_[0] = -1;
172 }
173
174 if (pipe_fd_[1] >= 0) {
175 close(pipe_fd_[1]);
176 pipe_fd_[1] = -1;
177 }
178
179 {
180 WriteLockGuard<AtomicRWLock> lck(poll_data_lock_);
181 requests_.clear();
182 ctrl_params_.clear();
183 }
184}
185
186void Poller::Poll(int timeout_ms) {
187 epoll_event evt[kPollSize];
188 auto before_time_ns = Time::Now().ToNanosecond();
189 int ready_num = epoll_wait(epoll_fd_, evt, kPollSize, timeout_ms);
190 auto after_time_ns = Time::Now().ToNanosecond();
191 int interval_ms =
192 static_cast<int>((after_time_ns - before_time_ns) / 1000000);
193 if (interval_ms == 0) {
194 interval_ms = 1;
195 }
196
197 std::unordered_map<int, PollResponse> responses;
198 {
199 ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
200 for (auto& item : requests_) {
201 auto& request = item.second;
202 if (ctrl_params_.count(request->fd) != 0) {
203 continue;
204 }
205
206 if (request->timeout_ms > 0) {
207 request->timeout_ms -= interval_ms;
208 if (request->timeout_ms < 0) {
209 request->timeout_ms = 0;
210 }
211 }
212
213 if (request->timeout_ms == 0) {
214 responses[item.first] = PollResponse();
215 request->timeout_ms = -1;
216 }
217 }
218 }
219
220 if (ready_num > 0) {
221 for (int i = 0; i < ready_num; ++i) {
222 int fd = evt[i].data.fd;
223 uint32_t events = evt[i].events;
224 responses[fd] = PollResponse(events);
225 }
226 }
227
228 for (auto& item : responses) {
229 int fd = item.first;
230 auto& response = item.second;
231
232 ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
233 auto search = requests_.find(fd);
234 if (search != requests_.end()) {
235 search->second->timeout_ms = -1;
236 search->second->callback(response);
237 }
238 }
239
240 if (ready_num < 0) {
241 if (errno != EINTR) {
242 AERROR << "epoll wait failed, " << strerror(errno);
243 }
244 }
245}
246
247void Poller::ThreadFunc() {
248 // block all signals in this thread
249 sigset_t signal_set;
250 sigfillset(&signal_set);
251 pthread_sigmask(SIG_BLOCK, &signal_set, nullptr);
252
253 while (!is_shutdown_.load()) {
254 HandleChanges();
255 int timeout_ms = GetTimeoutMs();
256 ADEBUG << "this poll timeout ms: " << timeout_ms;
257 Poll(timeout_ms);
258 }
259}
260
261void Poller::HandleChanges() {
262 CtrlParamMap local_params;
263 {
264 ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
265 if (ctrl_params_.empty()) {
266 return;
267 }
268 local_params.swap(ctrl_params_);
269 }
270
271 for (auto& pair : local_params) {
272 auto& item = pair.second;
273 ADEBUG << "epoll ctl, op[" << item.operation << "] fd[" << item.fd
274 << "] events[" << item.event.events << "]";
275 if (epoll_ctl(epoll_fd_, item.operation, item.fd, &item.event) != 0 &&
276 errno != EBADF) {
277 AERROR << "epoll ctl failed, " << strerror(errno);
278 }
279 }
280}
281
282// min heap can be used to optimize
283int Poller::GetTimeoutMs() {
284 int timeout_ms = kPollTimeoutMs;
285 ReadLockGuard<AtomicRWLock> lck(poll_data_lock_);
286 for (auto& item : requests_) {
287 auto& req = item.second;
288 if (req->timeout_ms >= 0 && req->timeout_ms < timeout_ms) {
289 timeout_ms = req->timeout_ms;
290 }
291 }
292 return timeout_ms;
293}
294
295void Poller::Notify() {
296 std::unique_lock<std::mutex> lock(pipe_mutex_, std::try_to_lock);
297 if (!lock.owns_lock()) {
298 return;
299 }
300
301 char msg = 'C';
302 if (write(pipe_fd_[1], &msg, 1) < 0) {
303 AWARN << "notify failed, " << strerror(errno);
304 }
305}
306
307} // namespace io
308} // namespace cyber
309} // namespace apollo
uint64_t ToNanosecond() const
convert time to nanosecond.
Definition time.cc:83
static Time Now()
get the current time.
Definition time.cc:57
bool Unregister(const PollRequest &req)
Definition poller.cc:85
std::unordered_map< int, PollCtrlParam > CtrlParamMap
Definition poller.h:40
bool Register(const PollRequest &req)
Definition poller.cc:54
void SetInnerThreadAttr(const std::string &name, std::thread *thr)
Definition scheduler.cc:90
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
#define AWARN
Definition log.h:43
class register implement
Definition arena_queue.h:37
std::function< void(const PollResponse &)> callback
Definition poll_data.h:39