Apollo 10.0
自动驾驶开放平台
data_dispatcher.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_DATA_DATA_DISPATCHER_H_
18#define CYBER_DATA_DATA_DISPATCHER_H_
19
20#include <memory>
21#include <mutex>
22#include <vector>
23
24#include "cyber/common/log.h"
25#include "cyber/common/macros.h"
27#include "cyber/state.h"
28#include "cyber/time/time.h"
29
30namespace apollo {
31namespace cyber {
32namespace data {
33
36
// Declaration of the DataDispatcher<T> class template; the member functions
// are defined further down in this header.
// NOTE(review): this listing is incomplete. The embedded source line numbers
// skip 38, 40, 42, 51 and 53, so the `class` header, the start of the
// BufferVector alias and the buffers_map_ member used by AddBuffer() and
// Dispatch() are not visible here. Confirm against the upstream header.
37template <typename T>
39 public:
// Tail of a type alias: a vector of weak pointers to cache buffers that hold
// std::shared_ptr<T> messages.
// NOTE(review): presumably preceded by `using BufferVector =` - confirm.
41 std::vector<std::weak_ptr<CacheBuffer<std::shared_ptr<T>>>>;
43
// Registers the buffer returned by channel_buffer.Buffer() under
// channel_buffer.channel_id().
44 void AddBuffer(const ChannelBuffer<T>& channel_buffer);
45
// Fills every still-alive buffer registered for channel_id with msg and then
// returns the result of notifier_->Notify(channel_id). Returns false when no
// buffers are registered for channel_id.
46 bool Dispatch(const uint64_t channel_id, const std::shared_ptr<T>& msg);
47
48 private:
// Notifier obtained from DataNotifier::Instance(); Dispatch() calls Notify()
// on it after filling the buffers.
49 DataNotifier* notifier_ = DataNotifier::Instance();
// Held by AddBuffer() for the whole time it reads and updates buffers_map_.
50 std::mutex buffers_map_mutex_;
52
54};
55
56template <typename T>
57inline DataDispatcher<T>::DataDispatcher() {}
58
59template <typename T>
61 std::lock_guard<std::mutex> lock(buffers_map_mutex_);
62 auto buffer = channel_buffer.Buffer();
63 BufferVector* buffers = nullptr;
64 if (buffers_map_.Get(channel_buffer.channel_id(), &buffers)) {
65 buffers->emplace_back(buffer);
66 } else {
67 BufferVector new_buffers = {buffer};
68 buffers_map_.Set(channel_buffer.channel_id(), new_buffers);
69 }
70}
71
72template <typename T>
73bool DataDispatcher<T>::Dispatch(const uint64_t channel_id,
74 const std::shared_ptr<T>& msg) {
75 BufferVector* buffers = nullptr;
77 return false;
78 }
79 if (buffers_map_.Get(channel_id, &buffers)) {
80 for (auto& buffer_wptr : *buffers) {
81 if (auto buffer = buffer_wptr.lock()) {
82 std::lock_guard<std::mutex> lock(buffer->Mutex());
83 buffer->Fill(msg);
84 }
85 }
86 } else {
87 return false;
88 }
89 return notifier_->Notify(channel_id);
90}
91
92} // namespace data
93} // namespace cyber
94} // namespace apollo
95
96#endif // CYBER_DATA_DATA_DISPATCHER_H_
Cyber has a built-in time type, Time.
Definition time.h:31
An implementation of a lock-free, fixed-size hash map
std::shared_ptr< BufferType > Buffer() const
void AddBuffer(const ChannelBuffer< T > &channel_buffer)
bool Dispatch(const uint64_t channel_id, const std::shared_ptr< T > &msg)
std::vector< std::weak_ptr< CacheBuffer< std::shared_ptr< T > > > > BufferVector
#define DECLARE_SINGLETON(classname)
Definition macros.h:52
bool IsShutdown()
Definition state.h:46
class register implement
Definition arena_queue.h:37