Apollo 10.0
自动驾驶开放平台
protobuf_arena_manager.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2024 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16#ifndef CYBER_TRANSPORT_SHM_PROTOBUF_ARENA_MANAGER_H_
17#define CYBER_TRANSPORT_SHM_PROTOBUF_ARENA_MANAGER_H_
18
19#include <functional>
20#include <memory>
21#include <unordered_map>
22#include <vector>
23
24#include <google/protobuf/arena.h>
25
29#include "cyber/common/macros.h"
34
35namespace apollo {
36namespace cyber {
37namespace transport {
38
40 struct {
41 std::atomic<uint64_t> ref_count_;
42 std::atomic<bool> auto_extended_;
43 std::atomic<uint64_t> message_size_;
44 std::atomic<uint64_t> block_num_;
45 std::atomic<uint64_t> message_seq_;
46 std::mutex mutex_;
48 uint8_t bytes_[128];
49};
50
52 struct {
53 uint64_t size_;
54 std::atomic<uint64_t> writing_ref_count_;
55 std::atomic<uint64_t> reading_ref_count_;
58 uint8_t bytes_[128];
59};
60
62 uint64_t size_;
63 // std::atomic<uint64_t> writing_ref_count_;
64 // std::atomic<uint64_t> reading_ref_count_;
65 // base::PthreadRWLock read_write_mutex_;
66 static const int32_t kRWLockFree;
67 static const int32_t kWriteExclusive;
68 static const int32_t kMaxTryLockTimes;
69 std::atomic<int32_t> lock_num_ = {0};
70};
71
77
79 public:
81 explicit ArenaSegment(uint64_t channel_id);
82 ArenaSegment(uint64_t channel_id, void* base_address);
83 ArenaSegment(uint64_t channel_id, uint64_t message_size, uint64_t block_num,
84 void* base_address);
86
87 bool Init(uint64_t message_size, uint64_t block_num);
88 // bool Create(uint64_t message_size, uint64_t block_num);
89 bool Open(uint64_t message_size, uint64_t block_num);
90 bool OpenOrCreate(uint64_t message_size, uint64_t block_num);
91
92 void* GetShmAddress();
93
95 bool AddBlockWriteLock(uint64_t block_index);
96 void RemoveBlockWriteLock(uint64_t block_index);
97 bool AddBlockReadLock(uint64_t block_index);
98 void RemoveBlockReadLock(uint64_t block_index);
99
100 bool AcquireBlockToWrite(uint64_t size, ArenaSegmentBlockInfo* block_info);
101 void ReleaseWrittenBlock(const ArenaSegmentBlockInfo& block_info);
102
104 void ReleaseReadBlock(const ArenaSegmentBlockInfo& block_info);
105
106 // uint64_t GetCapicity();
107
108 public:
111 std::vector<std::shared_ptr<google::protobuf::Arena>> arenas_;
112 std::vector<uint64_t> arena_block_address_;
113 uint64_t channel_id_;
114 uint64_t key_id_;
117 std::shared_ptr<google::protobuf::Arena> shared_buffer_arena_;
118 void* arena_buffer_address_ = nullptr;
119
121};
122
124 struct {
125 uint64_t channel_id_;
127 uint64_t related_blocks_[4];
130 uint8_t bytes_[256];
131};
132
134 struct {
135 uint64_t version_;
138 uint64_t block_size_;
139 uint64_t block_num_;
141 uint64_t message_num_;
142 uint64_t message_seq_;
147 uint8_t bytes_[256];
148};
149
151 public:
152 using ArenaAllocCallback = std::function<void*(uint64_t)>;
153
155
156 uint64_t GetBaseAddress(const message::ArenaMessageWrapper* wrapper) override;
157
158 bool Enable();
159 bool EnableSegment(uint64_t channel_id);
160 bool Destroy();
161 // bool Shutdown();
162
164 uint64_t channel_id);
167 uint64_t offset);
169 std::vector<uint64_t> GetMessageRelatedBlocks(
173 uint64_t block_index);
174
175 std::shared_ptr<ArenaSegment> GetSegment(uint64_t channel_id);
176
178 const void* message) override;
179 void* GetMessage(message::ArenaMessageWrapper* wrapper) override;
180
181 void* GetAvailableBuffer(uint64_t channel_id) {
182 auto segment = this->GetSegment(channel_id);
183 if (!segment) {
184 if (non_arena_buffers_.find(channel_id) == non_arena_buffers_.end()) {
185 return nullptr;
186 }
187 return non_arena_buffers_[channel_id];
188 }
189 if (segment->arena_buffer_address_ != nullptr) {
190 return segment->arena_buffer_address_;
191 }
192 return non_arena_buffers_[channel_id];
193 }
194
195 template <typename T>
196 bool RegisterQueue(uint64_t channel_id, uint64_t size) {
197 if (non_arena_buffers_.find(channel_id) == non_arena_buffers_.end() ||
198 arena_buffer_callbacks_.find(channel_id) ==
199 arena_buffer_callbacks_.end()) {
200 auto non_arena_buffer_ptr = new apollo::cyber::base::ArenaQueue<T>();
201 non_arena_buffer_ptr->Init(size);
202 non_arena_buffers_[channel_id] = non_arena_buffer_ptr;
203 arena_buffer_callbacks_[channel_id] = [this, channel_id, size]() {
204 auto segment = this->GetSegment(channel_id);
205 if (!segment) {
206 ADEBUG << "channel id '" << channel_id << "' not enable";
207 ADEBUG << "fallback to use nomarl queue";
208 return;
209 }
210 if (segment->shared_buffer_arena_ == nullptr) {
211 ADEBUG << "Not enable arena shared buffer in channel id '"
212 << channel_id << "'";
213 ADEBUG << "fallback to use nomarl queue";
214 return;
215 }
216 if (segment->arena_buffer_address_ == nullptr) {
217 auto ptr = google::protobuf::Arena::Create<base::ArenaQueue<T>>(
218 segment->shared_buffer_arena_.get());
219 ptr->Init(size, segment->shared_buffer_arena_.get());
220 segment->arena_buffer_address_ = reinterpret_cast<void*>(ptr);
221 }
222 };
223 }
224 // try enable arena buffer
225 auto segment = GetSegment(channel_id);
226 if (segment) {
227 arena_buffer_callbacks_[channel_id]();
228 }
229 return true;
230 }
231
232 template <typename M,
233 typename std::enable_if<
234 google::protobuf::Arena::is_arena_constructable<M>::value,
235 M>::type* = nullptr>
236 void AcquireArenaMessage(uint64_t channel_id, std::shared_ptr<M>& ret_msg) {
237 auto arena_conf =
238 cyber::common::GlobalData::Instance()->GetChannelArenaConf(channel_id);
239 google::protobuf::ArenaOptions options;
240 options.start_block_size = arena_conf.max_msg_size();
241 options.max_block_size = arena_conf.max_msg_size();
242
243 auto segment = GetSegment(channel_id);
244 if (!segment) {
245 return;
246 }
247
249 // TODO(All): size should be send to
250 // AcquireBlockToWrite for dynamic adjust block size
251 // auto size = input_msg->ByteSizeLong();
252 uint64_t size = 0;
253 if (!segment->AcquireBlockToWrite(size, &wb)) {
254 return;
255 }
256 options.initial_block =
257 reinterpret_cast<char*>(segment->arena_block_address_[wb.block_index_]);
258 options.initial_block_size = segment->message_capacity_;
259 if (segment->arenas_[wb.block_index_] != nullptr) {
260 segment->arenas_[wb.block_index_] = nullptr;
261 }
262 segment->arenas_[wb.block_index_] =
263 std::make_shared<google::protobuf::Arena>(options);
264
265 // deconstructor do nothing to avoid proto
266 // instance deconstructed before arena allocator
267 ret_msg = std::shared_ptr<M>(
268 google::protobuf::Arena::CreateMessage<M>(
269 segment->arenas_[wb.block_index_].get()),
270 [segment, wb](M* ptr) {
271 int32_t lock_num = segment->blocks_[wb.block_index_].lock_num_.load();
272 if (lock_num < ArenaSegmentBlock::kRWLockFree) {
273 segment->ReleaseWrittenBlock(wb);
274 }
275 });
276 return;
277 }
278
279 template <typename M,
280 typename std::enable_if<
281 !google::protobuf::Arena::is_arena_constructable<M>::value,
282 M>::type* = nullptr>
283 void AcquireArenaMessage(uint64_t channel_id, std::shared_ptr<M>& ret_msg) {
284 return;
285 }
286
287 private:
288 bool init_;
289 std::unordered_map<uint64_t, std::shared_ptr<ArenaSegment>> segments_;
290 std::unordered_map<uint64_t, void*> non_arena_buffers_;
291 std::unordered_map<uint64_t, std::function<void()>> arena_buffer_callbacks_;
292 std::mutex segments_mutex_;
293
294 std::shared_ptr<ArenaAddressAllocator> address_allocator_;
295
296 static ArenaAllocCallback arena_alloc_cb_;
297 static void* ArenaAlloc(uint64_t size);
298 static void ArenaDealloc(void* addr, uint64_t size);
299
300 std::mutex arena_alloc_cb_mutex_;
301
302 std::unordered_map<uint64_t, uint64_t> managed_wrappers_;
303
305};
306
307// template <typename MessageT>
308// std::shared_ptr<MessageT> ProtobufArenaManager::CreateMessage(
309// message::ArenaMessageWrapper* wrapper, const MessageT& message) {}
310
311// template <typename MessageT>
312// std::shared_ptr<MessageT> ProtobufArenaManager::LoadMessage(
313// message::ArenaMessageWrapper* wrapper) {}
314
315} // namespace transport
316} // namespace cyber
317} // namespace apollo
318
319#endif
bool Init(uint64_t message_size, uint64_t block_num)
bool Open(uint64_t message_size, uint64_t block_num)
std::vector< std::shared_ptr< google::protobuf::Arena > > arenas_
bool OpenOrCreate(uint64_t message_size, uint64_t block_num)
std::shared_ptr< google::protobuf::Arena > shared_buffer_arena_
bool AcquireBlockToRead(ArenaSegmentBlockInfo *block_info)
void ReleaseReadBlock(const ArenaSegmentBlockInfo &block_info)
void ReleaseWrittenBlock(const ArenaSegmentBlockInfo &block_info)
bool AcquireBlockToWrite(uint64_t size, ArenaSegmentBlockInfo *block_info)
uint64_t GetBaseAddress(const message::ArenaMessageWrapper *wrapper) override
std::shared_ptr< ArenaSegment > GetSegment(uint64_t channel_id)
void ResetMessageRelatedBlocks(message::ArenaMessageWrapper *wrapper)
void AcquireArenaMessage(uint64_t channel_id, std::shared_ptr< M > &ret_msg)
bool RegisterQueue(uint64_t channel_id, uint64_t size)
void * GetMessage(message::ArenaMessageWrapper *wrapper) override
void SetMessageAddressOffset(message::ArenaMessageWrapper *wrapper, uint64_t offset)
uint64_t GetMessageAddressOffset(message::ArenaMessageWrapper *wrapper)
void SetMessageChannelId(message::ArenaMessageWrapper *wrapper, uint64_t channel_id)
uint64_t GetMessageChannelId(message::ArenaMessageWrapper *wrapper)
void * SetMessage(message::ArenaMessageWrapper *wrapper, const void *message) override
void AddMessageRelatedBlock(message::ArenaMessageWrapper *wrapper, uint64_t block_index)
std::vector< uint64_t > GetMessageRelatedBlocks(message::ArenaMessageWrapper *wrapper)
#define DECLARE_SINGLETON(classname)
Definition macros.h:52
int message_size
#define ADEBUG
Definition log.h:41
class register implement
Definition arena_queue.h:37
struct apollo::cyber::transport::ArenaSegmentBlockDescriptor::@4 struct_
struct apollo::cyber::transport::ArenaSegmentState::@3 struct_
struct apollo::cyber::transport::ExtendedStruct::@5 meta_
struct apollo::cyber::transport::ProtobufArenaManagerMeta::@6 struct_