43SchedulerChoreography::SchedulerChoreography()
44 : choreography_processor_prio_(0), pool_processor_prio_(0) {
45 std::string conf(
"conf/");
46 conf.append(GlobalData::Instance()->ProcessGroup()).append(
".conf");
51 for (
auto& thr : cfg.scheduler_conf().threads()) {
52 inner_thr_confs_[thr.name()] = thr;
57 ProcessLevelResourceControl();
64 choreography_processor_policy_ =
67 choreography_processor_prio_ =
77 for (
const auto& task : choreography_conf.tasks()) {
78 cr_confs_[task.name()] = task;
83 auto& global_conf = GlobalData::Instance()->Config();
84 if (global_conf.has_scheduler_conf() &&
85 global_conf.scheduler_conf().has_default_proc_num()) {
86 proc_num_ = global_conf.scheduler_conf().default_proc_num();
90 task_pool_size_ = proc_num_;
96void SchedulerChoreography::CreateProcessor() {
97 for (uint32_t i = 0; i < proc_num_; i++) {
98 auto proc = std::make_shared<Processor>();
99 auto ctx = std::make_shared<ChoreographyContext>();
101 proc->BindContext(ctx);
103 choreography_affinity_, i);
105 choreography_processor_prio_, proc->Tid());
106 pctxs_.emplace_back(ctx);
107 processors_.emplace_back(proc);
110 for (uint32_t i = 0; i < task_pool_size_; i++) {
111 auto proc = std::make_shared<Processor>();
112 auto ctx = std::make_shared<ClassicContext>();
114 proc->BindContext(ctx);
116 SetSchedPolicy(proc->Thread(), pool_processor_policy_, pool_processor_prio_,
118 pctxs_.emplace_back(ctx);
119 processors_.emplace_back(proc);
123bool SchedulerChoreography::DispatchTask(
const std::shared_ptr<CRoutine>& cr) {
127 if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
129 std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
130 if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
132 id_map_mutex_.Set(cr->id(), wrapper);
136 std::lock_guard<std::mutex> lg(wrapper->
Mutex());
139 if (cr_confs_.find(cr->name()) != cr_confs_.end()) {
141 cr->set_priority(taskconf.
prio());
143 if (taskconf.has_processor()) {
144 cr->set_processor_id(taskconf.
processor());
150 if (id_cr_.find(cr->id()) != id_cr_.end()) {
153 id_cr_[cr->id()] = cr;
157 uint32_t pid = cr->processor_id();
158 if (pid < proc_num_) {
163 if (cr->priority() >= MAX_PRIO) {
164 AWARN << cr->name() <<
" prio great than MAX_PRIO.";
165 cr->set_priority(MAX_PRIO - 1);
182bool SchedulerChoreography::RemoveTask(
const std::string& name) {
187 auto crid = GlobalData::GenerateHashId(name);
188 return RemoveCRoutine(crid);
191bool SchedulerChoreography::RemoveCRoutine(uint64_t crid) {
195 if (!id_map_mutex_.Get(crid, &wrapper)) {
197 std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
198 if (!id_map_mutex_.Get(crid, &wrapper)) {
200 id_map_mutex_.Set(crid, wrapper);
204 std::lock_guard<std::mutex> lg(wrapper->
Mutex());
206 std::shared_ptr<CRoutine> cr =
nullptr;
210 auto p = id_cr_.find(crid);
211 if (p != id_cr_.end()) {
213 pid = cr->processor_id();
214 id_cr_[crid]->Stop();
222 if (pid < proc_num_) {
224 ->RemoveCRoutine(crid);
226 return ClassicContext::RemoveCRoutine(cr);
230bool SchedulerChoreography::NotifyProcessor(uint64_t crid) {
235 std::shared_ptr<CRoutine> cr;
240 ReadLockGuard<AtomicRWLock> lk(id_cr_lock_);
241 auto it = id_cr_.find(crid);
242 if (it != id_cr_.end()) {
244 pid = cr->processor_id();
245 if (cr->state() == RoutineState::DATA_WAIT ||
246 cr->state() == RoutineState::IO_WAIT) {
254 if (pid < proc_num_) {
255 static_cast<ChoreographyContext*
>(pctxs_[pid].get())->Notify();
257 ClassicContext::Notify(cr->group_name());
#define DEFAULT_GROUP_NAME
#define cyber_unlikely(x)
bool PathExists(const std::string &path)
Check if the path exists.
bool GetProtoFromFile(const std::string &file_name, google::protobuf::Message *message)
Parses the content of the file specified by the file_name as a representation of protobufs,...
std::string GetAbsolutePath(const std::string &prefix, const std::string &relative_path)
Get absolute path by concatenating prefix and relative_path.
const std::string WorkRoot()
void SetSchedAffinity(std::thread *thread, const std::vector< int > &cpus, const std::string &affinity, int cpu_id)
void SetSchedPolicy(std::thread *thread, std::string spolicy, int sched_priority, pid_t tid)
void ParseCpuset(const std::string &str, std::vector< int > *cpuset)
optional string pool_affinity
optional string choreography_affinity
optional string pool_cpuset
optional int32 choreography_processor_prio
optional string choreography_cpuset
optional int32 pool_processor_prio
optional string choreography_processor_policy
optional uint32 pool_processor_num
optional uint32 choreography_processor_num
optional string pool_processor_policy
optional SchedulerConf scheduler_conf
optional string process_level_cpuset
optional ChoreographyConf choreography_conf