Apollo 10.0
自动驾驶开放平台
thread_pool.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_BASE_THREAD_POOL_H_
18#define CYBER_BASE_THREAD_POOL_H_
19
20#include <atomic>
21#include <functional>
22#include <future>
23#include <memory>
24#include <queue>
25#include <stdexcept>
26#include <thread>
27#include <utility>
28#include <vector>
29
31
32namespace apollo {
33namespace cyber {
34namespace base {
35
37 public:
38 explicit ThreadPool(std::size_t thread_num, std::size_t max_task_num = 1000);
39
40 template <typename F, typename... Args>
41 auto Enqueue(F&& f, Args&&... args)
42 -> std::future<typename std::result_of<F(Args...)>::type>;
43
45
46 private:
47 std::vector<std::thread> workers_;
48 BoundedQueue<std::function<void()>> task_queue_;
49 std::atomic_bool stop_;
50};
51
52inline ThreadPool::ThreadPool(std::size_t threads, std::size_t max_task_num)
53 : stop_(false) {
54 if (!task_queue_.Init(max_task_num, new BlockWaitStrategy())) {
55 throw std::runtime_error("Task queue init failed.");
56 }
57 workers_.reserve(threads);
58 for (size_t i = 0; i < threads; ++i) {
59 workers_.emplace_back([this] {
60 while (!stop_) {
61 std::function<void()> task;
62 if (task_queue_.WaitDequeue(&task)) {
63 task();
64 }
65 }
66 });
67 }
68}
69
70// before using the return value, you should check value.valid()
71template <typename F, typename... Args>
72auto ThreadPool::Enqueue(F&& f, Args&&... args)
73 -> std::future<typename std::result_of<F(Args...)>::type> {
74 using return_type = typename std::result_of<F(Args...)>::type;
75
76 auto task = std::make_shared<std::packaged_task<return_type()>>(
77 std::bind(std::forward<F>(f), std::forward<Args>(args)...));
78
79 std::future<return_type> res = task->get_future();
80
81 // don't allow enqueueing after stopping the pool
82 if (stop_) {
83 return std::future<return_type>();
84 }
85 task_queue_.Enqueue([task]() { (*task)(); });
86 return res;
87};
88
89// the destructor joins all threads
91 if (stop_.exchange(true)) {
92 return;
93 }
94 task_queue_.BreakAllWait();
95 for (std::thread& worker : workers_) {
96 worker.join();
97 }
98}
99
100} // namespace base
101} // namespace cyber
102} // namespace apollo
103
104#endif // CYBER_BASE_THREAD_POOL_H_
double f
ThreadPool(std::size_t thread_num, std::size_t max_task_num=1000)
Definition thread_pool.h:52
auto Enqueue(F &&f, Args &&... args) -> std::future< typename std::result_of< F(Args...)>::type >
Definition thread_pool.h:72
class register implement
Definition arena_queue.h:37