Apollo 10.0
自动驾驶开放平台
classic_context.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <limits>
20
21namespace apollo {
22namespace cyber {
23namespace scheduler {
24
30
36
38
39ClassicContext::ClassicContext(const std::string& group_name) {
40 InitGroup(group_name);
41}
42
43void ClassicContext::InitGroup(const std::string& group_name) {
44 multi_pri_rq_ = &cr_group_[group_name];
45 lq_ = &rq_locks_[group_name];
46 mtx_wrapper_ = &mtx_wq_[group_name];
47 cw_ = &cv_wq_[group_name];
48 notify_grp_[group_name] = 0;
49 current_grp = group_name;
50}
51
52std::shared_ptr<CRoutine> ClassicContext::NextRoutine() {
53 if (cyber_unlikely(stop_.load())) {
54 return nullptr;
55 }
56
57 for (int i = MAX_PRIO - 1; i >= 0; --i) {
58 ReadLockGuard<AtomicRWLock> lk(lq_->at(i));
59 for (auto& cr : multi_pri_rq_->at(i)) {
60 if (!cr->Acquire()) {
61 continue;
62 }
63
64 if (cr->UpdateState() == RoutineState::READY) {
65 return cr;
66 }
67
68 cr->Release();
69 }
70 }
71
72 return nullptr;
73}
74
// NOTE(review): the signature line of this function is absent from this
// listing (the embedded numbering jumps from 74 to 76). Presumably this is
// the body of ClassicContext::Wait() -- confirm against the original file.
//
// Blocks on the group's condition variable, holding the group's mutex, until
// notify_grp_[current_grp] is positive or 1000 ms have elapsed. If a
// notification is pending afterwards, exactly one is consumed.
76 std::unique_lock<std::mutex> lk(mtx_wrapper_->Mutex());
77 cw_->Cv().wait_for(lk, std::chrono::milliseconds(1000),
78 [&]() { return notify_grp_[current_grp] > 0; });
// Re-check after the wait: on timeout the counter may still be zero.
79 if (notify_grp_[current_grp] > 0) {
80 notify_grp_[current_grp]--;
81 }
82}
83
// NOTE(review): the signature line of this function is absent from this
// listing (the embedded numbering jumps from 83 to 85). Presumably this is
// the body of ClassicContext::Shutdown() -- confirm against the original file.
//
// Sets the stop flag (NextRoutine() returns nullptr once it is set), then,
// under the group's mutex, raises the group's notification counter to
// std::numeric_limits<unsigned char>::max() and wakes every thread waiting on
// the group's condition variable.
85 stop_.store(true);
// NOTE(review): manual lock()/unlock(); a scoped guard would also release the
// mutex if the map access in between throws.
86 mtx_wrapper_->Mutex().lock();
87 notify_grp_[current_grp] = std::numeric_limits<unsigned char>::max();
88 mtx_wrapper_->Mutex().unlock();
89 cw_->Cv().notify_all();
90}
91
92void ClassicContext::Notify(const std::string& group_name) {
93 (&mtx_wq_[group_name])->Mutex().lock();
94 notify_grp_[group_name]++;
95 (&mtx_wq_[group_name])->Mutex().unlock();
96 cv_wq_[group_name].Cv().notify_one();
97}
98
99bool ClassicContext::RemoveCRoutine(const std::shared_ptr<CRoutine>& cr) {
100 auto grp = cr->group_name();
101 auto prio = cr->priority();
102 auto crid = cr->id();
104 auto& croutines = ClassicContext::cr_group_[grp].at(prio);
105 for (auto it = croutines.begin(); it != croutines.end(); ++it) {
106 if ((*it)->id() == crid) {
107 auto cr = *it;
108 cr->Stop();
109 while (!cr->Acquire()) {
110 std::this_thread::sleep_for(std::chrono::microseconds(1));
111 AINFO_EVERY(1000) << "waiting for task " << cr->name() << " completion";
112 }
113 croutines.erase(it);
114 cr->Release();
115 return true;
116 }
117 }
118 return false;
119}
120
121} // namespace scheduler
122} // namespace cyber
123} // namespace apollo
static void Notify(const std::string &group_name)
static bool RemoveCRoutine(const std::shared_ptr< CRoutine > &cr)
std::shared_ptr< CRoutine > NextRoutine() override
std::condition_variable & Cv()
Definition cv_wrapper.h:29
#define DEFAULT_GROUP_NAME
#define cyber_unlikely(x)
Definition macros.h:30
#define CACHELINE_SIZE
Definition macros.h:33
#define AINFO_EVERY(freq)
Definition log.h:82
std::unordered_map< std::string, MutexWrapper > GRP_WQ_MUTEX
std::unordered_map< std::string, CvWrapper > GRP_WQ_CV
std::unordered_map< std::string, LOCK_QUEUE > RQ_LOCK_GROUP
std::unordered_map< std::string, MULTI_PRIO_QUEUE > CR_GROUP
std::unordered_map< std::string, int > NOTIFY_GRP
class register implement
Definition arena_queue.h:37