Apollo 10.0
自动驾驶开放平台
task_manager.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
23
24namespace apollo {
25namespace cyber {
26
// Common prefix of every worker task name; the worker index is appended to it
// when tasks are created and again when they are removed.
static constexpr const char* task_prefix = "/internal/task";
29
30TaskManager::TaskManager()
31 : task_queue_size_(1000),
32 task_queue_(new base::BoundedQueue<std::function<void()>>()) {
33 if (!task_queue_->Init(task_queue_size_, new base::BlockWaitStrategy())) {
34 AERROR << "Task queue init failed";
35 throw std::runtime_error("Task queue init failed");
36 }
37 auto func = [this]() {
38 while (!stop_) {
39 std::function<void()> task;
40 if (!task_queue_->Dequeue(&task)) {
41 auto routine = croutine::CRoutine::GetCurrentRoutine();
42 routine->HangUp();
43 continue;
44 }
45 task();
46 }
47 };
48
49 num_threads_ = scheduler::Instance()->TaskPoolSize();
50 auto factory = croutine::CreateRoutineFactory(std::move(func));
51 tasks_.reserve(num_threads_);
52 for (uint32_t i = 0; i < num_threads_; i++) {
53 auto task_name = task_prefix + std::to_string(i);
54 tasks_.push_back(common::GlobalData::RegisterTaskName(task_name));
55 if (!scheduler::Instance()->CreateTask(factory, task_name)) {
56 AERROR << "CreateTask failed:" << task_name;
57 }
58 }
59}
60
61TaskManager::~TaskManager() { Shutdown(); }
62
63void TaskManager::Shutdown() {
64 if (stop_.exchange(true)) {
65 return;
66 }
67 for (uint32_t i = 0; i < num_threads_; i++) {
68 scheduler::Instance()->RemoveTask(task_prefix + std::to_string(i));
69 }
70}
71
72} // namespace cyber
73} // namespace apollo
#define AERROR
Definition log.h:44
class register implement
Definition arena_queue.h:37
Definition future.h:29