Apollo 10.0
自动驾驶开放平台
thread_pool.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
17
18namespace apollo {
19namespace perception {
20namespace lib {
21
22// Just use the protobuf Closure.
23using google::protobuf::Closure;
24
25using std::vector;
26
27ThreadPool::ThreadPool(int num_workers)
28 : num_workers_(num_workers),
29 num_available_workers_(num_workers),
30 task_queue_(num_workers),
31 started_(false) {
32 workers_.reserve(num_workers_);
33 for (int idx = 0; idx < num_workers_; ++idx) {
34 ThreadPoolWorker *worker = new ThreadPoolWorker(this);
35 workers_.push_back(worker);
36 }
37}
38
// NOTE(review): the signature line for this body (listing line 39) is missing
// from this extract. The body joins and deletes every worker, so it is
// presumably the ThreadPool destructor — confirm against the original file.
//
// Nothing to join if the workers were never started.
// NOTE(review): this early return also skips the delete loop below, so the
// workers allocated in the constructor are not freed on this path — confirm
// whether that is intended.
40 if (!started_) {
41 return;
42 }
43
// Queue one null closure per worker; the worker loop further down breaks out
// when it pops a null closure.
44 for (int idx = 0; idx < num_workers_; ++idx) {
45 Add(NULL);
46 }
47
// Join each worker, then release it.
48 for (int idx = 0; idx < num_workers_; ++idx) {
49 workers_[idx]->Join();
50 delete workers_[idx];
51 }
52}
53
// NOTE(review): the signature line for this body (listing line 54) is missing
// from this extract; presumably `void ThreadPool::Start()` — confirm against
// the original file.
//
// Calls Start() on every worker, then records that the pool has been started.
55 for (int idx = 0; idx < num_workers_; ++idx) {
56 workers_[idx]->Start();
57 }
// started_ is checked by the teardown code above before it joins the workers.
58 started_ = true;
59}
60
61void ThreadPool::Add(Closure *closure) { task_queue_.Push(closure); }
62
63void ThreadPool::Add(const vector<Closure *> &closures) {
64 for (size_t idx = 0; idx < closures.size(); ++idx) {
65 Add(closures[idx]);
66 }
67}
68
// NOTE(review): the signature line for this body (listing line 69) is missing
// from this extract. The body reaches the pool through thread_pool_, so it is
// presumably the worker's run method (e.g. `void ThreadPoolWorker::Run()`) —
// confirm against the original file.
//
// Worker loop: repeatedly pops a closure from the owning pool's task queue and
// runs it, until a null closure is popped.
70 while (true) {
71 Closure *closure = nullptr;
// NOTE(review): Pop presumably blocks until an item is available — confirm
// against the queue implementation.
72 thread_pool_->task_queue_.Pop(&closure);
// A null closure ends the loop.
73 if (closure == nullptr) {
74 break;
75 }
76
// Count this worker as busy; the counter is updated while holding the pool's
// mutex_.
77 {
78 MutexLock lock(&(thread_pool_->mutex_));
79 --(thread_pool_->num_available_workers_);
80 }
81
// The closure runs outside the locked scopes.
82 closure->Run();
83
// Count this worker as available again.
84 {
85 MutexLock lock(&(thread_pool_->mutex_));
86 ++(thread_pool_->num_available_workers_);
87 }
88 }
89}
90
91} // namespace lib
92} // namespace perception
93} // namespace apollo
virtual void Push(const Data &data)
void Add(google::protobuf::Closure *closure)
class register implement
Definition arena_queue.h:37