Apollo 10.0
自动驾驶开放平台
channel_buffer.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_DATA_CHANNEL_BUFFER_H_
18#define CYBER_DATA_CHANNEL_BUFFER_H_
19
20#include <algorithm>
21#include <functional>
22#include <memory>
23#include <vector>
24
26#include "cyber/common/log.h"
28
29namespace apollo {
30namespace cyber {
31namespace data {
32
34
35template <typename T>
37 public:
40 : channel_id_(channel_id), buffer_(buffer) {}
41
42 bool Fetch(uint64_t* index, std::shared_ptr<T>& m); // NOLINT
43
44 bool Latest(std::shared_ptr<T>& m); // NOLINT
45
46 bool FetchMulti(uint64_t fetch_size, std::vector<std::shared_ptr<T>>* vec);
47
48 uint64_t channel_id() const { return channel_id_; }
49 std::shared_ptr<BufferType> Buffer() const { return buffer_; }
50
51 private:
52 uint64_t channel_id_;
53 std::shared_ptr<BufferType> buffer_;
54};
55
56template <typename T>
57bool ChannelBuffer<T>::Fetch(uint64_t* index,
58 std::shared_ptr<T>& m) { // NOLINT
59 std::lock_guard<std::mutex> lock(buffer_->Mutex());
60 if (buffer_->Empty()) {
61 return false;
62 }
63
64 if (*index == 0) {
65 *index = buffer_->Tail();
66 } else if (*index == buffer_->Tail() + 1) {
67 return false;
68 } else if (*index < buffer_->Head()) {
69 auto interval = buffer_->Tail() - *index;
70 AWARN << "channel[" << GlobalData::GetChannelById(channel_id_) << "] "
71 << "read buffer overflow, drop_message[" << interval << "] pre_index["
72 << *index << "] current_index[" << buffer_->Tail() << "] ";
73 *index = buffer_->Tail();
74 }
75 m = buffer_->at(*index);
76 return true;
77}
78
79template <typename T>
80bool ChannelBuffer<T>::Latest(std::shared_ptr<T>& m) { // NOLINT
81 std::lock_guard<std::mutex> lock(buffer_->Mutex());
82 if (buffer_->Empty()) {
83 return false;
84 }
85
86 m = buffer_->Back();
87 return true;
88}
89
90template <typename T>
91bool ChannelBuffer<T>::FetchMulti(uint64_t fetch_size,
92 std::vector<std::shared_ptr<T>>* vec) {
93 std::lock_guard<std::mutex> lock(buffer_->Mutex());
94 if (buffer_->Empty()) {
95 return false;
96 }
97
98 auto num = std::min(buffer_->Size(), fetch_size);
99 vec->reserve(num);
100 for (auto index = buffer_->Tail() - num + 1; index <= buffer_->Tail();
101 ++index) {
102 vec->emplace_back(buffer_->at(index));
103 }
104 return true;
105}
106
107} // namespace data
108} // namespace cyber
109} // namespace apollo
110
111#endif // CYBER_DATA_CHANNEL_BUFFER_H_
static std::string GetChannelById(uint64_t id)
bool Latest(std::shared_ptr< T > &m)
std::shared_ptr< BufferType > Buffer() const
CacheBuffer< std::shared_ptr< T > > BufferType
bool Fetch(uint64_t *index, std::shared_ptr< T > &m)
bool FetchMulti(uint64_t fetch_size, std::vector< std::shared_ptr< T > > *vec)
ChannelBuffer(uint64_t channel_id, BufferType *buffer)
#define AWARN
Definition log.h:43
class register implement
Definition arena_queue.h:37