28static const char*
const task_prefix =
"/internal/task";
30TaskManager::TaskManager()
31 : task_queue_size_(1000),
32 task_queue_(new base::BoundedQueue<
std::function<void()>>()) {
33 if (!task_queue_->Init(task_queue_size_,
new base::BlockWaitStrategy())) {
34 AERROR <<
"Task queue init failed";
35 throw std::runtime_error(
"Task queue init failed");
37 auto func = [
this]() {
39 std::function<void()> task;
40 if (!task_queue_->Dequeue(&task)) {
41 auto routine = croutine::CRoutine::GetCurrentRoutine();
49 num_threads_ = scheduler::Instance()->TaskPoolSize();
50 auto factory = croutine::CreateRoutineFactory(std::move(func));
51 tasks_.reserve(num_threads_);
52 for (uint32_t i = 0; i < num_threads_; i++) {
53 auto task_name = task_prefix + std::to_string(i);
54 tasks_.push_back(common::GlobalData::RegisterTaskName(task_name));
55 if (!scheduler::Instance()->CreateTask(factory, task_name)) {
56 AERROR <<
"CreateTask failed:" << task_name;
61TaskManager::~TaskManager() { Shutdown(); }
63void TaskManager::Shutdown() {
64 if (stop_.exchange(
true)) {
67 for (uint32_t i = 0; i < num_threads_; i++) {
68 scheduler::Instance()->RemoveTask(task_prefix + std::to_string(i));