API Reference Manual  1.46.0
odp_queue_perf.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright (c) 2018 Linaro Limited
3  * Copyright (c) 2021-2024 Nokia
4  */
5 
14 #include <stdio.h>
15 #include <string.h>
16 #include <stdint.h>
17 #include <inttypes.h>
18 #include <stdlib.h>
19 #include <getopt.h>
20 
21 #include <odp_api.h>
22 #include <odp/helper/odph_api.h>
23 
24 #include <export_results.h>
25 
26 #define MAX_QUEUES (32 * 1024)
27 
/* How workers move events between queues (selected with -m) */
typedef enum {
	/* Events are enqueued back to the queue they were dequeued from */
	TEST_MODE_LOOP = 0,
	/* Queues are paired and events are moved between the queues of a pair */
	TEST_MODE_PAIR = 1,
} test_mode_t;
32 
33 typedef struct test_options_t {
34  uint32_t num_queue;
35  uint32_t num_event;
36  uint32_t num_round;
37  uint32_t max_burst;
38  uint32_t num_cpu;
39  odp_nonblocking_t nonblock;
40  test_mode_t mode;
41  odp_bool_t private_queues;
42  odp_bool_t single;
43 
44 } test_options_t;
45 
/* Per worker results, written once by the worker when its test run ends */
typedef struct test_stat_t {
	/* Test rounds completed */
	uint64_t rounds;
	/* Events dequeued and enqueued during the measured rounds */
	uint64_t events;
	/* Duration of the measured rounds in nanoseconds */
	uint64_t nsec;
	/* Duration of the measured rounds in CPU cycles */
	uint64_t cycles;
	/* Dequeue calls that returned no events */
	uint64_t deq_retry;
	/* Enqueue calls that left a part of the burst still to be enqueued */
	uint64_t enq_retry;

} test_stat_t;
55 
56 typedef struct test_global_t test_global_t;
57 
58 typedef struct {
59  test_global_t *global;
60  odp_barrier_t *barrier;
61  test_options_t *options;
62  test_stat_t stats;
63  uint32_t src_queue_id[MAX_QUEUES];
64  uint32_t dst_queue_id[MAX_QUEUES];
65  uint32_t num_queues;
66 } thread_args_t;
67 
68 typedef struct test_global_t {
69  odp_barrier_t barrier;
70  test_options_t options;
71  odp_instance_t instance;
72  odp_shm_t shm;
73  odp_pool_t pool;
74  odp_atomic_u32_t workers_finished;
75  odp_queue_t queue[MAX_QUEUES];
76  odph_thread_t thread_tbl[ODP_THREAD_COUNT_MAX];
77  thread_args_t thread_args[ODP_THREAD_COUNT_MAX];
78  test_common_options_t common_options;
79 
80 } test_global_t;
81 
/* Print the command line help text to stdout */
static void print_usage(void)
{
	/* Plain text without conversion specifications, so it is written as is */
	static const char usage[] =
		"\n"
		"Plain queue performance test\n"
		"\n"
		"Usage: odp_queue_perf [options]\n"
		"\n"
		" -m, --mode <arg> Test mode:\n"
		" 0: Loop: events are enqueued back to the same queue they\n"
		" were dequeued from (default)\n"
		" 1: Pair: queues are paired and events are always moved\n"
		" between the queues when doing dequeue/enqueue. Requires\n"
		" an even number of both queues and workers.\n"
		" -c, --num_cpu Number of worker threads (default 1)\n"
		" -q, --num_queue Number of queues (default 1)\n"
		" -e, --num_event Number of events per queue (default 1)\n"
		" -b, --burst_size Maximum number of events per operation (default 1)\n"
		" -p, --private Use separate queues for each worker\n"
		" -r, --num_round Number of rounds\n"
		" -l, --lockfree Lock-free queues\n"
		" -w, --waitfree Wait-free queues\n"
		" -s, --single Single producer/consumer queues\n"
		" -h, --help This help\n"
		"\n";

	fputs(usage, stdout);
}
107 
108 static int parse_options(int argc, char *argv[], test_options_t *test_options)
109 {
110  int opt, num_cpu;
111  int ret = 0;
112 
113  static const struct option longopts[] = {
114  {"num_cpu", required_argument, NULL, 'c'},
115  {"num_queue", required_argument, NULL, 'q'},
116  {"num_event", required_argument, NULL, 'e'},
117  {"burst_size", required_argument, NULL, 'b'},
118  {"mode", required_argument, NULL, 'm'},
119  {"private", no_argument, NULL, 'p'},
120  {"num_round", required_argument, NULL, 'r'},
121  {"lockfree", no_argument, NULL, 'l'},
122  {"waitfree", no_argument, NULL, 'w'},
123  {"single", no_argument, NULL, 's'},
124  {"help", no_argument, NULL, 'h'},
125  {NULL, 0, NULL, 0}
126  };
127 
128  static const char *shortopts = "+c:q:e:b:m:pr:lwsh";
129 
130  test_options->num_cpu = 1;
131  test_options->num_queue = 1;
132  test_options->num_event = 1;
133  test_options->max_burst = 1;
134  test_options->mode = TEST_MODE_LOOP;
135  test_options->num_round = 1000;
136  test_options->nonblock = ODP_BLOCKING;
137  test_options->single = false;
138  test_options->private_queues = false;
139 
140  while (1) {
141  opt = getopt_long(argc, argv, shortopts, longopts, NULL);
142 
143  if (opt == -1)
144  break;
145 
146  switch (opt) {
147  case 'c':
148  test_options->num_cpu = atoi(optarg);
149  break;
150  case 'q':
151  test_options->num_queue = atoi(optarg);
152  break;
153  case 'e':
154  test_options->num_event = atoi(optarg);
155  break;
156  case 'b':
157  test_options->max_burst = atoi(optarg);
158  break;
159  case 'm':
160  if (atoi(optarg) == TEST_MODE_PAIR)
161  test_options->mode = TEST_MODE_PAIR;
162  break;
163  case 'r':
164  test_options->num_round = atoi(optarg);
165  break;
166  case 'l':
167  test_options->nonblock = ODP_NONBLOCKING_LF;
168  break;
169  case 'w':
170  test_options->nonblock = ODP_NONBLOCKING_WF;
171  break;
172  case 'p':
173  test_options->private_queues = true;
174  break;
175  case 's':
176  test_options->single = true;
177  break;
178  case 'h':
179  /* fall through */
180  default:
181  print_usage();
182  ret = -1;
183  break;
184  }
185  }
186 
187  if (test_options->num_queue > MAX_QUEUES || test_options->num_queue == 0) {
188  ODPH_ERR("Invalid number of queues %u. Test maximum %u.\n",
189  test_options->num_queue, MAX_QUEUES);
190  return -1;
191  }
192 
193  num_cpu = test_options->num_cpu;
194  if (num_cpu == 0)
195  num_cpu = odp_cpumask_default_worker(NULL, 0);
196 
197  if (test_options->private_queues) {
198  if ((int)test_options->num_queue < num_cpu) {
199  ODPH_ERR("Not enough queues for %d workers.\n", num_cpu);
200  return -1;
201  }
202  if (test_options->num_queue % num_cpu)
203  ODPH_ERR("Warn: %" PRIu32 " queues shared unevenly amongst %" PRIu32 " "
204  "workers.\n", test_options->num_queue, num_cpu);
205  }
206 
207  if (test_options->single && !test_options->private_queues) {
208  if ((test_options->mode == TEST_MODE_LOOP && num_cpu != 1) ||
209  (test_options->mode == TEST_MODE_PAIR && num_cpu != 2)) {
210  ODPH_ERR("Multiple producers/consumers not allowed with single prod/cons queues.\n");
211  return -1;
212  }
213  }
214 
215  if (test_options->mode == TEST_MODE_PAIR && (test_options->num_queue % 2 || num_cpu % 2)) {
216  ODPH_ERR("Pair mode requires an even number of queues and workers.\n");
217  return -1;
218  }
219 
220  return ret;
221 }
222 
223 static int create_queues(test_global_t *global)
224 {
225  odp_pool_capability_t pool_capa;
226  odp_queue_capability_t queue_capa;
227  odp_pool_param_t pool_param;
228  odp_queue_param_t queue_param;
229  odp_pool_t pool;
230  uint32_t i, j, max_size, max_num;
231  test_options_t *test_options = &global->options;
232  odp_nonblocking_t nonblock = test_options->nonblock;
233  uint32_t num_queue = test_options->num_queue;
234  uint32_t num_event = test_options->num_event;
235  uint32_t num_round = test_options->num_round;
236  uint32_t tot_event = num_queue * num_event;
237  uint32_t queue_size = test_options->mode == TEST_MODE_PAIR ? 2 * num_event : num_event;
238  int ret = 0;
239  odp_queue_t *queue = global->queue;
240  odp_event_t event[tot_event];
241 
242  printf("\nTesting %s queues\n",
243  nonblock == ODP_BLOCKING ? "NORMAL" :
244  (nonblock == ODP_NONBLOCKING_LF ? "LOCKFREE" :
245  (nonblock == ODP_NONBLOCKING_WF ? "WAITFREE" : "???")));
246  printf(" mode %s\n", test_options->mode == TEST_MODE_LOOP ?
247  "loop" : "pair");
248  printf(" private queues %s\n", test_options->private_queues ? "yes" : "no");
249  printf(" single prod/cons %s\n", test_options->single ? "yes" : "no");
250  printf(" num rounds %u\n", num_round);
251  printf(" num queues %u\n", num_queue);
252  printf(" num events per queue %u\n", num_event);
253  printf(" queue size %u\n", queue_size);
254  printf(" max burst size %u\n", test_options->max_burst);
255 
256  for (i = 0; i < num_queue; i++)
257  queue[i] = ODP_QUEUE_INVALID;
258 
259  for (i = 0; i < tot_event; i++)
260  event[i] = ODP_EVENT_INVALID;
261 
262  if (odp_queue_capability(&queue_capa)) {
263  ODPH_ERR("Queue capa failed.\n");
264  return -1;
265  }
266 
267  if (odp_pool_capability(&pool_capa)) {
268  ODPH_ERR("Pool capa failed.\n");
269  return -1;
270  }
271 
272  if (nonblock == ODP_BLOCKING) {
273  if (num_queue > queue_capa.plain.max_num) {
274  ODPH_ERR("Max queues supported %u.\n", queue_capa.plain.max_num);
275  return -1;
276  }
277 
278  max_size = queue_capa.plain.max_size;
279  if (max_size && queue_size > max_size) {
280  ODPH_ERR("Max queue size supported %u.\n", max_size);
281  return -1;
282  }
283  } else if (nonblock == ODP_NONBLOCKING_LF) {
284  if (queue_capa.plain.lockfree.max_num == 0) {
285  ODPH_ERR("Lockfree queues not supported.\n");
286  return -1;
287  }
288 
289  if (num_queue > queue_capa.plain.lockfree.max_num) {
290  ODPH_ERR("Max lockfree queues supported %u.\n",
291  queue_capa.plain.lockfree.max_num);
292  return -1;
293  }
294 
295  max_size = queue_capa.plain.lockfree.max_size;
296  if (max_size && queue_size > max_size) {
297  ODPH_ERR("Max lockfree queue size supported %u.\n", max_size);
298  return -1;
299  }
300  } else if (nonblock == ODP_NONBLOCKING_WF) {
301  if (queue_capa.plain.waitfree.max_num == 0) {
302  ODPH_ERR("Waitfree queues not supported.\n");
303  return -1;
304  }
305 
306  if (num_queue > queue_capa.plain.waitfree.max_num) {
307  ODPH_ERR("Max waitfree queues supported %u.\n",
308  queue_capa.plain.waitfree.max_num);
309  return -1;
310  }
311 
312  max_size = queue_capa.plain.waitfree.max_size;
313  if (max_size && queue_size > max_size) {
314  ODPH_ERR("Max waitfree queue size supported %u.\n", max_size);
315  return -1;
316  }
317  } else {
318  ODPH_ERR("Bad queue blocking type.\n");
319  return -1;
320  }
321 
322  max_num = pool_capa.buf.max_num;
323 
324  if (max_num && tot_event > max_num) {
325  ODPH_ERR("Max events supported %u.\n", max_num);
326  return -1;
327  }
328 
329  odp_pool_param_init(&pool_param);
330  pool_param.type = ODP_POOL_BUFFER;
331  pool_param.buf.num = tot_event;
332 
333  pool = odp_pool_create("queue perf pool", &pool_param);
334 
335  if (pool == ODP_POOL_INVALID) {
336  ODPH_ERR("Pool create failed.\n");
337  return -1;
338  }
339 
340  global->pool = pool;
341 
342  odp_queue_param_init(&queue_param);
343  queue_param.type = ODP_QUEUE_TYPE_PLAIN;
344  queue_param.nonblocking = nonblock;
345  queue_param.size = queue_size;
346 
347  if (test_options->single) {
348  queue_param.enq_mode = ODP_QUEUE_OP_MT_UNSAFE;
349  queue_param.deq_mode = ODP_QUEUE_OP_MT_UNSAFE;
350  }
351 
352  for (i = 0; i < num_queue; i++) {
353  queue[i] = odp_queue_create(NULL, &queue_param);
354 
355  if (queue[i] == ODP_QUEUE_INVALID) {
356  ODPH_ERR("Queue create failed %u.\n", i);
357  return -1;
358  }
359  }
360 
361  for (i = 0; i < tot_event; i++) {
362  event[i] = odp_buffer_to_event(odp_buffer_alloc(pool));
363 
364  if (event[i] == ODP_EVENT_INVALID) {
365  ODPH_ERR("Event alloc failed %u.\n", i);
366  ret = -1;
367  goto free_events;
368  }
369  }
370 
371  for (i = 0; i < num_queue; i++) {
372  for (j = 0; j < num_event; j++) {
373  uint32_t id = i * num_event + j;
374 
375  if (odp_queue_enq(queue[i], event[id])) {
376  ODPH_ERR("Queue enq failed %u/%u.\n", i, j);
377  ret = -1;
378  goto free_events;
379  }
380 
381  event[id] = ODP_EVENT_INVALID;
382  }
383  }
384 
385 free_events:
386  /* Free events that were not stored into queues */
387  for (i = 0; i < tot_event; i++) {
388  if (event[i] != ODP_EVENT_INVALID)
389  odp_event_free(event[i]);
390  }
391 
392  if (ret)
393  ODPH_ERR("Initializing test queues failed.\n");
394 
395  return ret;
396 }
397 
398 static int destroy_queues(test_global_t *global)
399 {
400  uint32_t i;
401  int ret = 0;
402  test_options_t *test_options = &global->options;
403  uint32_t num_queue = test_options->num_queue;
404  odp_queue_t *queue = global->queue;
405  odp_pool_t pool = global->pool;
406 
407  for (i = 0; i < num_queue; i++) {
408  if (queue[i] == ODP_QUEUE_INVALID)
409  break;
410 
411  while (1) {
412  odp_event_t ev = odp_queue_deq(queue[i]);
413 
414  if (ev == ODP_EVENT_INVALID)
415  break;
416  odp_event_free(ev);
417  }
418 
419  if (odp_queue_destroy(queue[i])) {
420  ODPH_ERR("Queue destroy failed %u.\n", i);
421  ret = -1;
422  break;
423  }
424  }
425 
426  if (pool != ODP_POOL_INVALID && odp_pool_destroy(pool)) {
427  ODPH_ERR("Pool destroy failed.\n");
428  ret = -1;
429  }
430 
431  return ret;
432 }
433 
434 static int run_test(void *arg)
435 {
436  uint64_t c1, c2, cycles, nsec;
437  odp_time_t t1, t2;
438  uint32_t rounds;
439  int num_ev;
440  thread_args_t *thr_args = arg;
441  test_global_t *global = thr_args->global;
442  test_stat_t *stat = &thr_args->stats;
443  odp_queue_t src_queue, dst_queue;
444  uint64_t num_deq_retry = 0;
445  uint64_t num_enq_retry = 0;
446  uint64_t events = 0;
447  const uint32_t num_queue = thr_args->num_queues;
448  const uint32_t num_round = thr_args->options->num_round;
449  const uint32_t num_workers = thr_args->options->num_cpu;
450  const uint32_t max_burst = thr_args->options->max_burst;
451  uint32_t queue_idx = 0;
452  odp_event_t ev[max_burst];
453  odp_queue_t src_queue_tbl[MAX_QUEUES];
454  odp_queue_t dst_queue_tbl[MAX_QUEUES];
455 
456  for (uint32_t i = 0; i < num_queue; i++) {
457  src_queue_tbl[i] = global->queue[thr_args->src_queue_id[i]];
458  dst_queue_tbl[i] = global->queue[thr_args->dst_queue_id[i]];
459  }
460 
461  /* Start all workers at the same time */
462  odp_barrier_wait(thr_args->barrier);
463 
464  t1 = odp_time_local_strict();
465  c1 = odp_cpu_cycles();
466 
467  for (rounds = 0; rounds < num_round; rounds++) {
468  int num_enq = 0;
469 
470  do {
471  src_queue = src_queue_tbl[queue_idx];
472  dst_queue = dst_queue_tbl[queue_idx];
473 
474  queue_idx++;
475  if (queue_idx == num_queue)
476  queue_idx = 0;
477 
478  num_ev = odp_queue_deq_multi(src_queue, ev, max_burst);
479 
480  if (odp_unlikely(num_ev < 0))
481  ODPH_ABORT("odp_queue_deq_multi() failed\n");
482 
483  if (odp_unlikely(num_ev == 0))
484  num_deq_retry++;
485 
486  } while (num_ev == 0);
487 
488  while (num_enq < num_ev) {
489  int num = odp_queue_enq_multi(dst_queue, &ev[num_enq], num_ev - num_enq);
490 
491  if (odp_unlikely(num < 0))
492  ODPH_ABORT("odp_queue_enq_multi() failed\n");
493 
494  num_enq += num;
495 
496  if (odp_unlikely(num_enq != num_ev))
497  num_enq_retry++;
498  }
499  events += num_ev;
500  }
501 
502  c2 = odp_cpu_cycles();
503  t2 = odp_time_local_strict();
504 
505  odp_atomic_inc_u32(&global->workers_finished);
506 
507  /* Keep forwarding events in pair mode until all workers have completed */
508  while (thr_args->options->mode == TEST_MODE_PAIR &&
509  odp_atomic_load_u32(&global->workers_finished) < num_workers) {
510  int num_enq = 0;
511 
512  src_queue = src_queue_tbl[queue_idx];
513  dst_queue = dst_queue_tbl[queue_idx];
514 
515  queue_idx++;
516  if (queue_idx == num_queue)
517  queue_idx = 0;
518 
519  num_ev = odp_queue_deq_multi(src_queue, ev, max_burst);
520 
521  while (num_enq < num_ev) {
522  int num = odp_queue_enq_multi(dst_queue, &ev[num_enq], num_ev - num_enq);
523 
524  if (odp_unlikely(num < 0))
525  ODPH_ABORT("odp_queue_enq_multi() failed\n");
526 
527  num_enq += num;
528  }
529  }
530 
531  nsec = odp_time_diff_ns(t2, t1);
532  cycles = odp_cpu_cycles_diff(c2, c1);
533 
534  stat->rounds = rounds;
535  stat->events = events;
536  stat->nsec = nsec;
537  stat->cycles = cycles;
538  stat->deq_retry = num_deq_retry;
539  stat->enq_retry = num_enq_retry;
540 
541  return 0;
542 }
543 
544 static void map_queues_to_threads(test_global_t *global)
545 {
546  test_options_t *opt = &global->options;
547 
548  if (opt->mode == TEST_MODE_LOOP) {
549  if (!opt->private_queues) {
550  for (uint32_t i = 0; i < opt->num_queue; i++) {
551  for (uint32_t j = 0; j < opt->num_cpu; j++) {
552  thread_args_t *thread_args = &global->thread_args[j];
553 
554  thread_args->src_queue_id[i] = i;
555  thread_args->dst_queue_id[i] = i;
556  thread_args->num_queues++;
557  }
558  }
559  return;
560  }
561 
562  for (uint32_t i = 0; i < opt->num_queue; i++) {
563  thread_args_t *thread_args = &global->thread_args[i % opt->num_cpu];
564  uint32_t queue_idx = thread_args->num_queues;
565 
566  thread_args->src_queue_id[queue_idx] = i;
567  thread_args->dst_queue_id[queue_idx] = i;
568  thread_args->num_queues++;
569  }
570  return;
571  }
572  /* Pair mode. Always an even number of both queues and CPUs. */
573  if (!opt->private_queues) {
574  for (uint32_t i = 0; i < opt->num_queue; i += 2) {
575  for (uint32_t j = 0; j < opt->num_cpu; j++) {
576  thread_args_t *thread_args = &global->thread_args[j];
577  uint32_t num_queues = thread_args->num_queues;
578 
579  if (j % 2 == 0) {
580  thread_args->src_queue_id[num_queues] = i;
581  thread_args->dst_queue_id[num_queues] = i + 1;
582  } else {
583  thread_args->src_queue_id[num_queues] = i + 1;
584  thread_args->dst_queue_id[num_queues] = i;
585  }
586  thread_args->num_queues++;
587  }
588  }
589  return;
590  }
591 
592  for (uint32_t i = 0; i < opt->num_queue; i += 2) {
593  uint32_t num_queues;
594  uint32_t thread_a_idx = i % opt->num_cpu;
595  thread_args_t *thread_a_args = &global->thread_args[thread_a_idx];
596  thread_args_t *thread_b_args = &global->thread_args[thread_a_idx + 1];
597 
598  num_queues = thread_a_args->num_queues;
599  thread_a_args->src_queue_id[num_queues] = i;
600  thread_a_args->dst_queue_id[num_queues] = i + 1;
601  thread_a_args->num_queues++;
602 
603  num_queues = thread_b_args->num_queues;
604  thread_b_args->src_queue_id[num_queues] = i + 1;
605  thread_b_args->dst_queue_id[num_queues] = i;
606  thread_b_args->num_queues++;
607  }
608 }
609 
610 static void print_queue_mappings(test_global_t *global)
611 {
612  printf("Worker-queue mappings\n");
613  printf("---------------------\n");
614 
615  for (uint32_t i = 0; i < global->options.num_cpu; i++) {
616  thread_args_t *thread_args = &global->thread_args[i];
617  uint32_t num_queues = thread_args->num_queues;
618 
619  printf("Worker %u:\n", i);
620 
621  printf(" src queue idx:");
622  for (uint32_t j = 0; j < num_queues; j++)
623  printf(" %" PRIu32 "", thread_args->src_queue_id[j]);
624  printf("\n dst queue idx:");
625  for (uint32_t j = 0; j < num_queues; j++)
626  printf(" %" PRIu32 "", thread_args->dst_queue_id[j]);
627  printf("\n\n");
628  }
629 }
630 
631 static void init_thread_args(test_global_t *global)
632 {
633  for (uint32_t i = 0; i < global->options.num_cpu; i++) {
634  thread_args_t *thread_args = &global->thread_args[i];
635 
636  thread_args->global = global;
637  thread_args->barrier = &global->barrier;
638  thread_args->options = &global->options;
639  }
640 
641  map_queues_to_threads(global);
642 
643  print_queue_mappings(global);
644 }
645 
646 static int start_workers(test_global_t *global)
647 {
648  odph_thread_common_param_t thr_common;
649  odph_thread_param_t thr_param[ODP_THREAD_COUNT_MAX];
650  odp_cpumask_t cpumask;
651  int ret;
652  test_options_t *test_options = &global->options;
653  int num_cpu = test_options->num_cpu;
654 
655  ret = odp_cpumask_default_worker(&cpumask, num_cpu);
656 
657  if (num_cpu && ret != num_cpu) {
658  ODPH_ERR("Too many workers. Max supported %i\n.", ret);
659  return -1;
660  }
661 
662  /* Zero: all available workers */
663  if (num_cpu == 0) {
664  num_cpu = ret;
665  test_options->num_cpu = num_cpu;
666  }
667 
668  printf(" num workers %u\n\n", num_cpu);
669 
670  odp_barrier_init(&global->barrier, num_cpu);
671 
672  odph_thread_common_param_init(&thr_common);
673  thr_common.instance = global->instance;
674  thr_common.cpumask = &cpumask;
675 
676  init_thread_args(global);
677 
678  for (int i = 0; i < num_cpu; i++) {
679  odph_thread_param_init(&thr_param[i]);
680  thr_param[i].start = run_test;
681  thr_param[i].arg = &global->thread_args[i];
682  thr_param[i].thr_type = ODP_THREAD_WORKER;
683  }
684 
685  if (odph_thread_create(global->thread_tbl, &thr_common, thr_param,
686  num_cpu) != num_cpu)
687  return -1;
688 
689  return 0;
690 }
691 
692 static int output_results(test_global_t *global)
693 {
694  int i, num;
695  double rounds_ave, events_ave, nsec_ave, cycles_ave;
696  test_stat_t *stats;
697  test_options_t *test_options = &global->options;
698  int num_cpu = test_options->num_cpu;
699  uint64_t rounds_sum = 0;
700  uint64_t events_sum = 0;
701  uint64_t nsec_sum = 0;
702  uint64_t cycles_sum = 0;
703  uint64_t deq_retry_sum = 0;
704  uint64_t enq_retry_sum = 0;
705 
706  /* Averages */
707  for (i = 0; i < ODP_THREAD_COUNT_MAX; i++) {
708  stats = &global->thread_args[i].stats;
709  rounds_sum += stats->rounds;
710  events_sum += stats->events;
711  nsec_sum += stats->nsec;
712  cycles_sum += stats->cycles;
713  deq_retry_sum += stats->deq_retry;
714  enq_retry_sum += stats->enq_retry;
715  }
716 
717  if (rounds_sum == 0) {
718  printf("No results.\n");
719  return 0;
720  }
721 
722  rounds_ave = rounds_sum / num_cpu;
723  events_ave = events_sum / num_cpu;
724  nsec_ave = nsec_sum / num_cpu;
725  cycles_ave = cycles_sum / num_cpu;
726  num = 0;
727 
728  printf("RESULTS - per thread (Million events per sec):\n");
729  printf("----------------------------------------------\n");
730  printf(" 1 2 3 4 5 6 7 8 9 10");
731 
732  for (i = 0; i < ODP_THREAD_COUNT_MAX; i++) {
733  stats = &global->thread_args[i].stats;
734  if (stats->rounds) {
735  if ((num % 10) == 0)
736  printf("\n ");
737 
738  printf("%6.1f ", (1000.0 * stats->events) / stats->nsec);
739  num++;
740  }
741  }
742  printf("\n\n");
743 
744  printf("RESULTS - per thread average (%i threads):\n", num_cpu);
745  printf("------------------------------------------\n");
746  printf(" duration: %.3f msec\n", nsec_ave / 1000000);
747  printf(" num cycles: %.3f M\n", cycles_ave / 1000000);
748  printf(" events per dequeue: %.3f\n",
749  events_ave / rounds_ave);
750  printf(" cycles per event: %.3f\n",
751  cycles_ave / events_ave);
752  printf(" dequeue retries: %" PRIu64 "\n", deq_retry_sum);
753  printf(" enqueue retries: %" PRIu64 "\n", enq_retry_sum);
754  printf(" events per sec: %.3f M\n\n",
755  (1000.0 * events_ave) / nsec_ave);
756 
757  printf("TOTAL events per sec: %.3f M\n\n",
758  (1000.0 * events_sum) / nsec_ave);
759 
760  if (global->common_options.is_export) {
761  if (test_common_write("cycles per event,events per sec (M),TOTAL events per sec (M),"
762  "dequeue retries,enqueue retries\n")) {
763  test_common_write_term();
764  return -1;
765  }
766  if (test_common_write("%f,%f,%f,%" PRIu64 ",%" PRIu64 "\n",
767  cycles_ave / events_ave,
768  (1000.0 * events_ave) / nsec_ave,
769  (1000.0 * events_sum) / nsec_ave,
770  deq_retry_sum,
771  enq_retry_sum)) {
772  test_common_write_term();
773  return -1;
774  }
775  test_common_write_term();
776  }
777 
778  return 0;
779 }
780 
781 int main(int argc, char **argv)
782 {
783  odph_helper_options_t helper_options;
784  odp_instance_t instance;
785  odp_init_t init;
786  odp_shm_t shm;
787  test_global_t *global;
788  test_common_options_t common_options;
789 
790  /* Let helper collect its own arguments (e.g. --odph_proc) */
791  argc = odph_parse_options(argc, argv);
792  if (odph_options(&helper_options)) {
793  ODPH_ERR("Reading ODP helper options failed.\n");
794  exit(EXIT_FAILURE);
795  }
796 
797  argc = test_common_parse_options(argc, argv);
798  if (test_common_options(&common_options)) {
799  ODPH_ERR("Reading test options failed.\n");
800  exit(EXIT_FAILURE);
801  }
802 
803  /* List features not to be used */
804  odp_init_param_init(&init);
805  init.not_used.feat.cls = 1;
806  init.not_used.feat.compress = 1;
807  init.not_used.feat.crypto = 1;
808  init.not_used.feat.ipsec = 1;
809  init.not_used.feat.schedule = 1;
810  init.not_used.feat.timer = 1;
811  init.not_used.feat.tm = 1;
812 
813  init.mem_model = helper_options.mem_model;
814 
815  /* Init ODP before calling anything else */
816  if (odp_init_global(&instance, &init, NULL)) {
817  ODPH_ERR("Global init failed.\n");
818  return -1;
819  }
820 
821  /* Init this thread */
822  if (odp_init_local(instance, ODP_THREAD_WORKER)) {
823  ODPH_ERR("Local init failed.\n");
824  return -1;
825  }
826 
827  shm = odp_shm_reserve("queue_perf_global", sizeof(test_global_t), ODP_CACHE_LINE_SIZE, 0);
828  if (shm == ODP_SHM_INVALID) {
829  ODPH_ERR("Shared memory reserve failed.\n");
830  exit(EXIT_FAILURE);
831  }
832 
833  global = odp_shm_addr(shm);
834  if (global == NULL) {
835  ODPH_ERR("Shared memory address read failed.\n");
836  exit(EXIT_FAILURE);
837  }
838 
839  memset(global, 0, sizeof(test_global_t));
840  global->common_options = common_options;
841  odp_atomic_init_u32(&global->workers_finished, 0);
842 
843  if (parse_options(argc, argv, &global->options))
844  return -1;
845 
847 
848  global->instance = instance;
849 
850  if (create_queues(global))
851  goto destroy;
852 
853  if (start_workers(global)) {
854  ODPH_ERR("Test start failed.\n");
855  return -1;
856  }
857 
858  /* Wait workers to exit */
859  odph_thread_join(global->thread_tbl, global->options.num_cpu);
860 
861  if (output_results(global)) {
862  ODPH_ERR("Outputting results failed.\n");
863  exit(EXIT_FAILURE);
864  }
865 
866 destroy:
867  if (destroy_queues(global)) {
868  ODPH_ERR("Destroy queues failed.\n");
869  return -1;
870  }
871 
872  if (odp_shm_free(shm)) {
873  ODPH_ERR("Shared memory free failed.\n");
874  exit(EXIT_FAILURE);
875  }
876 
877  if (odp_term_local()) {
878  ODPH_ERR("Term local failed.\n");
879  return -1;
880  }
881 
882  if (odp_term_global(instance)) {
883  ODPH_ERR("Term global failed.\n");
884  return -1;
885  }
886 
887  return 0;
888 }
void odp_atomic_init_u32(odp_atomic_u32_t *atom, uint32_t val)
Initialize atomic uint32 variable.
uint32_t odp_atomic_load_u32(odp_atomic_u32_t *atom)
Load value of atomic uint32 variable.
void odp_atomic_inc_u32(odp_atomic_u32_t *atom)
Increment atomic uint32 variable.
void odp_barrier_init(odp_barrier_t *barr, int count)
Initialize barrier with thread count.
void odp_barrier_wait(odp_barrier_t *barr)
Synchronize thread execution on barrier.
odp_event_t odp_buffer_to_event(odp_buffer_t buf)
Convert buffer handle to event.
odp_buffer_t odp_buffer_alloc(odp_pool_t pool)
Buffer alloc.
#define odp_unlikely(x)
Branch unlikely taken.
Definition: spec/hints.h:64
uint64_t odp_cpu_cycles_diff(uint64_t c2, uint64_t c1)
CPU cycle count difference.
uint64_t odp_cpu_cycles(void)
Current CPU cycle count.
int odp_cpumask_default_worker(odp_cpumask_t *mask, int num)
Default CPU mask for worker threads.
void odp_event_free(odp_event_t event)
Free event.
#define ODP_EVENT_INVALID
Invalid event.
void odp_init_param_init(odp_init_t *param)
Initialize the odp_init_t to default values for all fields.
int odp_init_local(odp_instance_t instance, odp_thread_type_t thr_type)
Thread local ODP initialization.
int odp_init_global(odp_instance_t *instance, const odp_init_t *params, const odp_platform_init_t *platform_params)
Global ODP initialization.
int odp_term_local(void)
Thread local ODP termination.
int odp_term_global(odp_instance_t instance)
Global ODP termination.
uint64_t odp_instance_t
ODP instance ID.
odp_pool_t odp_pool_create(const char *name, const odp_pool_param_t *param)
Create a pool.
int odp_pool_capability(odp_pool_capability_t *capa)
Query pool capabilities.
void odp_pool_param_init(odp_pool_param_t *param)
Initialize pool params.
int odp_pool_destroy(odp_pool_t pool)
Destroy a pool previously created by odp_pool_create()
#define ODP_POOL_INVALID
Invalid pool.
@ ODP_POOL_BUFFER
Buffer pool.
odp_nonblocking_t
Non-blocking level.
int odp_queue_enq_multi(odp_queue_t queue, const odp_event_t events[], int num)
Enqueue multiple events to a queue.
void odp_queue_param_init(odp_queue_param_t *param)
Initialize queue params.
int odp_queue_capability(odp_queue_capability_t *capa)
Query queue capabilities.
#define ODP_QUEUE_INVALID
Invalid queue.
odp_event_t odp_queue_deq(odp_queue_t queue)
Dequeue an event from a queue.
int odp_queue_enq(odp_queue_t queue, odp_event_t ev)
Enqueue an event to a queue.
odp_queue_t odp_queue_create(const char *name, const odp_queue_param_t *param)
Queue create.
int odp_queue_destroy(odp_queue_t queue)
Destroy ODP queue.
int odp_queue_deq_multi(odp_queue_t queue, odp_event_t events[], int num)
Dequeue multiple events from a queue.
@ ODP_NONBLOCKING_WF
Non-blocking and wait-free implementation.
@ ODP_BLOCKING
Blocking implementation.
@ ODP_NONBLOCKING_LF
Non-blocking and lock-free implementation.
@ ODP_QUEUE_TYPE_PLAIN
Plain queue.
@ ODP_QUEUE_OP_MT_UNSAFE
Not multithread safe operation.
int odp_shm_free(odp_shm_t shm)
Free a contiguous block of shared memory.
#define ODP_SHM_INVALID
Invalid shared memory block.
void * odp_shm_addr(odp_shm_t shm)
Shared memory block address.
odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, uint32_t flags)
Reserve a contiguous block of shared memory.
bool odp_bool_t
Boolean type.
void odp_sys_info_print(void)
Print system info.
#define ODP_THREAD_COUNT_MAX
Maximum number of threads supported in build time.
@ ODP_THREAD_WORKER
Worker thread.
odp_time_t odp_time_local_strict(void)
Current local time (strict)
uint64_t odp_time_diff_ns(odp_time_t t2, odp_time_t t1)
Time difference in nanoseconds.
The OpenDataPlane API.
Global initialization parameters.
odp_mem_model_t mem_model
Application memory model.
odp_feature_t not_used
Unused features.
struct odp_pool_capability_t::@121 buf
Buffer pool capabilities
uint32_t max_num
Maximum number of buffers of any size.
Pool parameters.
uint32_t num
Number of buffers in the pool.
odp_pool_type_t type
Pool type.
struct odp_pool_param_t::@125 buf
Parameters for buffer pools.
struct odp_queue_capability_t::@140::@142 waitfree
Wait-free (ODP_NONBLOCKING_WF) implementation capabilities.
uint32_t max_size
Maximum number of events a plain (ODP_BLOCKING) queue can store simultaneously.
uint32_t max_num
Maximum number of plain (ODP_BLOCKING) queues of the default size.
struct odp_queue_capability_t::@140::@141 lockfree
Lock-free (ODP_NONBLOCKING_LF) implementation capabilities.
struct odp_queue_capability_t::@140 plain
Plain queue capabilities.
ODP Queue parameters.
odp_queue_op_mode_t enq_mode
Enqueue mode.
uint32_t size
Queue size.
odp_queue_type_t type
Queue type.
odp_queue_op_mode_t deq_mode
Dequeue mode.
odp_nonblocking_t nonblocking
Non-blocking level.
uint32_t tm
Traffic Manager APIs, e.g., odp_tm_xxx()
uint32_t crypto
Crypto APIs, e.g., odp_crypto_xxx()
uint32_t ipsec
IPsec APIs, e.g., odp_ipsec_xxx()
uint32_t timer
Timer APIs, e.g., odp_timer_xxx(), odp_timeout_xxx()
uint32_t cls
Classifier APIs, e.g., odp_cls_xxx(), odp_cos_xxx()
uint32_t schedule
Scheduler APIs, e.g., odp_schedule_xxx()
struct odp_feature_t::@148 feat
Individual feature bits.
uint32_t compress
Compression APIs, e.g., odp_comp_xxx()