Apollo 10.0
自动驾驶开放平台
scheduler.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <sched.h>
20
21#include <utility>
22
24#include "cyber/common/file.h"
26#include "cyber/common/util.h"
30
31namespace apollo {
32namespace cyber {
33namespace scheduler {
34
36
38 const std::string& name) {
39 return CreateTask(factory.create_routine(), name, factory.GetDataVisitor());
40}
41
42bool Scheduler::CreateTask(std::function<void()>&& func,
43 const std::string& name,
44 std::shared_ptr<DataVisitorBase> visitor) {
45 if (cyber_unlikely(stop_.load())) {
46 ADEBUG << "scheduler is stoped, cannot create task!";
47 return false;
48 }
49
50 auto task_id = GlobalData::RegisterTaskName(name);
51
52 auto cr = std::make_shared<CRoutine>(func);
53 cr->set_id(task_id);
54 cr->set_name(name);
55 AINFO << "create croutine: " << name;
56
57 if (!DispatchTask(cr)) {
58 return false;
59 }
60
61 if (visitor != nullptr) {
62 visitor->RegisterNotifyCallback([this, task_id]() {
63 if (cyber_unlikely(stop_.load())) {
64 return;
65 }
66 this->NotifyProcessor(task_id);
67 });
68 }
69 return true;
70}
71
72bool Scheduler::NotifyTask(uint64_t crid) {
73 if (cyber_unlikely(stop_.load())) {
74 return true;
75 }
76 return NotifyProcessor(crid);
77}
78
80 std::vector<int> cpus;
82 cpu_set_t set;
83 CPU_ZERO(&set);
84 for (const auto cpu : cpus) {
85 CPU_SET(cpu, &set);
86 }
87 pthread_setaffinity_np(pthread_self(), sizeof(set), &set);
88}
89
90void Scheduler::SetInnerThreadAttr(const std::string& name, std::thread* thr) {
91 if (thr != nullptr && inner_thr_confs_.find(name) != inner_thr_confs_.end()) {
92 auto th_conf = inner_thr_confs_[name];
93 auto cpuset = th_conf.cpuset();
94
95 std::vector<int> cpus;
96 ParseCpuset(cpuset, &cpus);
97 SetSchedAffinity(thr, cpus, "range");
98 SetSchedPolicy(thr, th_conf.policy(), th_conf.prio());
99 }
100}
101
103 std::string snap_info;
104 auto now = Time::Now().ToNanosecond();
105 for (auto processor : processors_) {
106 auto snap = processor->ProcSnapshot();
107 if (snap->execute_start_time.load()) {
108 auto execute_time = (now - snap->execute_start_time.load()) / 1000000;
109 snap_info.append(std::to_string(snap->processor_id.load()))
110 .append(":")
111 .append(snap->routine_name)
112 .append(":")
113 .append(std::to_string(execute_time));
114 } else {
115 snap_info.append(std::to_string(snap->processor_id.load()))
116 .append(":idle");
117 }
118 snap_info.append(", ");
119 }
120 snap_info.append("timestamp: ").append(std::to_string(now));
121 AINFO << snap_info;
122 snap_info.clear();
123}
124
126 if (cyber_unlikely(stop_.exchange(true))) {
127 return;
128 }
129
130 for (auto& ctx : pctxs_) {
131 ctx->Shutdown();
132 }
133
134 std::vector<uint64_t> cr_list;
135 {
137 for (auto& cr : id_cr_) {
138 cr_list.emplace_back(cr.second->id());
139 }
140 }
141
142 for (auto& id : cr_list) {
143 RemoveCRoutine(id);
144 }
145
146 for (auto& processor : processors_) {
147 processor->Stop();
148 }
149
150 processors_.clear();
151 pctxs_.clear();
152}
153} // namespace scheduler
154} // namespace cyber
155} // namespace apollo
uint64_t ToNanosecond() const
convert time to nanosecond.
Definition time.cc:83
static Time Now()
get the current time.
Definition time.cc:57
static uint64_t RegisterTaskName(const std::string &task_name)
std::shared_ptr< data::DataVisitorBase > GetDataVisitor() const
std::vector< std::shared_ptr< Processor > > processors_
Definition scheduler.h:96
virtual bool DispatchTask(const std::shared_ptr< CRoutine > &)=0
std::unordered_map< uint64_t, std::shared_ptr< CRoutine > > id_cr_
Definition scheduler.h:94
std::unordered_map< std::string, InnerThread > inner_thr_confs_
Definition scheduler.h:98
std::vector< std::shared_ptr< ProcessorContext > > pctxs_
Definition scheduler.h:95
void SetInnerThreadAttr(const std::string &name, std::thread *thr)
Definition scheduler.cc:90
virtual bool NotifyProcessor(uint64_t crid)=0
bool CreateTask(const RoutineFactory &factory, const std::string &name)
Definition scheduler.cc:37
virtual bool RemoveCRoutine(uint64_t crid)=0
#define cyber_unlikely(x)
Definition macros.h:30
#define ADEBUG
Definition log.h:41
#define AINFO
Definition log.h:42
void SetSchedAffinity(std::thread *thread, const std::vector< int > &cpus, const std::string &affinity, int cpu_id)
Definition pin_thread.cc:52
void SetSchedPolicy(std::thread *thread, std::string spolicy, int sched_priority, pid_t tid)
Definition pin_thread.cc:75
void ParseCpuset(const std::string &str, std::vector< int > *cpuset)
Definition pin_thread.cc:26
class register implement
Definition arena_queue.h:37