Apollo
10.0
自动驾驶开放平台
choreography_context.cc
浏览该文件的文档.
1
/******************************************************************************
2
* Copyright 2018 The Apollo Authors. All Rights Reserved.
3
*
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
* you may not use this file except in compliance with the License.
6
* You may obtain a copy of the License at
7
*
8
* http://www.apache.org/licenses/LICENSE-2.0
9
*
10
* Unless required by applicable law or agreed to in writing, software
11
* distributed under the License is distributed on an "AS IS" BASIS,
12
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
* See the License for the specific language governing permissions and
14
* limitations under the License.
15
*****************************************************************************/
16
17
#include "
cyber/scheduler/policy/choreography_context.h
"
18
19
#include <limits>
20
#include <unordered_map>
21
#include <utility>
22
#include <vector>
23
24
#include "
cyber/common/types.h
"
25
26
namespace
apollo
{
27
namespace
cyber {
28
namespace
scheduler {
29
30
using
apollo::cyber::base::ReadLockGuard
;
31
using
apollo::cyber::base::WriteLockGuard
;
32
33
using
apollo::cyber::croutine::RoutineState
;
34
35
std::shared_ptr<CRoutine>
ChoreographyContext::NextRoutine
() {
36
if
(
cyber_unlikely
(
stop_
.load())) {
37
return
nullptr
;
38
}
39
40
ReadLockGuard<AtomicRWLock>
lock(rq_lk_);
41
for
(
auto
it : cr_queue_) {
42
auto
cr = it.second;
43
if
(!cr->Acquire()) {
44
continue
;
45
}
46
47
if
(cr->UpdateState() == RoutineState::READY) {
48
return
cr;
49
}
50
cr->Release();
51
}
52
return
nullptr
;
53
}
54
55
bool
ChoreographyContext::Enqueue
(
const
std::shared_ptr<CRoutine>& cr) {
56
WriteLockGuard<AtomicRWLock>
lock(rq_lk_);
57
cr_queue_.emplace(cr->priority(), cr);
58
return
true
;
59
}
60
61
void
ChoreographyContext::Notify
() {
62
mtx_wq_.lock();
63
notify++;
64
mtx_wq_.unlock();
65
cv_wq_.notify_one();
66
}
67
68
void
ChoreographyContext::Wait
() {
69
std::unique_lock<std::mutex> lk(mtx_wq_);
70
cv_wq_.wait_for(lk, std::chrono::milliseconds(1000),
71
[&]() {
return
notify > 0; });
72
if
(notify > 0) {
73
notify--;
74
}
75
}
76
77
void
ChoreographyContext::Shutdown
() {
78
stop_
.store(
true
);
79
mtx_wq_.lock();
80
notify = std::numeric_limits<unsigned char>::max();
81
mtx_wq_.unlock();
82
cv_wq_.notify_all();
83
}
84
85
bool
ChoreographyContext::RemoveCRoutine
(uint64_t crid) {
86
WriteLockGuard<AtomicRWLock>
lock(rq_lk_);
87
for
(
auto
it = cr_queue_.begin(); it != cr_queue_.end();) {
88
auto
cr = it->second;
89
if
(cr->id() == crid) {
90
cr->Stop();
91
while
(!cr->Acquire()) {
92
std::this_thread::sleep_for(std::chrono::milliseconds(1));
93
AINFO_EVERY
(1000) <<
"waiting for task "
<< cr->name() <<
" completion"
;
94
}
95
it = cr_queue_.erase(it);
96
cr->Release();
97
return
true
;
98
}
99
++it;
100
}
101
return
false
;
102
}
103
}
// namespace scheduler
104
}
// namespace cyber
105
}
// namespace apollo
choreography_context.h
apollo::cyber::base::ReadLockGuard
Definition
rw_lock_guard.h:35
apollo::cyber::base::WriteLockGuard
Definition
rw_lock_guard.h:48
apollo::cyber::scheduler::ChoreographyContext::RemoveCRoutine
bool RemoveCRoutine(uint64_t crid)
Definition
choreography_context.cc:85
apollo::cyber::scheduler::ChoreographyContext::NextRoutine
std::shared_ptr< CRoutine > NextRoutine() override
Definition
choreography_context.cc:35
apollo::cyber::scheduler::ChoreographyContext::Enqueue
bool Enqueue(const std::shared_ptr< CRoutine > &)
Definition
choreography_context.cc:55
apollo::cyber::scheduler::ChoreographyContext::Notify
void Notify()
Definition
choreography_context.cc:61
apollo::cyber::scheduler::ChoreographyContext::Wait
void Wait() override
Definition
choreography_context.cc:68
apollo::cyber::scheduler::ChoreographyContext::Shutdown
void Shutdown() override
Definition
choreography_context.cc:77
apollo::cyber::scheduler::ProcessorContext::stop_
std::atomic< bool > stop_
Definition
processor_context.h:40
cyber_unlikely
#define cyber_unlikely(x)
Definition
macros.h:30
types.h
AINFO_EVERY
#define AINFO_EVERY(freq)
Definition
log.h:82
apollo::cyber::croutine::RoutineState
RoutineState
Definition
croutine.h:38
apollo
class register implement
Definition
arena_queue.h:37
cyber
scheduler
policy
choreography_context.cc