Apollo 10.0
自动驾驶开放平台
apollo::cyber::scheduler::SchedulerClassic类 参考

#include <scheduler_classic.h>

类 apollo::cyber::scheduler::SchedulerClassic 继承关系图:
apollo::cyber::scheduler::SchedulerClassic 的协作图:

Public 成员函数

bool RemoveCRoutine (uint64_t crid) override
 
bool RemoveTask (const std::string &name) override
 
bool DispatchTask (const std::shared_ptr< CRoutine > &) override
 
- Public 成员函数 继承自 apollo::cyber::scheduler::Scheduler
virtual ~Scheduler ()
 
bool CreateTask (const RoutineFactory &factory, const std::string &name)
 
bool CreateTask (std::function< void()> &&func, const std::string &name, std::shared_ptr< DataVisitorBase > visitor=nullptr)
 
bool NotifyTask (uint64_t crid)
 
void Shutdown ()
 
uint32_t TaskPoolSize ()
 
void ProcessLevelResourceControl ()
 
void SetInnerThreadAttr (const std::string &name, std::thread *thr)
 
void CheckSchedStatus ()
 
void SetInnerThreadConfs (const std::unordered_map< std::string, InnerThread > &confs)
 

友元

SchedulerInstance ()
 

额外继承的成员函数

- 静态 Public 成员函数 继承自 apollo::cyber::scheduler::Scheduler
static SchedulerInstance ()
 
- Protected 成员函数 继承自 apollo::cyber::scheduler::Scheduler
 Scheduler ()
 
- Protected 属性 继承自 apollo::cyber::scheduler::Scheduler
AtomicRWLock id_cr_lock_
 
AtomicHashMap< uint64_t, MutexWrapper * > id_map_mutex_
 
std::mutex cr_wl_mtx_
 
std::unordered_map< uint64_t, std::shared_ptr< CRoutine > > id_cr_
 
std::vector< std::shared_ptr< ProcessorContext > > pctxs_
 
std::vector< std::shared_ptr< Processor > > processors_
 
std::unordered_map< std::string, InnerThreadinner_thr_confs_
 
std::string process_level_cpuset_
 
uint32_t proc_num_ = 0
 
uint32_t task_pool_size_ = 0
 
std::atomic< bool > stop_
 

详细描述

在文件 scheduler_classic.h37 行定义.

成员函数说明

◆ DispatchTask()

bool apollo::cyber::scheduler::SchedulerClassic::DispatchTask ( const std::shared_ptr< CRoutine > &  cr)
overridevirtual

实现了 apollo::cyber::scheduler::Scheduler.

在文件 scheduler_classic.cc112 行定义.

112 {
113 // we use multi-key mutex to prevent race condition
114 // when del && add cr with same crid
115 MutexWrapper* wrapper = nullptr;
116 if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
117 {
118 std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
119 if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
120 wrapper = new MutexWrapper();
121 id_map_mutex_.Set(cr->id(), wrapper);
122 }
123 }
124 }
125 std::lock_guard<std::mutex> lg(wrapper->Mutex());
126
127 {
128 WriteLockGuard<AtomicRWLock> lk(id_cr_lock_);
129 if (id_cr_.find(cr->id()) != id_cr_.end()) {
130 return false;
131 }
132 id_cr_[cr->id()] = cr;
133 }
134
135 if (cr_confs_.find(cr->name()) != cr_confs_.end()) {
136 ClassicTask task = cr_confs_[cr->name()];
137 cr->set_priority(task.prio());
138 cr->set_group_name(task.group_name());
139 } else {
140 // croutine that not exist in conf
141 cr->set_group_name(classic_conf_.groups(0).name());
142 }
143
144 if (cr->priority() >= MAX_PRIO) {
145 AWARN << cr->name() << " prio is greater than MAX_PRIO[ << " << MAX_PRIO
146 << "].";
147 cr->set_priority(MAX_PRIO - 1);
148 }
149
150 // Enqueue task.
151 {
152 WriteLockGuard<AtomicRWLock> lk(
153 ClassicContext::rq_locks_[cr->group_name()].at(cr->priority()));
154 ClassicContext::cr_group_[cr->group_name()]
155 .at(cr->priority())
156 .emplace_back(cr);
157 }
158
159 ClassicContext::Notify(cr->group_name());
160 return true;
161}
static void Notify(const std::string &group_name)
AtomicHashMap< uint64_t, MutexWrapper * > id_map_mutex_
Definition scheduler.h:91
std::unordered_map< uint64_t, std::shared_ptr< CRoutine > > id_cr_
Definition scheduler.h:94
#define AWARN
Definition log.h:43

