38 const std::string& name) {
43 const std::string& name,
44 std::shared_ptr<DataVisitorBase> visitor) {
46 ADEBUG <<
"scheduler is stoped, cannot create task!";
52 auto cr = std::make_shared<CRoutine>(func);
55 AINFO <<
"create croutine: " << name;
61 if (visitor !=
nullptr) {
62 visitor->RegisterNotifyCallback([
this, task_id]() {
66 this->NotifyProcessor(task_id);
80 std::vector<int> cpus;
84 for (
const auto cpu : cpus) {
87 pthread_setaffinity_np(pthread_self(),
sizeof(set), &set);
93 auto cpuset = th_conf.cpuset();
95 std::vector<int> cpus;
103 std::string snap_info;
106 auto snap = processor->ProcSnapshot();
107 if (snap->execute_start_time.load()) {
108 auto execute_time = (now - snap->execute_start_time.load()) / 1000000;
109 snap_info.append(std::to_string(snap->processor_id.load()))
111 .append(snap->routine_name)
113 .append(std::to_string(execute_time));
115 snap_info.append(std::to_string(snap->processor_id.load()))
118 snap_info.append(
", ");
120 snap_info.append(
"timestamp: ").append(std::to_string(now));
130 for (
auto& ctx :
pctxs_) {
134 std::vector<uint64_t> cr_list;
138 cr_list.emplace_back(cr.second->id());
142 for (
auto&
id : cr_list) {
uint64_t ToNanosecond() const
Convert the time to nanoseconds.
static Time Now()
Get the current time.
static uint64_t RegisterTaskName(const std::string &task_name)
std::shared_ptr< data::DataVisitorBase > GetDataVisitor() const
CreateRoutineFunc create_routine
std::string process_level_cpuset_
void ProcessLevelResourceControl()
std::vector< std::shared_ptr< Processor > > processors_
bool NotifyTask(uint64_t crid)
virtual bool DispatchTask(const std::shared_ptr< CRoutine > &)=0
std::unordered_map< uint64_t, std::shared_ptr< CRoutine > > id_cr_
std::unordered_map< std::string, InnerThread > inner_thr_confs_
std::vector< std::shared_ptr< ProcessorContext > > pctxs_
void SetInnerThreadAttr(const std::string &name, std::thread *thr)
virtual bool NotifyProcessor(uint64_t crid)=0
bool CreateTask(const RoutineFactory &factory, const std::string &name)
std::atomic< bool > stop_
virtual bool RemoveCRoutine(uint64_t crid)=0
#define cyber_unlikely(x)
void SetSchedAffinity(std::thread *thread, const std::vector< int > &cpus, const std::string &affinity, int cpu_id)
void SetSchedPolicy(std::thread *thread, std::string spolicy, int sched_priority, pid_t tid)
void ParseCpuset(const std::string &str, std::vector< int > *cpuset)