Apollo 10.0
自动驾驶开放平台
choreography_context.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
18
19#include <limits>
20#include <unordered_map>
21#include <utility>
22#include <vector>
23
24#include "cyber/common/types.h"
25
26namespace apollo {
27namespace cyber {
28namespace scheduler {
29
32
34
35std::shared_ptr<CRoutine> ChoreographyContext::NextRoutine() {
36 if (cyber_unlikely(stop_.load())) {
37 return nullptr;
38 }
39
40 ReadLockGuard<AtomicRWLock> lock(rq_lk_);
41 for (auto it : cr_queue_) {
42 auto cr = it.second;
43 if (!cr->Acquire()) {
44 continue;
45 }
46
47 if (cr->UpdateState() == RoutineState::READY) {
48 return cr;
49 }
50 cr->Release();
51 }
52 return nullptr;
53}
54
55bool ChoreographyContext::Enqueue(const std::shared_ptr<CRoutine>& cr) {
57 cr_queue_.emplace(cr->priority(), cr);
58 return true;
59}
60
// Fragment: the signature line of this function is not present in this
// extract. The visible body increments the `notify` counter while holding
// mtx_wq_, then wakes one thread waiting on cv_wq_.
62 mtx_wq_.lock();
63 notify++;
64 mtx_wq_.unlock();
// The condition variable is signalled after the mutex has been released.
65 cv_wq_.notify_one();
66}
67
// Fragment: the signature line of this function is not present in this
// extract. The visible body blocks on cv_wq_ (with mtx_wq_ held) until
// `notify` is greater than zero or 1000 ms have elapsed, then consumes one
// pending notification if there is one.
69 std::unique_lock<std::mutex> lk(mtx_wq_);
70 cv_wq_.wait_for(lk, std::chrono::milliseconds(1000),
71 [&]() { return notify > 0; });
// After a timeout `notify` can still be zero, hence the guard before the
// decrement.
72 if (notify > 0) {
73 notify--;
74 }
75}
76
// Fragment: the signature line of this function is not present in this
// extract. The visible body sets the stop_ flag (which NextRoutine() checks
// before doing any work), then wakes every thread waiting on cv_wq_.
78 stop_.store(true);
79 mtx_wq_.lock();
// Set the counter to the largest unsigned char value so the `notify > 0`
// wait predicate holds for waiters that are woken.
// NOTE(review): presumably `notify` is itself an unsigned char - confirm
// against the class declaration.
80 notify = std::numeric_limits<unsigned char>::max();
81 mtx_wq_.unlock();
82 cv_wq_.notify_all();
83}
84
// Fragment: the signature line of this function (which must introduce
// `crid`) is not present in this extract. The visible body searches cr_queue_
// for the routine whose id() equals `crid`, stops it, waits until it can be
// acquired, erases it from the queue and returns true. Returns false when no
// entry matches.
// NOTE(review): no lock on rq_lk_ is visible around this erase, although
// NextRoutine() iterates cr_queue_ under that lock - confirm whether a
// write-lock line was lost from this extract.
87 for (auto it = cr_queue_.begin(); it != cr_queue_.end();) {
// Copy taken before the erase below so `cr` is still usable for Release().
88 auto cr = it->second;
89 if (cr->id() == crid) {
90 cr->Stop();
// Retry Acquire() until it succeeds, sleeping 1 ms between attempts.
91 while (!cr->Acquire()) {
92 std::this_thread::sleep_for(std::chrono::milliseconds(1));
// Progress log while waiting (AINFO_EVERY is invoked with freq = 1000).
93 AINFO_EVERY(1000) << "waiting for task " << cr->name() << " completion";
94 }
95 it = cr_queue_.erase(it);
96 cr->Release();
97 return true;
98 }
// No match at this position: advance manually (the for header has no
// increment expression).
99 ++it;
100 }
101 return false;
102}
103} // namespace scheduler
104} // namespace cyber
105} // namespace apollo
std::shared_ptr< CRoutine > NextRoutine() override
bool Enqueue(const std::shared_ptr< CRoutine > &)
#define cyber_unlikely(x)
Definition macros.h:30
#define AINFO_EVERY(freq)
Definition log.h:82
class register implement
Definition arena_queue.h:37