Apollo 11.0
自动驾驶开放平台
prediction_thread_pool.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2019 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
21#pragma once
22
23#include <future>
24#include <memory>
25#include <utility>
26#include <vector>
27
29#include "cyber/common/log.h"
30
31namespace apollo {
32namespace prediction {
33
35 public:
36 BaseThreadPool(int thread_num, int next_thread_pool_level);
37
38 void Stop();
39
41
42 template <typename InputIter, typename F>
43 void ForEach(InputIter begin, InputIter end, F f) {
44 std::vector<std::future<void>> futures;
45 for (auto iter = begin; iter != end; ++iter) {
46 auto& elem = *iter;
47 futures.emplace_back(this->Post([&] { f(elem); }));
48 }
49 for (auto& future : futures) {
50 if (future.valid()) {
51 future.get();
52 } else {
53 AERROR << "Future is invalid.";
54 }
55 }
56 }
57
58 template <typename FuncType>
59 std::future<typename std::result_of<FuncType()>::type> Post(FuncType&& func) {
60 typedef typename std::result_of<FuncType()>::type ReturnType;
61 typedef typename std::packaged_task<ReturnType()> TaskType;
62 // Post requires that the functions in it are copy-constructible.
63 // We used a shared pointer for the packaged_task,
64 // Since it's only movable and non-copyable
65 std::shared_ptr<TaskType> task =
66 std::make_shared<TaskType>(std::move(func));
67 std::future<ReturnType> returned_future = task->get_future();
68
69 // Note: variables eg. `task` must be copied here because of the lifetime
70 if (stopped_) {
71 return std::future<ReturnType>();
72 }
73 task_queue_.Enqueue([task]() { (*task)(); });
74 return returned_future;
75 }
76
77 static std::vector<int> THREAD_POOL_CAPACITY;
78
79 private:
80 std::vector<std::thread> workers_;
81 apollo::cyber::base::BoundedQueue<std::function<void()>> task_queue_;
82 std::atomic_bool stopped_;
83};
84
85template <int LEVEL>
87 public:
89 static LevelThreadPool<LEVEL> pool;
90 return &pool;
91 }
92
93 private:
95 ADEBUG << "Level = " << LEVEL
96 << "; thread pool capacity = " << THREAD_POOL_CAPACITY[LEVEL];
97 }
98};
99
101 public:
102 static BaseThreadPool* Instance();
103
104 static thread_local int s_thread_pool_level;
105
106 template <typename InputIter, typename F>
107 static void ForEach(InputIter begin, InputIter end, F f) {
108 Instance()->ForEach(begin, end, f);
109 }
110};
111
112} // namespace prediction
113} // namespace apollo
double f
bool Enqueue(const T &element)
std::future< typename std::result_of< FuncType()>::type > Post(FuncType &&func)
void ForEach(InputIter begin, InputIter end, F f)
static std::vector< int > THREAD_POOL_CAPACITY
static void ForEach(InputIter begin, InputIter end, F f)
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
class register implement
Definition arena_queue.h:37