Apollo 10.0
自动驾驶开放平台
apollo::cyber::scheduler::SchedulerChoreography类 参考

#include <scheduler_choreography.h>

类 apollo::cyber::scheduler::SchedulerChoreography 继承关系图:
apollo::cyber::scheduler::SchedulerChoreography 的协作图:

Public 成员函数

bool RemoveCRoutine (uint64_t crid) override
 
bool RemoveTask (const std::string &name) override
 
bool DispatchTask (const std::shared_ptr< CRoutine > &) override
 
- Public 成员函数 继承自 apollo::cyber::scheduler::Scheduler
virtual ~Scheduler ()
 
bool CreateTask (const RoutineFactory &factory, const std::string &name)
 
bool CreateTask (std::function< void()> &&func, const std::string &name, std::shared_ptr< DataVisitorBase > visitor=nullptr)
 
bool NotifyTask (uint64_t crid)
 
void Shutdown ()
 
uint32_t TaskPoolSize ()
 
void ProcessLevelResourceControl ()
 
void SetInnerThreadAttr (const std::string &name, std::thread *thr)
 
void CheckSchedStatus ()
 
void SetInnerThreadConfs (const std::unordered_map< std::string, InnerThread > &confs)
 

友元

Scheduler * Instance ()
 

额外继承的成员函数

- 静态 Public 成员函数 继承自 apollo::cyber::scheduler::Scheduler
static Scheduler * Instance ()
 
- Protected 成员函数 继承自 apollo::cyber::scheduler::Scheduler
 Scheduler ()
 
- Protected 属性 继承自 apollo::cyber::scheduler::Scheduler
AtomicRWLock id_cr_lock_
 
AtomicHashMap< uint64_t, MutexWrapper * > id_map_mutex_
 
std::mutex cr_wl_mtx_
 
std::unordered_map< uint64_t, std::shared_ptr< CRoutine > > id_cr_
 
std::vector< std::shared_ptr< ProcessorContext > > pctxs_
 
std::vector< std::shared_ptr< Processor > > processors_
 
std::unordered_map< std::string, InnerThread > inner_thr_confs_
 
std::string process_level_cpuset_
 
uint32_t proc_num_ = 0
 
uint32_t task_pool_size_ = 0
 
std::atomic< bool > stop_
 

详细描述

在文件 scheduler_choreography.h 第 36 行定义.

成员函数说明

◆ DispatchTask()

bool apollo::cyber::scheduler::SchedulerChoreography::DispatchTask ( const std::shared_ptr< CRoutine > &  cr)
override virtual

实现了 apollo::cyber::scheduler::Scheduler.

在文件 scheduler_choreography.cc 第 123 行定义.

123 {
124 // we use multi-key mutex to prevent race condition
125 // when del && add cr with same crid
126 MutexWrapper* wrapper = nullptr;
127 if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
128 {
129 std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
130 if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
131 wrapper = new MutexWrapper();
132 id_map_mutex_.Set(cr->id(), wrapper);
133 }
134 }
135 }
136 std::lock_guard<std::mutex> lg(wrapper->Mutex());
137
138 // Assign sched cfg to tasks according to configuration.
139 if (cr_confs_.find(cr->name()) != cr_confs_.end()) {
140 ChoreographyTask taskconf = cr_confs_[cr->name()];
141 cr->set_priority(taskconf.prio());
142
143 if (taskconf.has_processor()) {
144 cr->set_processor_id(taskconf.processor());
145 }
146 }
147
148 {
149 WriteLockGuard<AtomicRWLock> lk(id_cr_lock_);
150 if (id_cr_.find(cr->id()) != id_cr_.end()) {
151 return false;
152 }
153 id_cr_[cr->id()] = cr;
154 }
155
156 // Enqueue task.
157 uint32_t pid = cr->processor_id();
158 if (pid < proc_num_) {
159 // Enqueue task to Choreo Policy.
160 static_cast<ChoreographyContext*>(pctxs_[pid].get())->Enqueue(cr);
161 } else {
162 // Check if task prio is reasonable.
163 if (cr->priority() >= MAX_PRIO) {
164 AWARN << cr->name() << " prio great than MAX_PRIO.";
165 cr->set_priority(MAX_PRIO - 1);
166 }
167
168 cr->set_group_name(DEFAULT_GROUP_NAME);
169
170 // Enqueue task to pool runqueue.
171 {
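172 WriteLockGuard<AtomicRWLock> lk(
173 ClassicContext::rq_locks_[DEFAULT_GROUP_NAME].at(cr->priority()));
174 ClassicContext::cr_group_[DEFAULT_GROUP_NAME]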
175 .at(cr->priority())
176 .emplace_back(cr);
177 }
178 }
179 return true;
180}
AtomicHashMap< uint64_t, MutexWrapper * > id_map_mutex_
Definition scheduler.h:91
std::unordered_map< uint64_t, std::shared_ptr< CRoutine > > id_cr_
Definition scheduler.h:94
std::vector< std::shared_ptr< ProcessorContext > > pctxs_
Definition scheduler.h:95
#define DEFAULT_GROUP_NAME
#define AWARN
Definition log.h:43

◆ RemoveCRoutine()

bool apollo::cyber::scheduler::SchedulerChoreography::RemoveCRoutine ( uint64_t  crid)
override virtual

实现了 apollo::cyber::scheduler::Scheduler.

在文件 scheduler_choreography.cc 第 191 行定义.

191 {
192 // we use multi-key mutex to prevent race condition
193 // when del && add cr with same crid
194 MutexWrapper* wrapper = nullptr;
195 if (!id_map_mutex_.Get(crid, &wrapper)) {
196 {
197 std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
198 if (!id_map_mutex_.Get(crid, &wrapper)) {
199 wrapper = new MutexWrapper();
200 id_map_mutex_.Set(crid, wrapper);
201 }
202 }
203 }
204 std::lock_guard<std::mutex> lg(wrapper->Mutex());
205
206 std::shared_ptr<CRoutine> cr = nullptr;
207 uint32_t pid;
208 {
209 WriteLockGuard<AtomicRWLock> lk(id_cr_lock_);
210 auto p = id_cr_.find(crid);
211 if (p != id_cr_.end()) {
212 cr = p->second;
213 pid = cr->processor_id();
214 id_cr_[crid]->Stop();
215 id_cr_.erase(crid);
216 } else {
217 return false;
218 }
219 }
220
221 // rm cr from pool if rt not in choreo context
222 if (pid < proc_num_) {
223 return static_cast<ChoreographyContext*>(pctxs_[pid].get())
224 ->RemoveCRoutine(crid);
225 } else {
226 return ClassicContext::RemoveCRoutine(cr);
227 }
228}
static bool RemoveCRoutine(const std::shared_ptr< CRoutine > &cr)

◆ RemoveTask()

bool apollo::cyber::scheduler::SchedulerChoreography::RemoveTask ( const std::string &  name)
override virtual

实现了 apollo::cyber::scheduler::Scheduler.

在文件 scheduler_choreography.cc 第 182 行定义.

182 {
183 if (cyber_unlikely(stop_)) {
184 return true;
185 }
186
187 auto crid = GlobalData::GenerateHashId(name);
188 return RemoveCRoutine(crid);
189}
static uint64_t GenerateHashId(const std::string &name)
Definition global_data.h:75
#define cyber_unlikely(x)
Definition macros.h:30

友元及相关函数文档

◆ Instance

Scheduler * Instance ( )
friend

在文件 scheduler_factory.cc 第 49 行定义.

49 {
50 Scheduler* obj = instance.load(std::memory_order_acquire);
51 if (obj == nullptr) {
52 std::lock_guard<std::mutex> lock(mutex);
53 obj = instance.load(std::memory_order_relaxed);
54 if (obj == nullptr) {
55 std::string policy("classic");
56 std::string conf("conf/");
57 conf.append(GlobalData::Instance()->ProcessGroup()).append(".conf");
58 auto cfg_file = GetAbsolutePath(WorkRoot(), conf);
59 apollo::cyber::proto::CyberConfig cfg;
60 if (PathExists(cfg_file) && GetProtoFromFile(cfg_file, &cfg)) {
61 policy = cfg.scheduler_conf().policy();
62 } else {
63 AWARN << "Scheduler conf named " << cfg_file
64 << " not found, use default.";
65 }
66 if (!policy.compare("classic")) {
67 obj = new SchedulerClassic();
68 } else if (!policy.compare("choreography")) {
69 obj = new SchedulerChoreography();
70 } else {
71 AWARN << "Invalid scheduler policy: " << policy;
72 obj = new SchedulerClassic();
73 }
74 instance.store(obj, std::memory_order_release);
75 }
76 }
77 return obj;
78}
bool PathExists(const std::string &path)
Check if the path exists.
Definition file.cc:195
bool GetProtoFromFile(const std::string &file_name, google::protobuf::Message *message)
Parses the content of the file specified by the file_name as a representation of protobufs,...
Definition file.cc:132
std::string GetAbsolutePath(const std::string &prefix, const std::string &relative_path)
Get absolute path by concatenating prefix and relative_path.
Definition file.cc:179
const std::string WorkRoot()
Definition environment.h:40
optional SchedulerConf scheduler_conf

该类的文档由以下文件生成: