Apollo 10.0
自动驾驶开放平台
cyber_benchmark_writer.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2024 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#include <getopt.h>
18#include <libgen.h>
19#include <memory>
20#include <string>
21#include <vector>
22
23#include "cyber/benchmark/benchmark_msg.pb.h"
24#include "cyber/cyber.h"
25#include "cyber/time/duration.h"
26#include "cyber/time/time.h"
27
28#if __has_include("gperftools/profiler.h")
29#include "gperftools/heap-profiler.h"
30#include "gperftools/malloc_extension.h"
31#include "gperftools/profiler.h"
32#endif
33
34std::string BINARY_NAME = "cyber_benchmark_writer"; // NOLINT
35
36int message_size = -1;
38int qos_policy = 0;
39int data_type = 0;
40int running_time = 10;
41bool enable_cpuprofile = false;
42bool enable_heapprofile = false;
43std::string profile_filename = "cyber_benchmark_writer_cpu.prof"; // NOLINT
44std::string heapprofile_filename = "cyber_benchmark_writer_mem.prof"; // NOLINT
45
47 AINFO << "Usage: \n " << BINARY_NAME << " [OPTION]...\n"
48 << "Description: \n"
49 << " -h, --help: help information \n"
50 << " -s, --message_size=message_size: transport message size\n"
51 << " -t, --transport_freq=transmission_frequency: transmission "
52 "frequency\n" // NOLINT
53 << " -q, --qos_policy=qos_reliable_policy: set qos reliable policy, "
54 "0 is Reliable, 1 is Best effort, default value is 0\n"
55 << " -d, --data_type=data_type: transport data type, "
56 "0 is bytes, 1 is repeated field, default value is 0\n"
57 << " -T, --time=time: running time, default value is 10 seconds\n"
58 << " -c, --cpuprofile: enable gperftools cpu profile\n"
59 << " -o, --profile_filename=filename: the filename to dump the "
60 "profile to, default value is cyber_benchmark_writer_cpu.prof. Only "
61 "work " // NOLINT
62 "with -c option\n"
63 << " -H, --heapprofile: enable gperftools heap profile\n"
64 << " -O, --heapprofile_filename=filename: the filename to dump the "
65 "profile to, default value is cyber_benchmark_writer_mem.prof. Only "
66 "work " // NOLINT
67 "with -H option\n"
68 << "Example:\n"
69 << " " << BINARY_NAME << " -h\n"
70 << " " << BINARY_NAME << " -s 64K -t 10\n"
71 << " " << BINARY_NAME << " -s 64K -t 10 -c -H ";
72}
73
74void GetOptions(const int argc, char* const argv[]) {
75 opterr = 0; // extern int opterr
76 int long_index = 0;
77 const std::string short_opts = "hs:t:q:d:T:co:HO:";
78 static const struct option long_opts[] = {
79 {"help", no_argument, nullptr, 'h'},
80 {"message_size", required_argument, nullptr, 's'},
81 {"transport_freq", required_argument, nullptr, 't'},
82 {"qos_policy", required_argument, nullptr, 'q'},
83 {"data_type", required_argument, nullptr, 'd'},
84 {"time", required_argument, nullptr, 'T'},
85 {"cpuprofile", no_argument, nullptr, 'c'},
86 {"profile_filename", required_argument, nullptr, 'o'},
87 {"heapprofile", no_argument, nullptr, 'H'},
88 {"heapprofile_filename", required_argument, nullptr, 'O'},
89 {NULL, no_argument, nullptr, 0}};
90
91 // log command for info
92 std::string cmd("");
93 for (int i = 0; i < argc; ++i) {
94 cmd += argv[i];
95 cmd += " ";
96 }
97 AINFO << "command: " << cmd;
98
99 if (1 == argc) {
100 DisplayUsage();
101 exit(0);
102 }
103
104 do {
105 int opt =
106 getopt_long(argc, argv, short_opts.c_str(), long_opts, &long_index);
107 if (opt == -1) {
108 break;
109 }
110 int base_size = 1;
111 std::string arg;
112 switch (opt) {
113 case 's':
114 arg = std::string(optarg);
115 switch (arg[arg.length() - 1]) {
116 case 'B':
117 base_size = 1;
118 break;
119 case 'K':
120 base_size = 1024;
121 break;
122 case 'M':
123 base_size = 1024 * 1024;
124 break;
125 default:
126 AERROR << "Invalid identifier. It should be 'K' or 'M' or 'B'";
127 exit(-1);
128 }
129 message_size = std::stoi(arg.substr(0, arg.length() - 1)) * base_size;
130 if (message_size < 0 || message_size % 4 != 0) {
131 AERROR << "Invalid message size.";
132 exit(-1);
133 }
134 break;
135 case 't':
136 transport_freq = std::stoi(std::string(optarg));
137 break;
138 case 'T':
139 running_time = std::stoi(std::string(optarg));
140 if (running_time < 0) {
141 AERROR << "Invalid running time. It should greater than 0";
142 exit(-1);
143 }
144 break;
145 case 'q':
146 qos_policy = std::stoi(std::string(optarg));
147 if (qos_policy != 0 && qos_policy != 1) {
148 AERROR << "Invalid qos_policy. It should be 0 or 1";
149 exit(-1);
150 }
151 break;
152 case 'd':
153 data_type = std::stoi(std::string(optarg));
154 if (data_type != 0 && data_type != 1) {
155 AERROR << "Invalid data_type. It should be 0 or 1";
156 exit(-1);
157 }
158 break;
159 case 'c':
160#ifndef BASE_PROFILER_H_
161 AWARN << "gperftools not installed, ignore perf parameters";
162#endif
163 enable_cpuprofile = true;
164 break;
165 case 'o':
166 profile_filename = std::string(optarg);
167 break;
168 case 'H':
169#ifndef BASE_PROFILER_H_
170 AWARN << "gperftools not installed, ignore perf parameters";
171#endif
172 enable_heapprofile = true;
173 break;
174 case 'O':
175 heapprofile_filename = std::string(optarg);
176 break;
177 case 'h':
178 DisplayUsage();
179 exit(0);
180 default:
181 break;
182 }
183 } while (true);
184
185 if (optind < argc) {
186 AINFO << "Found non-option ARGV-element \"" << argv[optind++] << "\"";
187 DisplayUsage();
188 exit(1);
189 }
190
191 if (message_size == -1) {
192 AINFO << "-s parameters must be specified";
193 DisplayUsage();
194 exit(1);
195 }
196}
197
198int main(int argc, char** argv) {
199 GetOptions(argc, argv);
200 google::SetCommandLineOption("bvar_dump_interval", "1");
201
203
205 auto test_time =
206 std::make_shared<::bvar::Status<double>>(BINARY_NAME + "-test-time", 0);
207 auto test_message_size = std::make_shared<::bvar::Status<uint64_t>>(
208 BINARY_NAME + "-message-size", 0);
210 attrs.set_channel_name("/apollo/cyber/benchmark");
211 auto qos = attrs.mutable_qos_profile();
212 qos->set_depth(10);
213
214 if (qos_policy == 1) {
215 qos->set_reliability(
217 } else {
218 qos->set_reliability(
220 }
221 auto writer =
222 node->CreateWriter<apollo::cyber::benchmark::BenchmarkMsg>(attrs);
223
224 // sleep a while for initialization, aboout 2 seconds
225 apollo::cyber::Rate rate_init(0.5);
226
227 apollo::cyber::Rate rate_ctl(static_cast<float>(transport_freq));
228
229 rate_init.Sleep();
230
231 uint64_t send_msg_total = transport_freq * running_time;
232
233 // std::vector<uint32_t> trans_vec;
234 // int num_of_instance = message_size / 4;
235 // for (int i = 0; i < num_of_instance; i++) {
236 // trans_vec.push_back(rand());
237 // }
238
239 // char* data = (char*)malloc(message_size);
240 // for (int i = 0; i < num_of_instance; i++) {
241 // *(uint32_t*)(data + i * 4) = rand();
242 // }
243
244 // if (data_type == 0) {
245 // trans_unit->set_data_bytes(data, message_size);
246 // } else {
247 // for (int i = 0; i < num_of_instance; i++) {
248 // trans_unit->add_data(trans_vec[i]);
249 // }
250 // }
251 // free(data);
252
253 int send_msg = 0;
254
255#ifndef NO_TCMALLOC
256#ifdef BASE_PROFILER_H_
257 if (enable_cpuprofile) {
258 ProfilerStart(profile_filename.c_str());
259 }
260 if (enable_heapprofile) {
261 HeapProfilerStart(heapprofile_filename.c_str());
262 }
263#endif
264#endif
265
266 std::vector<uint32_t> trans_vec;
267 int num_of_instance = message_size / 4;
268 for (int i = 0; i < num_of_instance; i++) {
269 trans_vec.push_back(rand()); // NOLINT
270 }
271
272 char* data = (char*)malloc(message_size); // NOLINT
273
274 if (transport_freq > 0) {
275 auto start_time = apollo::cyber::Time::Now();
276 while (send_msg < send_msg_total) {
277 auto trans_unit = writer->AcquireMessage();
278 int base = rand(); // NOLINT
279
280 for (int i = 0; i < num_of_instance; i++) {
281 trans_vec[i] = base * i;
282 }
283
284 for (int i = 0; i < num_of_instance; i++) {
285 *(uint32_t*)(data + i * 4) = base * i; // NOLINT
286 }
287
288 if (data_type == 0) {
289 trans_unit->set_data_bytes(data, message_size);
290 } else {
291 for (int i = 0; i < num_of_instance; i++) {
292 trans_unit->add_data(trans_vec[i]);
293 }
294 }
295
296 writer->Write(trans_unit);
297 ++send_msg;
298
299 rate_ctl.Sleep();
300 }
301 auto end_time = apollo::cyber::Time::Now();
302 test_time->set_value((end_time - start_time).ToSecond());
303 } else {
304 auto start_time = apollo::cyber::Time::Now();
305 auto current = start_time;
306 auto endtime = apollo::cyber::Time::Now() +
307 apollo::cyber::Duration(static_cast<double>(running_time));
308 while (current < endtime) {
309 auto trans_unit = writer->AcquireMessage();
310 int base = rand(); // NOLINT
311
312 for (int i = 0; i < num_of_instance; i++) {
313 trans_vec[i] = base * i;
314 }
315
316 for (int i = 0; i < num_of_instance; i++) {
317 *(uint32_t*)(data + i * 4) = base * i; // NOLINT
318 }
319
320 if (data_type == 0) {
321 trans_unit->set_data_bytes(data, message_size);
322 } else {
323 for (int i = 0; i < num_of_instance; i++) {
324 trans_unit->add_data(trans_vec[i]);
325 }
326 }
327
328 writer->Write(trans_unit);
329 ++send_msg;
330 current = apollo::cyber::Time::Now();
331 }
332 test_time->set_value((current - start_time).ToSecond());
333 }
334
335 auto m = writer->AcquireMessage();
336 if (data_type == 0) {
337 m->set_data_bytes(data, message_size);
338 } else {
339 for (int i = 0; i < num_of_instance; i++) {
340 m->add_data(trans_vec[i]);
341 }
342 }
343 test_message_size->set_value(m->ByteSizeLong());
344
345 free(data);
346
347#ifndef NO_TCMALLOC
348#ifdef BASE_PROFILER_H_
349 if (enable_cpuprofile) {
350 ProfilerStop();
351 }
352 if (enable_heapprofile) {
353 HeapProfilerDump("Befor shutdown");
354 HeapProfilerStop();
355 }
356#endif
357#endif
358 std::this_thread::sleep_for(std::chrono::milliseconds(1200));
360
361 return 0;
362}
static Time Now()
get the current time.
Definition time.cc:57
bool enable_heapprofile
void DisplayUsage()
int running_time
int main(int argc, char **argv)
int transport_freq
void GetOptions(const int argc, char *const argv[])
std::string heapprofile_filename
std::string profile_filename
int message_size
std::string BINARY_NAME
bool enable_cpuprofile
#define AERROR
Definition log.h:44
#define AINFO
Definition log.h:42
#define AWARN
Definition log.h:43
void Clear()
Definition init.cc:161
bool Init(const char *binary_name, const std::string &dag_info)
Definition init.cc:98
std::unique_ptr< Node > CreateNode(const std::string &node_name, const std::string &name_space)
Definition cyber.cc:33