22 #include <odp/helper/odph_api.h>
24 #include <export_results.h>
26 #define MAX_QUEUES (32 * 1024)
/*
 * Type declarations used by the test. Only the opening line of each struct
 * and a handful of members are visible in this listing; the remaining
 * members and the closing lines are not shown.
 */
33 typedef struct test_options_t {
46 typedef struct test_stat_t {
56 typedef struct test_global_t test_global_t;
/*
 * NOTE(review): the four members below are accessed through thread_args_t
 * pointers elsewhere in this listing (->global, ->options, ->src_queue_id,
 * ->dst_queue_id), so they presumably belong to the per-worker argument
 * struct whose opening line is not shown -- confirm against the full file.
 */
59 test_global_t *global;
61 test_options_t *options;
/* Source and destination queue index tables, each with MAX_QUEUES entries. */
63 uint32_t src_queue_id[MAX_QUEUES];
64 uint32_t dst_queue_id[MAX_QUEUES];
68 typedef struct test_global_t {
70 test_options_t options;
78 test_common_options_t common_options;
/*
 * print_usage() - help text for the test application.
 *
 * Takes no arguments and returns nothing. The visible lines are the string
 * literals that make up the usage message; the call that emits them is not
 * shown in this listing.
 *
 * NOTE(review): the "-r, --num_round" line does not mention a default, while
 * parse_options() initialises num_round to 1000. The other numeric options
 * document their defaults; consider adding "(default 1000)" for consistency.
 */
82 static void print_usage(
void)
85 "Plain queue performance test\n"
87 "Usage: odp_queue_perf [options]\n"
89 " -m, --mode <arg> Test mode:\n"
90 " 0: Loop: events are enqueued back to the same queue they\n"
91 " were dequeued from (default)\n"
92 " 1: Pair: queues are paired and events are always moved\n"
93 " between the queues when doing dequeue/enqueue. Requires\n"
94 " an even number of both queues and workers.\n"
95 " -c, --num_cpu Number of worker threads (default 1)\n"
96 " -q, --num_queue Number of queues (default 1)\n"
97 " -e, --num_event Number of events per queue (default 1)\n"
98 " -b, --burst_size Maximum number of events per operation (default 1)\n"
99 " -p, --private Use separate queues for each worker\n"
100 " -r, --num_round Number of rounds\n"
101 " -l, --lockfree Lock-free queues\n"
102 " -w, --waitfree Wait-free queues\n"
103 " -s, --single Single producer/consumer queues\n"
104 " -h, --help This help\n"
/*
 * parse_options() - read the test's command-line options into *test_options
 * and validate the resulting combination.
 *
 * argc, argv:    argument vector handed to getopt_long().
 * test_options:  output; defaults are written first, then overridden by the
 *                options found on the command line.
 *
 * Returns int; the return statements are not visible in this listing.
 *
 * NOTE(review): numeric arguments are converted with atoi(), which cannot
 * report malformed or out-of-range input. strtol()/strtoul() with endptr and
 * errno checks would let bad values be rejected explicitly.
 */
108 static int parse_options(
int argc,
char *argv[], test_options_t *test_options)
/* Long option table: the last field of each entry is the matching short option letter. */
113 static const struct option longopts[] = {
114 {
"num_cpu", required_argument, NULL,
'c'},
115 {
"num_queue", required_argument, NULL,
'q'},
116 {
"num_event", required_argument, NULL,
'e'},
117 {
"burst_size", required_argument, NULL,
'b'},
118 {
"mode", required_argument, NULL,
'm'},
119 {
"private", no_argument, NULL,
'p'},
120 {
"num_round", required_argument, NULL,
'r'},
121 {
"lockfree", no_argument, NULL,
'l'},
122 {
"waitfree", no_argument, NULL,
'w'},
123 {
"single", no_argument, NULL,
's'},
124 {
"help", no_argument, NULL,
'h'},
/* The leading '+' asks GNU getopt to stop at the first non-option argument. */
128 static const char *shortopts =
"+c:q:e:b:m:pr:lwsh";
/* Defaults, applied before any option is parsed. */
130 test_options->num_cpu = 1;
131 test_options->num_queue = 1;
132 test_options->num_event = 1;
133 test_options->max_burst = 1;
134 test_options->mode = TEST_MODE_LOOP;
135 test_options->num_round = 1000;
137 test_options->single =
false;
138 test_options->private_queues =
false;
/* Option handling; the surrounding loop and the case labels are not shown in this listing. */
141 opt = getopt_long(argc, argv, shortopts, longopts, NULL);
148 test_options->num_cpu = atoi(optarg);
151 test_options->num_queue = atoi(optarg);
154 test_options->num_event = atoi(optarg);
157 test_options->max_burst = atoi(optarg);
/*
 * NOTE(review): only TEST_MODE_PAIR is recognised here; any other value
 * (including garbage) silently leaves the default mode in place.
 */
160 if (atoi(optarg) == TEST_MODE_PAIR)
161 test_options->mode = TEST_MODE_PAIR;
164 test_options->num_round = atoi(optarg);
173 test_options->private_queues =
true;
176 test_options->single =
true;
/* The number of queues must be in the range 1..MAX_QUEUES. */
187 if (test_options->num_queue > MAX_QUEUES || test_options->num_queue == 0) {
188 ODPH_ERR(
"Invalid number of queues %u. Test maximum %u.\n",
189 test_options->num_queue, MAX_QUEUES);
193 num_cpu = test_options->num_cpu;
/* Private queues: there must be at least as many queues as workers. */
197 if (test_options->private_queues) {
198 if ((
int)test_options->num_queue < num_cpu) {
199 ODPH_ERR(
"Not enough queues for %d workers.\n", num_cpu);
/*
 * NOTE(review): if num_cpu can be 0 at this point (start_workers() treats 0
 * specially via "num_cpu &&"), the check above passes and the modulo below
 * divides by zero, unless a guard in lines not shown here prevents it.
 *
 * NOTE(review): num_cpu is printed with %d above and with PRIu32 below; its
 * declaration is not visible, but one of the two specifiers does not match.
 * This message is only a warning; it is nevertheless emitted via ODPH_ERR.
 */
202 if (test_options->num_queue % num_cpu)
203 ODPH_ERR(
"Warn: %" PRIu32
" queues shared unevenly amongst %" PRIu32
" "
204 "workers.\n", test_options->num_queue, num_cpu);
/*
 * Single producer/consumer queues that are shared between workers are only
 * accepted with exactly one worker in loop mode or exactly two in pair mode.
 */
207 if (test_options->single && !test_options->private_queues) {
208 if ((test_options->mode == TEST_MODE_LOOP && num_cpu != 1) ||
209 (test_options->mode == TEST_MODE_PAIR && num_cpu != 2)) {
210 ODPH_ERR(
"Multiple producers/consumers not allowed with single prod/cons queues.\n");
/* Pair mode needs an even number of queues and an even number of workers. */
215 if (test_options->mode == TEST_MODE_PAIR && (test_options->num_queue % 2 || num_cpu % 2)) {
216 ODPH_ERR(
"Pair mode requires an even number of queues and workers.\n");
/*
 * create_queues() - print the test configuration, check it against the
 * platform limits and set up the pool, the queues and the initial events.
 *
 * global: test state; options are read from global->options.
 *
 * Returns int; the return statements are not visible in this listing. Most
 * of the ODP calls are not shown either, so the step descriptions below are
 * inferred from the error messages that follow each step.
 */
223 static int create_queues(test_global_t *global)
230 uint32_t i, j, max_size, max_num;
231 test_options_t *test_options = &global->options;
233 uint32_t num_queue = test_options->num_queue;
234 uint32_t num_event = test_options->num_event;
235 uint32_t num_round = test_options->num_round;
/*
 * NOTE(review): both products below are computed in 32 bits. num_queue is
 * limited to MAX_QUEUES by parse_options(), but no upper bound on num_event
 * is visible, so num_queue * num_event and 2 * num_event can wrap around and
 * pass the capability checks further down with a too-small value.
 */
236 uint32_t tot_event = num_queue * num_event;
/* In pair mode each queue is sized for twice the per-queue event count. */
237 uint32_t queue_size = test_options->mode == TEST_MODE_PAIR ? 2 * num_event : num_event;
/* Configuration summary. */
242 printf(
"\nTesting %s queues\n",
246 printf(
" mode %s\n", test_options->mode == TEST_MODE_LOOP ?
248 printf(
" private queues %s\n", test_options->private_queues ?
"yes" :
"no");
249 printf(
" single prod/cons %s\n", test_options->single ?
"yes" :
"no");
250 printf(
" num rounds %u\n", num_round);
251 printf(
" num queues %u\n", num_queue);
252 printf(
" num events per queue %u\n", num_event);
253 printf(
" queue size %u\n", queue_size);
254 printf(
" max burst size %u\n", test_options->max_burst);
/* Loops over every queue slot and every event slot; their bodies are not shown. */
256 for (i = 0; i < num_queue; i++)
259 for (i = 0; i < tot_event; i++)
/* Capability queries. */
263 ODPH_ERR(
"Queue capa failed.\n");
268 ODPH_ERR(
"Pool capa failed.\n");
/* Limits for plain (blocking) queues. A max_size of 0 skips the size check. */
274 ODPH_ERR(
"Max queues supported %u.\n", queue_capa.
plain.
max_num);
279 if (max_size && queue_size > max_size) {
280 ODPH_ERR(
"Max queue size supported %u.\n", max_size);
/* Limits for lock-free queues. */
285 ODPH_ERR(
"Lockfree queues not supported.\n");
290 ODPH_ERR(
"Max lockfree queues supported %u.\n",
296 if (max_size && queue_size > max_size) {
297 ODPH_ERR(
"Max lockfree queue size supported %u.\n", max_size);
/* Limits for wait-free queues. */
302 ODPH_ERR(
"Waitfree queues not supported.\n");
307 ODPH_ERR(
"Max waitfree queues supported %u.\n",
313 if (max_size && queue_size > max_size) {
314 ODPH_ERR(
"Max waitfree queue size supported %u.\n", max_size);
318 ODPH_ERR(
"Bad queue blocking type.\n");
/* Total event count against the pool limit; a max_num of 0 skips the check. */
324 if (max_num && tot_event > max_num) {
325 ODPH_ERR(
"Max events supported %u.\n", max_num);
/* The buffer pool holds exactly one buffer per event. */
331 pool_param.
buf.
num = tot_event;
336 ODPH_ERR(
"Pool create failed.\n");
345 queue_param.
size = queue_size;
/* Extra queue parameters for single producer/consumer mode; the assignments are not shown. */
347 if (test_options->single) {
/* Queue creation, one per index. */
352 for (i = 0; i < num_queue; i++) {
356 ODPH_ERR(
"Queue create failed %u.\n", i);
/* Event allocation. */
361 for (i = 0; i < tot_event; i++) {
365 ODPH_ERR(
"Event alloc failed %u.\n", i);
/* Initial fill: queue i receives the events with ids i * num_event .. i * num_event + num_event - 1. */
371 for (i = 0; i < num_queue; i++) {
372 for (j = 0; j < num_event; j++) {
373 uint32_t
id = i * num_event + j;
376 ODPH_ERR(
"Queue enq failed %u/%u.\n", i, j);
/* Final pass over all events; presumably error cleanup, as its body is not shown -- TODO confirm. */
387 for (i = 0; i < tot_event; i++) {
393 ODPH_ERR(
"Initializing test queues failed.\n");
/*
 * destroy_queues() - tear down the test queues and the pool.
 *
 * global: test state; the number of queues comes from global->options.
 *
 * Returns int; the return statements are not visible in this listing. The
 * per-queue work inside the loop is not shown either; the error messages
 * indicate that each queue is destroyed and then the pool.
 */
398 static int destroy_queues(test_global_t *global)
402 test_options_t *test_options = &global->options;
403 uint32_t num_queue = test_options->num_queue;
407 for (i = 0; i < num_queue; i++) {
420 ODPH_ERR(
"Queue destroy failed %u.\n", i);
427 ODPH_ERR(
"Pool destroy failed.\n");
/*
 * run_test() - worker thread entry point (installed as thr_param[i].start in
 * start_workers()).
 *
 * arg: pointer to this worker's thread_args_t.
 *
 * For num_round rounds the worker picks the next source/destination queue
 * pair from its private tables, dequeues events and enqueues them to the
 * destination, then stores its counters in thr_args->stats. The dequeue and
 * enqueue calls themselves are not visible in this listing; the description
 * is inferred from the ODPH_ABORT messages and the loop conditions.
 *
 * Returns int; the return statement is not shown.
 */
434 static int run_test(
void *arg)
436 uint64_t c1, c2, cycles, nsec;
440 thread_args_t *thr_args = arg;
441 test_global_t *global = thr_args->global;
442 test_stat_t *stat = &thr_args->stats;
444 uint64_t num_deq_retry = 0;
445 uint64_t num_enq_retry = 0;
447 const uint32_t num_queue = thr_args->num_queues;
448 const uint32_t num_round = thr_args->options->num_round;
449 const uint32_t num_workers = thr_args->options->num_cpu;
450 const uint32_t max_burst = thr_args->options->max_burst;
451 uint32_t queue_idx = 0;
/* Resolve this worker's queue indices to queue handles once, before the main loop. */
456 for (uint32_t i = 0; i < num_queue; i++) {
457 src_queue_tbl[i] = global->queue[thr_args->src_queue_id[i]];
458 dst_queue_tbl[i] = global->queue[thr_args->dst_queue_id[i]];
/* Main loop: one iteration per round. */
467 for (rounds = 0; rounds < num_round; rounds++) {
471 src_queue = src_queue_tbl[queue_idx];
472 dst_queue = dst_queue_tbl[queue_idx];
/* queue_idx has reached the end of this worker's table; presumably reset to 0 (body not shown). */
475 if (queue_idx == num_queue)
481 ODPH_ABORT(
"odp_queue_deq_multi() failed\n");
/* Tail of a do-while that repeats until at least one event has been received. */
486 }
while (num_ev == 0);
/* Repeat until every received event has been enqueued. */
488 while (num_enq < num_ev) {
492 ODPH_ABORT(
"odp_queue_enq_multi() failed\n");
/*
 * Extra loop that only runs in pair mode; the rest of its condition is not
 * shown. NOTE(review): presumably this drains or rebalances the paired
 * queues after the timed rounds -- confirm against the full file.
 */
508 while (thr_args->options->mode == TEST_MODE_PAIR &&
512 src_queue = src_queue_tbl[queue_idx];
513 dst_queue = dst_queue_tbl[queue_idx];
516 if (queue_idx == num_queue)
521 while (num_enq < num_ev) {
525 ODPH_ABORT(
"odp_queue_enq_multi() failed\n");
/* Publish this worker's results for output_results(). */
534 stat->rounds = rounds;
535 stat->events = events;
537 stat->cycles = cycles;
538 stat->deq_retry = num_deq_retry;
539 stat->enq_retry = num_enq_retry;
/*
 * map_queues_to_threads() - fill each worker's src_queue_id[]/dst_queue_id[]
 * tables and num_queues counter according to the test mode and the
 * private_queues option.
 *
 * global: test state; reads global->options and writes global->thread_args[].
 *
 * The function only appends to the tables: it relies on every num_queues
 * counter starting at zero (main() zero-fills the whole test_global_t).
 */
544 static void map_queues_to_threads(test_global_t *global)
546 test_options_t *opt = &global->options;
548 if (opt->mode == TEST_MODE_LOOP) {
/* Loop mode, shared queues: every worker gets every queue, src == dst. */
549 if (!opt->private_queues) {
550 for (uint32_t i = 0; i < opt->num_queue; i++) {
551 for (uint32_t j = 0; j < opt->num_cpu; j++) {
552 thread_args_t *thread_args = &global->thread_args[j];
554 thread_args->src_queue_id[i] = i;
555 thread_args->dst_queue_id[i] = i;
556 thread_args->num_queues++;
/* Loop mode, private queues: queue i goes to worker i % num_cpu, src == dst. */
562 for (uint32_t i = 0; i < opt->num_queue; i++) {
563 thread_args_t *thread_args = &global->thread_args[i % opt->num_cpu];
564 uint32_t queue_idx = thread_args->num_queues;
566 thread_args->src_queue_id[queue_idx] = i;
567 thread_args->dst_queue_id[queue_idx] = i;
568 thread_args->num_queues++;
/*
 * The branch keyword for the lines below is not shown; since the queues are
 * handled as pairs (i, i + 1), this is presumably the pair-mode branch.
 *
 * Shared queues: every worker gets every pair, in one of two directions.
 */
573 if (!opt->private_queues) {
574 for (uint32_t i = 0; i < opt->num_queue; i += 2) {
575 for (uint32_t j = 0; j < opt->num_cpu; j++) {
576 thread_args_t *thread_args = &global->thread_args[j];
577 uint32_t num_queues = thread_args->num_queues;
/*
 * Two alternative directions for the pair. The condition that selects
 * between them is on lines not shown in this listing.
 */
580 thread_args->src_queue_id[num_queues] = i;
581 thread_args->dst_queue_id[num_queues] = i + 1;
583 thread_args->src_queue_id[num_queues] = i + 1;
584 thread_args->dst_queue_id[num_queues] = i;
586 thread_args->num_queues++;
/*
 * Private queues: each pair is owned by two adjacent workers, a and b, which
 * move events in opposite directions.
 *
 * NOTE(review): i is always even here, so thread_a_idx + 1 stays below
 * num_cpu only when num_cpu is even. parse_options() enforces that for pair
 * mode, but if the worker count is replaced later (start_workers() rewrites
 * options.num_cpu) an odd count would index one past the last worker.
 */
592 for (uint32_t i = 0; i < opt->num_queue; i += 2) {
594 uint32_t thread_a_idx = i % opt->num_cpu;
595 thread_args_t *thread_a_args = &global->thread_args[thread_a_idx];
596 thread_args_t *thread_b_args = &global->thread_args[thread_a_idx + 1];
/* Worker a: i -> i + 1. */
598 num_queues = thread_a_args->num_queues;
599 thread_a_args->src_queue_id[num_queues] = i;
600 thread_a_args->dst_queue_id[num_queues] = i + 1;
601 thread_a_args->num_queues++;
/* Worker b: i + 1 -> i. */
603 num_queues = thread_b_args->num_queues;
604 thread_b_args->src_queue_id[num_queues] = i + 1;
605 thread_b_args->dst_queue_id[num_queues] = i;
606 thread_b_args->num_queues++;
/*
 * print_queue_mappings() - print, for every worker, the source and
 * destination queue indices assigned to it.
 *
 * global: test state; reads options.num_cpu and thread_args[].
 */
610 static void print_queue_mappings(test_global_t *global)
612 printf(
"Worker-queue mappings\n");
613 printf(
"---------------------\n");
615 for (uint32_t i = 0; i < global->options.num_cpu; i++) {
616 thread_args_t *thread_args = &global->thread_args[i];
617 uint32_t num_queues = thread_args->num_queues;
619 printf(
"Worker %u:\n", i);
/* Both lists have num_queues entries and are printed in table order. */
621 printf(
" src queue idx:");
622 for (uint32_t j = 0; j < num_queues; j++)
623 printf(
" %" PRIu32
"", thread_args->src_queue_id[j]);
624 printf(
"\n dst queue idx:");
625 for (uint32_t j = 0; j < num_queues; j++)
626 printf(
" %" PRIu32
"", thread_args->dst_queue_id[j]);
/*
 * init_thread_args() - prepare the per-worker argument structs.
 *
 * global: test state. Each of the first options.num_cpu entries of
 * thread_args[] receives pointers to the shared global state, the shared
 * barrier and the shared options. The queue tables are then filled in and
 * the resulting mapping is printed.
 */
631 static void init_thread_args(test_global_t *global)
633 for (uint32_t i = 0; i < global->options.num_cpu; i++) {
634 thread_args_t *thread_args = &global->thread_args[i];
636 thread_args->global = global;
637 thread_args->barrier = &global->barrier;
638 thread_args->options = &global->options;
641 map_queues_to_threads(global);
643 print_queue_mappings(global);
/*
 * start_workers() - set up the worker thread parameters and launch the
 * workers with run_test() as their entry point.
 *
 * global: test state; reads and updates global->options.num_cpu.
 *
 * Returns int; the return statements are not visible in this listing.
 */
646 static int start_workers(test_global_t *global)
648 odph_thread_common_param_t thr_common;
652 test_options_t *test_options = &global->options;
653 int num_cpu = test_options->num_cpu;
/*
 * ret is assigned on a line not shown here; the message suggests it is the
 * number of workers actually available. A requested count of 0 skips this
 * check.
 *
 * NOTE(review): the message ends in "\n." so the full stop is printed at the
 * start of the next line; ".\n" was almost certainly intended.
 */
657 if (num_cpu && ret != num_cpu) {
658 ODPH_ERR(
"Too many workers. Max supported %i\n.", ret);
/* Store the worker count back so later code (thread args, join, results) sees it. */
665 test_options->num_cpu = num_cpu;
/* NOTE(review): num_cpu is an int but is printed with %u; %d/%i would match the type. */
668 printf(
" num workers %u\n\n", num_cpu);
672 odph_thread_common_param_init(&thr_common);
673 thr_common.instance = global->instance;
674 thr_common.cpumask = &cpumask;
/* Must run after options.num_cpu is final: the queue mapping depends on it. */
676 init_thread_args(global);
/* Every worker runs run_test() with its own thread_args entry. */
678 for (
int i = 0; i < num_cpu; i++) {
679 odph_thread_param_init(&thr_param[i]);
680 thr_param[i].start = run_test;
681 thr_param[i].arg = &global->thread_args[i];
685 if (odph_thread_create(global->thread_tbl, &thr_common, thr_param,
/*
 * output_results() - sum the per-worker statistics, print per-thread and
 * average results and, when export is enabled, write a CSV summary through
 * the test_common_write() helpers.
 *
 * global: test state; reads options.num_cpu, thread_args[].stats and
 * common_options.is_export.
 *
 * Returns int; the return statements are not visible in this listing.
 */
692 static int output_results(test_global_t *global)
695 double rounds_ave, events_ave, nsec_ave, cycles_ave;
697 test_options_t *test_options = &global->options;
698 int num_cpu = test_options->num_cpu;
699 uint64_t rounds_sum = 0;
700 uint64_t events_sum = 0;
701 uint64_t nsec_sum = 0;
702 uint64_t cycles_sum = 0;
703 uint64_t deq_retry_sum = 0;
704 uint64_t enq_retry_sum = 0;
/* Accumulate the counters of one worker; the enclosing loop line is not shown. */
708 stats = &global->thread_args[i].stats;
709 rounds_sum += stats->rounds;
710 events_sum += stats->events;
711 nsec_sum += stats->nsec;
712 cycles_sum += stats->cycles;
713 deq_retry_sum += stats->deq_retry;
714 enq_retry_sum += stats->enq_retry;
717 if (rounds_sum == 0) {
718 printf(
"No results.\n");
/*
 * NOTE(review): the sums are uint64_t and num_cpu is an int, so each
 * division below is an integer division whose truncated result is then
 * converted to double. Casting one operand to double first (for example
 * (double)rounds_sum / num_cpu) would keep the fractional part.
 */
722 rounds_ave = rounds_sum / num_cpu;
723 events_ave = events_sum / num_cpu;
724 nsec_ave = nsec_sum / num_cpu;
725 cycles_ave = cycles_sum / num_cpu;
728 printf(
"RESULTS - per thread (Million events per sec):\n");
729 printf(
"----------------------------------------------\n");
730 printf(
" 1 2 3 4 5 6 7 8 9 10");
733 stats = &global->thread_args[i].stats;
/*
 * events per nanosecond times 1000 gives million events per second.
 * NOTE(review): confirm that lines not shown here skip workers whose nsec is
 * 0, otherwise this divides by zero in floating point.
 */
738 printf(
"%6.1f ", (1000.0 * stats->events) / stats->nsec);
744 printf(
"RESULTS - per thread average (%i threads):\n", num_cpu);
745 printf(
"------------------------------------------\n");
746 printf(
" duration: %.3f msec\n", nsec_ave / 1000000);
747 printf(
" num cycles: %.3f M\n", cycles_ave / 1000000);
748 printf(
" events per dequeue: %.3f\n",
749 events_ave / rounds_ave);
750 printf(
" cycles per event: %.3f\n",
751 cycles_ave / events_ave);
/* The retry counters are totals over all workers, not averages. */
752 printf(
" dequeue retries: %" PRIu64
"\n", deq_retry_sum);
753 printf(
" enqueue retries: %" PRIu64
"\n", enq_retry_sum);
754 printf(
" events per sec: %.3f M\n\n",
755 (1000.0 * events_ave) / nsec_ave);
/* Total rate: events from all workers divided by the average per-worker duration. */
757 printf(
"TOTAL events per sec: %.3f M\n\n",
758 (1000.0 * events_sum) / nsec_ave);
/* Optional CSV export: header row first, then one data row. */
760 if (global->common_options.is_export) {
761 if (test_common_write(
"cycles per event,events per sec (M),TOTAL events per sec (M),"
762 "dequeue retries,enqueue retries\n")) {
763 test_common_write_term();
766 if (test_common_write(
"%f,%f,%f,%" PRIu64
",%" PRIu64
"\n",
767 cycles_ave / events_ave,
768 (1000.0 * events_ave) / nsec_ave,
769 (1000.0 * events_sum) / nsec_ave,
772 test_common_write_term();
775 test_common_write_term();
/*
 * main() - program entry point.
 *
 * Visible sequence: parse ODP helper options and common test options,
 * initialise ODP, reserve and zero the shared test_global_t, parse the
 * test-specific options, create the queues, start and join the workers,
 * output the results, then release everything in reverse order.
 *
 * Several calls and all return/exit statements are not shown in this
 * listing; the error messages mark where each step is checked.
 */
781 int main(
int argc,
char **argv)
783 odph_helper_options_t helper_options;
787 test_global_t *global;
788 test_common_options_t common_options;
/* Each parser returns an updated argc, which is passed on to the next one. */
791 argc = odph_parse_options(argc, argv);
792 if (odph_options(&helper_options)) {
793 ODPH_ERR(
"Reading ODP helper options failed.\n");
797 argc = test_common_parse_options(argc, argv);
798 if (test_common_options(&common_options)) {
799 ODPH_ERR(
"Reading test options failed.\n");
813 init.
mem_model = helper_options.mem_model;
817 ODPH_ERR(
"Global init failed.\n");
823 ODPH_ERR(
"Local init failed.\n");
/* The whole test state lives in one cache-line-aligned shared memory block. */
827 shm =
odp_shm_reserve(
"queue_perf_global",
sizeof(test_global_t), ODP_CACHE_LINE_SIZE, 0);
829 ODPH_ERR(
"Shared memory reserve failed.\n");
834 if (global == NULL) {
835 ODPH_ERR(
"Shared memory address read failed.\n");
/* Zero-fill so counters such as thread_args[].num_queues start at 0. */
839 memset(global, 0,
sizeof(test_global_t));
840 global->common_options = common_options;
843 if (parse_options(argc, argv, &global->options))
848 global->instance = instance;
850 if (create_queues(global))
853 if (start_workers(global)) {
854 ODPH_ERR(
"Test start failed.\n");
/*
 * NOTE(review): the return value of odph_thread_join() is not checked on
 * this line; a failed join would go unnoticed before results are reported.
 */
859 odph_thread_join(global->thread_tbl, global->options.num_cpu);
861 if (output_results(global)) {
862 ODPH_ERR(
"Outputting results failed.\n");
/* Teardown. */
867 if (destroy_queues(global)) {
868 ODPH_ERR(
"Destroy queues failed.\n");
873 ODPH_ERR(
"Shared memory free failed.\n");
878 ODPH_ERR(
"Term local failed.\n");
883 ODPH_ERR(
"Term global failed.\n");
void odp_atomic_init_u32(odp_atomic_u32_t *atom, uint32_t val)
Initialize atomic uint32 variable.
uint32_t odp_atomic_load_u32(odp_atomic_u32_t *atom)
Load value of atomic uint32 variable.
void odp_atomic_inc_u32(odp_atomic_u32_t *atom)
Increment atomic uint32 variable.
void odp_barrier_init(odp_barrier_t *barr, int count)
Initialize barrier with thread count.
void odp_barrier_wait(odp_barrier_t *barr)
Synchronize thread execution on barrier.
odp_event_t odp_buffer_to_event(odp_buffer_t buf)
Convert buffer handle to event.
odp_buffer_t odp_buffer_alloc(odp_pool_t pool)
Buffer alloc.
#define odp_unlikely(x)
Branch unlikely taken.
uint64_t odp_cpu_cycles_diff(uint64_t c2, uint64_t c1)
CPU cycle count difference.
uint64_t odp_cpu_cycles(void)
Current CPU cycle count.
int odp_cpumask_default_worker(odp_cpumask_t *mask, int num)
Default CPU mask for worker threads.
void odp_event_free(odp_event_t event)
Free event.
#define ODP_EVENT_INVALID
Invalid event.
void odp_init_param_init(odp_init_t *param)
Initialize the odp_init_t to default values for all fields.
int odp_init_local(odp_instance_t instance, odp_thread_type_t thr_type)
Thread local ODP initialization.
int odp_init_global(odp_instance_t *instance, const odp_init_t *params, const odp_platform_init_t *platform_params)
Global ODP initialization.
int odp_term_local(void)
Thread local ODP termination.
int odp_term_global(odp_instance_t instance)
Global ODP termination.
uint64_t odp_instance_t
ODP instance ID.
odp_pool_t odp_pool_create(const char *name, const odp_pool_param_t *param)
Create a pool.
int odp_pool_capability(odp_pool_capability_t *capa)
Query pool capabilities.
void odp_pool_param_init(odp_pool_param_t *param)
Initialize pool params.
int odp_pool_destroy(odp_pool_t pool)
Destroy a pool previously created by odp_pool_create()
#define ODP_POOL_INVALID
Invalid pool.
@ ODP_POOL_BUFFER
Buffer pool.
odp_nonblocking_t
Non-blocking level.
int odp_queue_enq_multi(odp_queue_t queue, const odp_event_t events[], int num)
Enqueue multiple events to a queue.
void odp_queue_param_init(odp_queue_param_t *param)
Initialize queue params.
int odp_queue_capability(odp_queue_capability_t *capa)
Query queue capabilities.
#define ODP_QUEUE_INVALID
Invalid queue.
odp_event_t odp_queue_deq(odp_queue_t queue)
Dequeue an event from a queue.
int odp_queue_enq(odp_queue_t queue, odp_event_t ev)
Enqueue an event to a queue.
odp_queue_t odp_queue_create(const char *name, const odp_queue_param_t *param)
Queue create.
int odp_queue_destroy(odp_queue_t queue)
Destroy ODP queue.
int odp_queue_deq_multi(odp_queue_t queue, odp_event_t events[], int num)
Dequeue multiple events from a queue.
@ ODP_NONBLOCKING_WF
Non-blocking and wait-free implementation.
@ ODP_BLOCKING
Blocking implementation.
@ ODP_NONBLOCKING_LF
Non-blocking and lock-free implementation.
@ ODP_QUEUE_TYPE_PLAIN
Plain queue.
@ ODP_QUEUE_OP_MT_UNSAFE
Not multithread safe operation.
int odp_shm_free(odp_shm_t shm)
Free a contiguous block of shared memory.
#define ODP_SHM_INVALID
Invalid shared memory block.
void * odp_shm_addr(odp_shm_t shm)
Shared memory block address.
odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, uint32_t flags)
Reserve a contiguous block of shared memory.
bool odp_bool_t
Boolean type.
void odp_sys_info_print(void)
Print system info.
#define ODP_THREAD_COUNT_MAX
Maximum number of threads supported in build time.
@ ODP_THREAD_WORKER
Worker thread.
odp_time_t odp_time_local_strict(void)
Current local time (strict)
uint64_t odp_time_diff_ns(odp_time_t t2, odp_time_t t1)
Time difference in nanoseconds.
Global initialization parameters.
odp_mem_model_t mem_model
Application memory model.
odp_feature_t not_used
Unused features.
struct odp_pool_capability_t::@121 buf
Buffer pool capabilities
uint32_t max_num
Maximum number of buffers of any size.
uint32_t num
Number of buffers in the pool.
odp_pool_type_t type
Pool type.
struct odp_pool_param_t::@125 buf
Parameters for buffer pools.
struct odp_queue_capability_t::@140::@142 waitfree
Wait-free (ODP_NONBLOCKING_WF) implementation capabilities.
uint32_t max_size
Maximum number of events a plain (ODP_BLOCKING) queue can store simultaneously.
uint32_t max_num
Maximum number of plain (ODP_BLOCKING) queues of the default size.
struct odp_queue_capability_t::@140::@141 lockfree
Lock-free (ODP_NONBLOCKING_LF) implementation capabilities.
struct odp_queue_capability_t::@140 plain
Plain queue capabilities.
odp_queue_op_mode_t enq_mode
Enqueue mode.
odp_queue_type_t type
Queue type.
odp_queue_op_mode_t deq_mode
Dequeue mode.
odp_nonblocking_t nonblocking
Non-blocking level.
uint32_t tm
Traffic Manager APIs, e.g., odp_tm_xxx()
uint32_t crypto
Crypto APIs, e.g., odp_crypto_xxx()
uint32_t ipsec
IPsec APIs, e.g., odp_ipsec_xxx()
uint32_t timer
Timer APIs, e.g., odp_timer_xxx(), odp_timeout_xxx()
uint32_t cls
Classifier APIs, e.g., odp_cls_xxx(), odp_cos_xxx()
uint32_t schedule
Scheduler APIs, e.g., odp_schedule_xxx()
struct odp_feature_t::@148 feat
Individual feature bits.
uint32_t compress
Compression APIs, e.g., odp_comp_xxx()