Apollo 10.0
自动驾驶开放平台
scheduler_classic.cc
浏览该文件的文档.
/******************************************************************************
 * Copyright 2018 The Apollo Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *****************************************************************************/
16
18
19#include <algorithm>
20#include <memory>
21#include <utility>
22
24#include "cyber/common/file.h"
27
28namespace apollo {
29namespace cyber {
30namespace scheduler {
31
40
41SchedulerClassic::SchedulerClassic() {
42 std::string conf("conf/");
43 conf.append(GlobalData::Instance()->ProcessGroup()).append(".conf");
44 auto cfg_file = GetAbsolutePath(WorkRoot(), conf);
45
47 if (PathExists(cfg_file) && GetProtoFromFile(cfg_file, &cfg)) {
48 for (auto& thr : cfg.scheduler_conf().threads()) {
49 inner_thr_confs_[thr.name()] = thr;
50 }
51
52 if (cfg.scheduler_conf().has_process_level_cpuset()) {
55 }
56
57 classic_conf_ = cfg.scheduler_conf().classic_conf();
58 for (auto& group : classic_conf_.groups()) {
59 auto& group_name = group.name();
60 for (auto task : group.tasks()) {
61 task.set_group_name(group_name);
62 cr_confs_[task.name()] = task;
63 }
64 }
65 } else {
66 // if do not set default_proc_num in scheduler conf
67 // give a default value
68 uint32_t proc_num = 2;
69 auto& global_conf = GlobalData::Instance()->Config();
70 if (global_conf.has_scheduler_conf() &&
71 global_conf.scheduler_conf().has_default_proc_num()) {
72 proc_num = global_conf.scheduler_conf().default_proc_num();
73 }
74 task_pool_size_ = proc_num;
75
76 auto sched_group = classic_conf_.add_groups();
77 sched_group->set_name(DEFAULT_GROUP_NAME);
78 sched_group->set_processor_num(proc_num);
79 }
80
81 CreateProcessor();
82}
83
84void SchedulerClassic::CreateProcessor() {
85 for (auto& group : classic_conf_.groups()) {
86 auto& group_name = group.name();
87 auto proc_num = group.processor_num();
88 if (task_pool_size_ == 0) {
89 task_pool_size_ = proc_num;
90 }
91
92 auto& affinity = group.affinity();
93 auto& processor_policy = group.processor_policy();
94 auto processor_prio = group.processor_prio();
95 std::vector<int> cpuset;
96 ParseCpuset(group.cpuset(), &cpuset);
97
98 for (uint32_t i = 0; i < proc_num; i++) {
99 auto ctx = std::make_shared<ClassicContext>(group_name);
100 pctxs_.emplace_back(ctx);
101
102 auto proc = std::make_shared<Processor>();
103 proc->BindContext(ctx);
104 SetSchedAffinity(proc->Thread(), cpuset, affinity, i);
105 SetSchedPolicy(proc->Thread(), processor_policy, processor_prio,
106 proc->Tid());
107 processors_.emplace_back(proc);
108 }
109 }
110}
111
112bool SchedulerClassic::DispatchTask(const std::shared_ptr<CRoutine>& cr) {
113 // we use multi-key mutex to prevent race condition
114 // when del && add cr with same crid
115 MutexWrapper* wrapper = nullptr;
116 if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
117 {
118 std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
119 if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
120 wrapper = new MutexWrapper();
121 id_map_mutex_.Set(cr->id(), wrapper);
122 }
123 }
124 }
125 std::lock_guard<std::mutex> lg(wrapper->Mutex());
126
127 {
129 if (id_cr_.find(cr->id()) != id_cr_.end()) {
130 return false;
131 }
132 id_cr_[cr->id()] = cr;
133 }
134
135 if (cr_confs_.find(cr->name()) != cr_confs_.end()) {
136 ClassicTask task = cr_confs_[cr->name()];
137 cr->set_priority(task.prio());
138 cr->set_group_name(task.group_name());
139 } else {
140 // croutine that not exist in conf
141 cr->set_group_name(classic_conf_.groups(0).name());
142 }
143
144 if (cr->priority() >= MAX_PRIO) {
145 AWARN << cr->name() << " prio is greater than MAX_PRIO[ << " << MAX_PRIO
146 << "].";
147 cr->set_priority(MAX_PRIO - 1);
148 }
149
150 // Enqueue task.
151 {
153 ClassicContext::rq_locks_[cr->group_name()].at(cr->priority()));
154 ClassicContext::cr_group_[cr->group_name()]
155 .at(cr->priority())
156 .emplace_back(cr);
157 }
158
159 ClassicContext::Notify(cr->group_name());
160 return true;
161}
162
163bool SchedulerClassic::NotifyProcessor(uint64_t crid) {
164 if (cyber_unlikely(stop_)) {
165 return true;
166 }
167
168 {
169 ReadLockGuard<AtomicRWLock> lk(id_cr_lock_);
170 if (id_cr_.find(crid) != id_cr_.end()) {
171 auto cr = id_cr_[crid];
172 if (cr->state() == RoutineState::DATA_WAIT ||
173 cr->state() == RoutineState::IO_WAIT) {
174 cr->SetUpdateFlag();
175 }
176
177 ClassicContext::Notify(cr->group_name());
178 return true;
179 }
180 }
181 return false;
182}
183
184bool SchedulerClassic::RemoveTask(const std::string& name) {
185 if (cyber_unlikely(stop_)) {
186 return true;
187 }
188
189 auto crid = GlobalData::GenerateHashId(name);
190 return RemoveCRoutine(crid);
191}
192
194 // we use multi-key mutex to prevent race condition
195 // when del && add cr with same crid
196 MutexWrapper* wrapper = nullptr;
197 if (!id_map_mutex_.Get(crid, &wrapper)) {
198 {
199 std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
200 if (!id_map_mutex_.Get(crid, &wrapper)) {
201 wrapper = new MutexWrapper();
202 id_map_mutex_.Set(crid, wrapper);
203 }
204 }
205 }
206 std::lock_guard<std::mutex> lg(wrapper->Mutex());
207
208 std::shared_ptr<CRoutine> cr = nullptr;
209 {
211 if (id_cr_.find(crid) != id_cr_.end()) {
212 cr = id_cr_[crid];
213 id_cr_[crid]->Stop();
214 id_cr_.erase(crid);
215 } else {
216 return false;
217 }
218 }
220}
221
222} // namespace scheduler
223} // namespace cyber
224} // namespace apollo
static uint64_t GenerateHashId(const std::string &name)
Definition global_data.h:75
static void Notify(const std::string &group_name)
static bool RemoveCRoutine(const std::shared_ptr< CRoutine > &cr)
bool RemoveTask(const std::string &name) override
bool DispatchTask(const std::shared_ptr< CRoutine > &) override
std::vector< std::shared_ptr< Processor > > processors_
Definition scheduler.h:96
AtomicHashMap< uint64_t, MutexWrapper * > id_map_mutex_
Definition scheduler.h:91
std::unordered_map< uint64_t, std::shared_ptr< CRoutine > > id_cr_
Definition scheduler.h:94
std::unordered_map< std::string, InnerThread > inner_thr_confs_
Definition scheduler.h:98
std::vector< std::shared_ptr< ProcessorContext > > pctxs_
Definition scheduler.h:95
#define DEFAULT_GROUP_NAME
#define cyber_unlikely(x)
Definition macros.h:30
#define AWARN
Definition log.h:43
bool PathExists(const std::string &path)
Check if the path exists.
Definition file.cc:195
bool GetProtoFromFile(const std::string &file_name, google::protobuf::Message *message)
Parses the content of the file specified by the file_name as a representation of protobufs,...
Definition file.cc:132
std::string GetAbsolutePath(const std::string &prefix, const std::string &relative_path)
Get absolute path by concatenating prefix and relative_path.
Definition file.cc:179
const std::string WorkRoot()
Definition environment.h:40
void SetSchedAffinity(std::thread *thread, const std::vector< int > &cpus, const std::string &affinity, int cpu_id)
Definition pin_thread.cc:52
void SetSchedPolicy(std::thread *thread, std::string spolicy, int sched_priority, pid_t tid)
Definition pin_thread.cc:75
void ParseCpuset(const std::string &str, std::vector< int > *cpuset)
Definition pin_thread.cc:26
class register implement
Definition arena_queue.h:37
optional SchedulerConf scheduler_conf