41SchedulerClassic::SchedulerClassic() {
42 std::string conf(
"conf/");
43 conf.append(GlobalData::Instance()->ProcessGroup()).append(
".conf");
48 for (
auto& thr : cfg.scheduler_conf().threads()) {
58 for (
auto& group : classic_conf_.groups()) {
59 auto& group_name = group.name();
60 for (
auto task : group.tasks()) {
61 task.set_group_name(group_name);
62 cr_confs_[task.name()] = task;
68 uint32_t proc_num = 2;
69 auto& global_conf = GlobalData::Instance()->Config();
70 if (global_conf.has_scheduler_conf() &&
71 global_conf.scheduler_conf().has_default_proc_num()) {
72 proc_num = global_conf.scheduler_conf().default_proc_num();
76 auto sched_group = classic_conf_.add_groups();
78 sched_group->set_processor_num(proc_num);
84void SchedulerClassic::CreateProcessor() {
85 for (
auto& group : classic_conf_.groups()) {
86 auto& group_name = group.name();
87 auto proc_num = group.processor_num();
92 auto& affinity = group.affinity();
93 auto& processor_policy = group.processor_policy();
94 auto processor_prio = group.processor_prio();
95 std::vector<int> cpuset;
98 for (uint32_t i = 0; i < proc_num; i++) {
99 auto ctx = std::make_shared<ClassicContext>(group_name);
102 auto proc = std::make_shared<Processor>();
103 proc->BindContext(ctx);
118 std::lock_guard<std::mutex> wl_lg(
cr_wl_mtx_);
125 std::lock_guard<std::mutex> lg(wrapper->
Mutex());
135 if (cr_confs_.find(cr->name()) != cr_confs_.end()) {
137 cr->set_priority(task.
prio());
141 cr->set_group_name(classic_conf_.
groups(0).
name());
144 if (cr->priority() >= MAX_PRIO) {
145 AWARN << cr->name() <<
" prio is greater than MAX_PRIO[ << " << MAX_PRIO
147 cr->set_priority(MAX_PRIO - 1);
163bool SchedulerClassic::NotifyProcessor(uint64_t crid) {
172 if (cr->state() == RoutineState::DATA_WAIT ||
173 cr->state() == RoutineState::IO_WAIT) {
199 std::lock_guard<std::mutex> wl_lg(
cr_wl_mtx_);
206 std::lock_guard<std::mutex> lg(wrapper->
Mutex());
208 std::shared_ptr<CRoutine> cr =
nullptr;
static uint64_t GenerateHashId(const std::string &name)
static void Notify(const std::string &group_name)
static RQ_LOCK_GROUP rq_locks_
static CR_GROUP cr_group_
static bool RemoveCRoutine(const std::shared_ptr< CRoutine > &cr)
bool RemoveTask(const std::string &name) override
bool RemoveCRoutine(uint64_t crid) override
bool DispatchTask(const std::shared_ptr< CRoutine > &) override
std::string process_level_cpuset_
void ProcessLevelResourceControl()
std::vector< std::shared_ptr< Processor > > processors_
AtomicHashMap< uint64_t, MutexWrapper * > id_map_mutex_
std::unordered_map< uint64_t, std::shared_ptr< CRoutine > > id_cr_
std::unordered_map< std::string, InnerThread > inner_thr_confs_
std::vector< std::shared_ptr< ProcessorContext > > pctxs_
std::atomic< bool > stop_
#define DEFAULT_GROUP_NAME
#define cyber_unlikely(x)
bool PathExists(const std::string &path)
Check if the path exists.
bool GetProtoFromFile(const std::string &file_name, google::protobuf::Message *message)
Parses the content of the file specified by the file_name as a representation of protobufs,...
std::string GetAbsolutePath(const std::string &prefix, const std::string &relative_path)
Get absolute path by concatenating prefix and relative_path.
const std::string WorkRoot()
void SetSchedAffinity(std::thread *thread, const std::vector< int > &cpus, const std::string &affinity, int cpu_id)
void SetSchedPolicy(std::thread *thread, std::string spolicy, int sched_priority, pid_t tid)
void ParseCpuset(const std::string &str, std::vector< int > *cpuset)
repeated SchedGroup groups
optional string group_name
optional SchedulerConf scheduler_conf
optional ClassicConf classic_conf
optional string process_level_cpuset