Apollo 10.0
Autonomous Driving Open Platform
scheduler_choreography.cc
Browse the documentation for this file.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <memory>
20#include <string>
21#include <utility>
22
24#include "cyber/common/file.h"
28
29namespace apollo {
30namespace cyber {
31namespace scheduler {
32
42
43SchedulerChoreography::SchedulerChoreography()
44 : choreography_processor_prio_(0), pool_processor_prio_(0) {
45 std::string conf("conf/");
46 conf.append(GlobalData::Instance()->ProcessGroup()).append(".conf");
47 auto cfg_file = GetAbsolutePath(WorkRoot(), conf);
48
50 if (PathExists(cfg_file) && GetProtoFromFile(cfg_file, &cfg)) {
51 for (auto& thr : cfg.scheduler_conf().threads()) {
52 inner_thr_confs_[thr.name()] = thr;
53 }
54
55 if (cfg.scheduler_conf().has_process_level_cpuset()) {
56 process_level_cpuset_ = cfg.scheduler_conf().process_level_cpuset();
57 ProcessLevelResourceControl();
58 }
59
60 const apollo::cyber::proto::ChoreographyConf& choreography_conf =
62 proc_num_ = choreography_conf.choreography_processor_num();
63 choreography_affinity_ = choreography_conf.choreography_affinity();
64 choreography_processor_policy_ =
65 choreography_conf.choreography_processor_policy();
66
67 choreography_processor_prio_ =
68 choreography_conf.choreography_processor_prio();
69 ParseCpuset(choreography_conf.choreography_cpuset(), &choreography_cpuset_);
70
71 task_pool_size_ = choreography_conf.pool_processor_num();
72 pool_affinity_ = choreography_conf.pool_affinity();
73 pool_processor_policy_ = choreography_conf.pool_processor_policy();
74 pool_processor_prio_ = choreography_conf.pool_processor_prio();
75 ParseCpuset(choreography_conf.pool_cpuset(), &pool_cpuset_);
76
77 for (const auto& task : choreography_conf.tasks()) {
78 cr_confs_[task.name()] = task;
79 }
80 }
81
82 if (proc_num_ == 0) {
83 auto& global_conf = GlobalData::Instance()->Config();
84 if (global_conf.has_scheduler_conf() &&
85 global_conf.scheduler_conf().has_default_proc_num()) {
86 proc_num_ = global_conf.scheduler_conf().default_proc_num();
87 } else {
88 proc_num_ = 2;
89 }
90 task_pool_size_ = proc_num_;
91 }
92
93 CreateProcessor();
94}
95
96void SchedulerChoreography::CreateProcessor() {
97 for (uint32_t i = 0; i < proc_num_; i++) {
98 auto proc = std::make_shared<Processor>();
99 auto ctx = std::make_shared<ChoreographyContext>();
100
101 proc->BindContext(ctx);
102 SetSchedAffinity(proc->Thread(), choreography_cpuset_,
103 choreography_affinity_, i);
104 SetSchedPolicy(proc->Thread(), choreography_processor_policy_,
105 choreography_processor_prio_, proc->Tid());
106 pctxs_.emplace_back(ctx);
107 processors_.emplace_back(proc);
108 }
109
110 for (uint32_t i = 0; i < task_pool_size_; i++) {
111 auto proc = std::make_shared<Processor>();
112 auto ctx = std::make_shared<ClassicContext>();
113
114 proc->BindContext(ctx);
115 SetSchedAffinity(proc->Thread(), pool_cpuset_, pool_affinity_, i);
116 SetSchedPolicy(proc->Thread(), pool_processor_policy_, pool_processor_prio_,
117 proc->Tid());
118 pctxs_.emplace_back(ctx);
119 processors_.emplace_back(proc);
120 }
121}
122
123bool SchedulerChoreography::DispatchTask(const std::shared_ptr<CRoutine>& cr) {
124 // we use multi-key mutex to prevent race condition
125 // when del && add cr with same crid
126 MutexWrapper* wrapper = nullptr;
127 if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
128 {
129 std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
130 if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
131 wrapper = new MutexWrapper();
132 id_map_mutex_.Set(cr->id(), wrapper);
133 }
134 }
135 }
136 std::lock_guard<std::mutex> lg(wrapper->Mutex());
137
138 // Assign sched cfg to tasks according to configuration.
139 if (cr_confs_.find(cr->name()) != cr_confs_.end()) {
140 ChoreographyTask taskconf = cr_confs_[cr->name()];
141 cr->set_priority(taskconf.prio());
142
143 if (taskconf.has_processor()) {
144 cr->set_processor_id(taskconf.processor());
145 }
146 }
147
148 {
149 WriteLockGuard<AtomicRWLock> lk(id_cr_lock_);
150 if (id_cr_.find(cr->id()) != id_cr_.end()) {
151 return false;
152 }
153 id_cr_[cr->id()] = cr;
154 }
155
156 // Enqueue task.
157 uint32_t pid = cr->processor_id();
158 if (pid < proc_num_) {
159 // Enqueue task to Choreo Policy.
160 static_cast<ChoreographyContext*>(pctxs_[pid].get())->Enqueue(cr);
161 } else {
162 // Check if task prio is reasonable.
163 if (cr->priority() >= MAX_PRIO) {
164 AWARN << cr->name() << " prio great than MAX_PRIO.";
165 cr->set_priority(MAX_PRIO - 1);
166 }
167
168 cr->set_group_name(DEFAULT_GROUP_NAME);
169
170 // Enqueue task to pool runqueue.
171 {
173 ClassicContext::rq_locks_[DEFAULT_GROUP_NAME].at(cr->priority()));
174 ClassicContext::cr_group_[DEFAULT_GROUP_NAME]
175 .at(cr->priority())
176 .emplace_back(cr);
177 }
178 }
179 return true;
180}
181
182bool SchedulerChoreography::RemoveTask(const std::string& name) {
183 if (cyber_unlikely(stop_)) {
184 return true;
185 }
186
187 auto crid = GlobalData::GenerateHashId(name);
188 return RemoveCRoutine(crid);
189}
190
bool SchedulerChoreography::RemoveCRoutine(uint64_t crid) {
  // Stops and unregisters the croutine identified by crid, then removes it
  // from whichever context (choreography or classic pool) it was queued in.
  // Returns false if no croutine with that id is registered.
  //
  // we use multi-key mutex to prevent race condition
  // when del && add cr with same crid
  MutexWrapper* wrapper = nullptr;
  if (!id_map_mutex_.Get(crid, &wrapper)) {
    {
      std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
      // Double-checked under cr_wl_mtx_: another thread may have inserted
      // the wrapper while we waited for the lock.
      if (!id_map_mutex_.Get(crid, &wrapper)) {
        wrapper = new MutexWrapper();
        id_map_mutex_.Set(crid, wrapper);
      }
    }
  }
  // Serializes del/add of the same crid against DispatchTask.
  std::lock_guard<std::mutex> lg(wrapper->Mutex());

  std::shared_ptr<CRoutine> cr = nullptr;
  uint32_t pid;
  {
    WriteLockGuard<AtomicRWLock> lk(id_cr_lock_);
    auto p = id_cr_.find(crid);
    if (p != id_cr_.end()) {
      // Keep a local shared_ptr so the croutine outlives the map erase;
      // Stop() is called before erasing so the routine cannot be resumed.
      cr = p->second;
      pid = cr->processor_id();
      id_cr_[crid]->Stop();
      id_cr_.erase(crid);
    } else {
      return false;
    }
  }

  // rm cr from pool if rt not in choreo context
  if (pid < proc_num_) {
    // Pinned croutine: remove from its dedicated choreography context.
    return static_cast<ChoreographyContext*>(pctxs_[pid].get())
        ->RemoveCRoutine(crid);
  } else {
    // Unpinned croutine: remove from the shared classic pool runqueue.
    return ClassicContext::RemoveCRoutine(cr);
  }
}
229
230bool SchedulerChoreography::NotifyProcessor(uint64_t crid) {
231 if (cyber_unlikely(stop_)) {
232 return true;
233 }
234
235 std::shared_ptr<CRoutine> cr;
236 uint32_t pid;
237 // find cr from id_cr && Update cr Flag
238 // policies will handle ready-state CRoutines
239 {
240 ReadLockGuard<AtomicRWLock> lk(id_cr_lock_);
241 auto it = id_cr_.find(crid);
242 if (it != id_cr_.end()) {
243 cr = it->second;
244 pid = cr->processor_id();
245 if (cr->state() == RoutineState::DATA_WAIT ||
246 cr->state() == RoutineState::IO_WAIT) {
247 cr->SetUpdateFlag();
248 }
249 } else {
250 return false;
251 }
252 }
253
254 if (pid < proc_num_) {
255 static_cast<ChoreographyContext*>(pctxs_[pid].get())->Notify();
256 } else {
257 ClassicContext::Notify(cr->group_name());
258 }
259
260 return true;
261}
262
263} // namespace scheduler
264} // namespace cyber
265} // namespace apollo
#define DEFAULT_GROUP_NAME
#define cyber_unlikely(x)
Definition macros.h:30
#define AWARN
Definition log.h:43
bool PathExists(const std::string &path)
Check if the path exists.
Definition file.cc:195
bool GetProtoFromFile(const std::string &file_name, google::protobuf::Message *message)
Parses the content of the file specified by the file_name as a representation of protobufs,...
Definition file.cc:132
std::string GetAbsolutePath(const std::string &prefix, const std::string &relative_path)
Get absolute path by concatenating prefix and relative_path.
Definition file.cc:179
const std::string WorkRoot()
Definition environment.h:40
void SetSchedAffinity(std::thread *thread, const std::vector< int > &cpus, const std::string &affinity, int cpu_id)
Definition pin_thread.cc:52
void SetSchedPolicy(std::thread *thread, std::string spolicy, int sched_priority, pid_t tid)
Definition pin_thread.cc:75
void ParseCpuset(const std::string &str, std::vector< int > *cpuset)
Definition pin_thread.cc:26
class register implement
Definition arena_queue.h:37
optional SchedulerConf scheduler_conf
optional ChoreographyConf choreography_conf