◆ RemoveCRoutine()

bool apollo::cyber::scheduler::SchedulerClassic::RemoveCRoutine ( uint64_t  crid)
overridevirtual

实现了 apollo::cyber::scheduler::Scheduler.

在文件 scheduler_classic.cc193 行定义.

193 {
194 // we use multi-key mutex to prevent race condition
195 // when del && add cr with same crid
196 MutexWrapper* wrapper = nullptr;
197 if (!id_map_mutex_.Get(crid, &wrapper)) {
198 {
199 std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
200 if (!id_map_mutex_.Get(crid, &wrapper)) {
201 wrapper = new MutexWrapper();
202 id_map_mutex_.Set(crid, wrapper);
203 }
204 }
205 }
206 std::lock_guard<std::mutex> lg(wrapper->Mutex());
207
208 std::shared_ptr<CRoutine> cr = nullptr;
209 {
210 WriteLockGuard<AtomicRWLock> lk(id_cr_lock_);
211 if (id_cr_.find(crid) != id_cr_.end()) {
212 cr = id_cr_[crid];
213 id_cr_[crid]->Stop();
214 id_cr_.erase(crid);
215 } else {
216 return false;
217 }
218 }
220}
static bool RemoveCRoutine(const std::shared_ptr< CRoutine > &cr)

◆ RemoveTask()

bool apollo::cyber::scheduler::SchedulerClassic::RemoveTask ( const std::string &  name)
overridevirtual

实现了 apollo::cyber::scheduler::Scheduler.

在文件 scheduler_classic.cc184 行定义.

184 {
185 if (cyber_unlikely(stop_)) {
186 return true;
187 }
188
189 auto crid = GlobalData::GenerateHashId(name);
190 return RemoveCRoutine(crid);
191}
static uint64_t GenerateHashId(const std::string &name)
Definition global_data.h:75
#define cyber_unlikely(x)
Definition macros.h:30

友元及相关函数文档

◆ Instance

Scheduler * Instance ( )
friend

在文件 scheduler_factory.cc49 行定义.

49 {
50 Scheduler* obj = instance.load(std::memory_order_acquire);
51 if (obj == nullptr) {
52 std::lock_guard<std::mutex> lock(mutex);
53 obj = instance.load(std::memory_order_relaxed);
54 if (obj == nullptr) {
55 std::string policy("classic");
56 std::string conf("conf/");
57 conf.append(GlobalData::Instance()->ProcessGroup()).append(".conf");
58 auto cfg_file = GetAbsolutePath(WorkRoot(), conf);
60 if (PathExists(cfg_file) && GetProtoFromFile(cfg_file, &cfg)) {
61 policy = cfg.scheduler_conf().policy();
62 } else {
63 AWARN << "Scheduler conf named " << cfg_file
64 << " not found, use default.";
65 }
66 if (!policy.compare("classic")) {
67 obj = new SchedulerClassic();
68 } else if (!policy.compare("choreography")) {
69 obj = new SchedulerChoreography();
70 } else {
71 AWARN << "Invalid scheduler policy: " << policy;
72 obj = new SchedulerClassic();
73 }
74 instance.store(obj, std::memory_order_release);
75 }
76 }
77 return obj;
78}
bool PathExists(const std::string &path)
Check if the path exists.
Definition file.cc:195
bool GetProtoFromFile(const std::string &file_name, google::protobuf::Message *message)
Parses the content of the file specified by the file_name as a representation of protobufs,...
Definition file.cc:132
std::string GetAbsolutePath(const std::string &prefix, const std::string &relative_path)
Get absolute path by concatenating prefix and relative_path.
Definition file.cc:179
const std::string WorkRoot()
Definition environment.h:40
optional SchedulerConf scheduler_conf

该类的文档由以下文件生成: