Apollo 10.0
自动驾驶开放平台
apollo::cyber::scheduler::Scheduler 类参考 [abstract]

#include <scheduler.h>

类 apollo::cyber::scheduler::Scheduler 继承关系图:
apollo::cyber::scheduler::Scheduler 的协作图:

Public 成员函数

virtual ~Scheduler ()
 
bool CreateTask (const RoutineFactory &factory, const std::string &name)
 
bool CreateTask (std::function< void()> &&func, const std::string &name, std::shared_ptr< DataVisitorBase > visitor=nullptr)
 
bool NotifyTask (uint64_t crid)
 
void Shutdown ()
 
uint32_t TaskPoolSize ()
 
virtual bool RemoveTask (const std::string &name)=0
 
void ProcessLevelResourceControl ()
 
void SetInnerThreadAttr (const std::string &name, std::thread *thr)
 
virtual bool DispatchTask (const std::shared_ptr< CRoutine > &)=0
 
virtual bool NotifyProcessor (uint64_t crid)=0
 
virtual bool RemoveCRoutine (uint64_t crid)=0
 
void CheckSchedStatus ()
 
void SetInnerThreadConfs (const std::unordered_map< std::string, InnerThread > &confs)
 

静态 Public 成员函数

static Scheduler * Instance ()
 

Protected 成员函数

 Scheduler ()
 

Protected 属性

AtomicRWLock id_cr_lock_
 
AtomicHashMap< uint64_t, MutexWrapper * > id_map_mutex_
 
std::mutex cr_wl_mtx_
 
std::unordered_map< uint64_t, std::shared_ptr< CRoutine > > id_cr_
 
std::vector< std::shared_ptr< ProcessorContext > > pctxs_
 
std::vector< std::shared_ptr< Processor > > processors_
 
std::unordered_map< std::string, InnerThread > inner_thr_confs_
 
std::string process_level_cpuset_
 
uint32_t proc_num_ = 0
 
uint32_t task_pool_size_ = 0
 
std::atomic< bool > stop_
 

详细描述

在文件 scheduler.h 第 58 行定义.

构造及析构函数说明

◆ ~Scheduler()

virtual apollo::cyber::scheduler::Scheduler::~Scheduler ( )
inline virtual

在文件 scheduler.h 第 60 行定义.

60 {}

◆ Scheduler()

apollo::cyber::scheduler::Scheduler::Scheduler ( )
inline protected

在文件 scheduler.h 第 88 行定义.

88 : stop_(false) {}

成员函数说明

◆ CheckSchedStatus()

void apollo::cyber::scheduler::Scheduler::CheckSchedStatus ( )

在文件 scheduler.cc 第 102 行定义.

102 {
103 std::string snap_info;
104 auto now = Time::Now().ToNanosecond();
105 for (auto processor : processors_) {
106 auto snap = processor->ProcSnapshot();
107 if (snap->execute_start_time.load()) {
108 auto execute_time = (now - snap->execute_start_time.load()) / 1000000;
109 snap_info.append(std::to_string(snap->processor_id.load()))
110 .append(":")
111 .append(snap->routine_name)
112 .append(":")
113 .append(std::to_string(execute_time));
114 } else {
115 snap_info.append(std::to_string(snap->processor_id.load()))
116 .append(":idle");
117 }
118 snap_info.append(", ");
119 }
120 snap_info.append("timestamp: ").append(std::to_string(now));
121 AINFO << snap_info;
122 snap_info.clear();
123}
uint64_t ToNanosecond() const
convert time to nanosecond.
Definition time.cc:83
static Time Now()
get the current time.
Definition time.cc:57
std::vector< std::shared_ptr< Processor > > processors_
Definition scheduler.h:96
#define AINFO
Definition log.h:42

◆ CreateTask() [1/2]

bool apollo::cyber::scheduler::Scheduler::CreateTask ( const RoutineFactory &  factory,
const std::string &  name 
)

在文件 scheduler.cc 第 37 行定义.

38 {
39 return CreateTask(factory.create_routine(), name, factory.GetDataVisitor());
40}
bool CreateTask(const RoutineFactory &factory, const std::string &name)
Definition scheduler.cc:37

◆ CreateTask() [2/2]

bool apollo::cyber::scheduler::Scheduler::CreateTask ( std::function< void()> &&  func,
const std::string &  name,
std::shared_ptr< DataVisitorBase >  visitor = nullptr 
)

在文件 scheduler.cc 第 42 行定义.

44 {
45 if (cyber_unlikely(stop_.load())) {
46 ADEBUG << "scheduler is stoped, cannot create task!";
47 return false;
48 }
49
50 auto task_id = GlobalData::RegisterTaskName(name);
51
52 auto cr = std::make_shared<CRoutine>(func);
53 cr->set_id(task_id);
54 cr->set_name(name);
55 AINFO << "create croutine: " << name;
56
57 if (!DispatchTask(cr)) {
58 return false;
59 }
60
61 if (visitor != nullptr) {
62 visitor->RegisterNotifyCallback([this, task_id]() {
63 if (cyber_unlikely(stop_.load())) {
64 return;
65 }
66 this->NotifyProcessor(task_id);
67 });
68 }
69 return true;
70}
static uint64_t RegisterTaskName(const std::string &task_name)
virtual bool DispatchTask(const std::shared_ptr< CRoutine > &)=0
#define cyber_unlikely(x)
Definition macros.h:30
#define ADEBUG
Definition log.h:41

◆ DispatchTask()

virtual bool apollo::cyber::scheduler::Scheduler::DispatchTask ( const std::shared_ptr< CRoutine > &  )
pure virtual

◆ Instance()

static Scheduler * apollo::cyber::scheduler::Scheduler::Instance ( )
static

◆ NotifyProcessor()

virtual bool apollo::cyber::scheduler::Scheduler::NotifyProcessor ( uint64_t  crid)
pure virtual

◆ NotifyTask()

bool apollo::cyber::scheduler::Scheduler::NotifyTask ( uint64_t  crid)

在文件 scheduler.cc 第 72 行定义.

72 {
73 if (cyber_unlikely(stop_.load())) {
74 return true;
75 }
76 return NotifyProcessor(crid);
77}
virtual bool NotifyProcessor(uint64_t crid)=0

◆ ProcessLevelResourceControl()

void apollo::cyber::scheduler::Scheduler::ProcessLevelResourceControl ( )

在文件 scheduler.cc 第 79 行定义.

79 {
80 std::vector<int> cpus;
81 ParseCpuset(process_level_cpuset_, &cpus);
82 cpu_set_t set;
83 CPU_ZERO(&set);
84 for (const auto cpu : cpus) {
85 CPU_SET(cpu, &set);
86 }
87 pthread_setaffinity_np(pthread_self(), sizeof(set), &set);
88}
void ParseCpuset(const std::string &str, std::vector< int > *cpuset)
Definition pin_thread.cc:26

◆ RemoveCRoutine()

virtual bool apollo::cyber::scheduler::Scheduler::RemoveCRoutine ( uint64_t  crid)
pure virtual

◆ RemoveTask()

virtual bool apollo::cyber::scheduler::Scheduler::RemoveTask ( const std::string &  name)
pure virtual

◆ SetInnerThreadAttr()

void apollo::cyber::scheduler::Scheduler::SetInnerThreadAttr ( const std::string &  name,
std::thread *  thr 
)

在文件 scheduler.cc 第 90 行定义.

90 {
91 if (thr != nullptr && inner_thr_confs_.find(name) != inner_thr_confs_.end()) {
92 auto th_conf = inner_thr_confs_[name];
93 auto cpuset = th_conf.cpuset();
94
95 std::vector<int> cpus;
96 ParseCpuset(cpuset, &cpus);
97 SetSchedAffinity(thr, cpus, "range");
98 SetSchedPolicy(thr, th_conf.policy(), th_conf.prio());
99 }
100}
std::unordered_map< std::string, InnerThread > inner_thr_confs_
Definition scheduler.h:98
void SetSchedAffinity(std::thread *thread, const std::vector< int > &cpus, const std::string &affinity, int cpu_id)
Definition pin_thread.cc:52
void SetSchedPolicy(std::thread *thread, std::string spolicy, int sched_priority, pid_t tid)
Definition pin_thread.cc:75

◆ SetInnerThreadConfs()

void apollo::cyber::scheduler::Scheduler::SetInnerThreadConfs ( const std::unordered_map< std::string, InnerThread > &  confs)
inline

在文件 scheduler.h 第 82 行定义.

83 {
84 inner_thr_confs_ = confs;
85 }

◆ Shutdown()

void apollo::cyber::scheduler::Scheduler::Shutdown ( )

在文件 scheduler.cc 第 125 行定义.

125 {
126 if (cyber_unlikely(stop_.exchange(true))) {
127 return;
128 }
129
130 for (auto& ctx : pctxs_) {
131 ctx->Shutdown();
132 }
133
134 std::vector<uint64_t> cr_list;
135 {
136 ReadLockGuard<AtomicRWLock> lk(id_cr_lock_);
137 for (auto& cr : id_cr_) {
138 cr_list.emplace_back(cr.second->id());
139 }
140 }
141
142 for (auto& id : cr_list) {
143 RemoveCRoutine(id);
144 }
145
146 for (auto& processor : processors_) {
147 processor->Stop();
148 }
149
150 processors_.clear();
151 pctxs_.clear();
152}
std::unordered_map< uint64_t, std::shared_ptr< CRoutine > > id_cr_
Definition scheduler.h:94
std::vector< std::shared_ptr< ProcessorContext > > pctxs_
Definition scheduler.h:95
virtual bool RemoveCRoutine(uint64_t crid)=0

◆ TaskPoolSize()

uint32_t apollo::cyber::scheduler::Scheduler::TaskPoolSize ( )
inline

在文件 scheduler.h 第 69 行定义.

类成员变量说明

◆ cr_wl_mtx_

std::mutex apollo::cyber::scheduler::Scheduler::cr_wl_mtx_
protected

在文件 scheduler.h 第 92 行定义.

◆ id_cr_

std::unordered_map<uint64_t, std::shared_ptr<CRoutine> > apollo::cyber::scheduler::Scheduler::id_cr_
protected

在文件 scheduler.h 第 94 行定义.

◆ id_cr_lock_

AtomicRWLock apollo::cyber::scheduler::Scheduler::id_cr_lock_
protected

在文件 scheduler.h 第 90 行定义.

◆ id_map_mutex_

AtomicHashMap<uint64_t, MutexWrapper*> apollo::cyber::scheduler::Scheduler::id_map_mutex_
protected

在文件 scheduler.h 第 91 行定义.

◆ inner_thr_confs_

std::unordered_map<std::string, InnerThread> apollo::cyber::scheduler::Scheduler::inner_thr_confs_
protected

在文件 scheduler.h 第 98 行定义.

◆ pctxs_

std::vector<std::shared_ptr<ProcessorContext> > apollo::cyber::scheduler::Scheduler::pctxs_
protected

在文件 scheduler.h 第 95 行定义.

◆ proc_num_

uint32_t apollo::cyber::scheduler::Scheduler::proc_num_ = 0
protected

在文件 scheduler.h 第 101 行定义.

◆ process_level_cpuset_

std::string apollo::cyber::scheduler::Scheduler::process_level_cpuset_
protected

在文件 scheduler.h 第 100 行定义.

◆ processors_

std::vector<std::shared_ptr<Processor> > apollo::cyber::scheduler::Scheduler::processors_
protected

在文件 scheduler.h 第 96 行定义.

◆ stop_

std::atomic<bool> apollo::cyber::scheduler::Scheduler::stop_
protected

在文件 scheduler.h 第 103 行定义.

◆ task_pool_size_

uint32_t apollo::cyber::scheduler::Scheduler::task_pool_size_ = 0
protected

在文件 scheduler.h 第 102 行定义.


该类的文档由以下文件生成: