API Reference Manual  1.46.0
odp_sched_latency.c
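odp_sched_latency.c implements the ODP scheduler latency benchmark: worker threads continuously schedule events from low and high priority queues and record the minimum, average and maximum latency of timestamped sample events. See usage() in the listing below for the command line options.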
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright (c) 2016-2018 Linaro Limited
3  * Copyright (c) 2020-2024 Nokia
4  */
5 
14 #include <string.h>
15 #include <stdlib.h>
16 #include <inttypes.h>
17 
18 /* ODP main header */
19 #include <odp_api.h>
20 
21 /* ODP helper for Linux apps */
22 #include <odp/helper/odph_api.h>
23 
24 /* Result export helpers */
25 #include <export_results.h>
26 
27 /* GNU lib C */
28 #include <getopt.h>
29 
30 #define MAX_QUEUES 4096
31 #define MAX_GROUPS 64
32 #define EVENT_POOL_SIZE (1024 * 1024)
33 #define MAIN_THREAD 1
35 #define CACHE_ALIGN_ROUNDUP(x)\
36  ((ODP_CACHE_LINE_SIZE) * \
37  (((x) + ODP_CACHE_LINE_SIZE - 1) / (ODP_CACHE_LINE_SIZE)))
38 
39 /* Test priorities */
40 #define NUM_PRIOS 2
41 #define HI_PRIO 0
42 #define LO_PRIO 1
43 
44 /* Test event forwarding mode */
45 #define EVENT_FORWARD_RAND 0
46 #define EVENT_FORWARD_INC 1
47 #define EVENT_FORWARD_NONE 2
48 
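/* Roles of the test events: WARM_UP events turn into SAMPLE events after the
 * configured number of warm-up rounds, SAMPLE events carry the timestamp used
 * for the latency measurement, TRAFFIC events act as background load, and
 * COOL_DOWN events are used to drain the queues at the end of the test. */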
50 typedef enum {
51  WARM_UP,
52  COOL_DOWN,
53  TRAFFIC,
54  SAMPLE
55 } event_type_t;
56 
58 typedef struct {
59  odp_time_t time_stamp;
60  event_type_t type;
61  int src_idx[NUM_PRIOS];
62  int prio;
63  int warm_up_rounds;
64 } test_event_t;
65 
67 typedef struct {
68  unsigned int cpu_count;
69  odp_schedule_sync_t sync_type;
70  int forward_mode;
71  int num_group;
72  int isolate;
73  int test_rounds;
74  int warm_up_rounds;
75  struct {
76  int queues;
77  int events;
78  int sample_events;
79  odp_bool_t events_per_queue;
81  } prio[NUM_PRIOS];
82  odp_bool_t sample_per_prio;
84 } test_args_t;
85 
87 typedef struct {
88  uint64_t events;
89  uint64_t sample_events;
90  uint64_t tot;
91  uint64_t min;
92  uint64_t max;
93  uint64_t max_idx;
94 } test_stat_t;
95 
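/* Per-thread statistics are kept in a cache line aligned struct so that
 * workers updating their own counters do not share cache lines. */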
97 typedef struct ODP_ALIGNED_CACHE {
98  test_stat_t prio[NUM_PRIOS];
99 } core_stat_t;
100 
102 typedef struct {
104  core_stat_t core_stat[ODP_THREAD_COUNT_MAX];
105  odp_barrier_t barrier;
106  odp_pool_t pool;
107  test_args_t args;
108  odp_queue_t queue[NUM_PRIOS][MAX_QUEUES];
109  test_common_options_t common_options;
111  odp_schedule_group_t group[NUM_PRIOS][MAX_GROUPS];
112 
113 } test_globals_t;
114 
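/* Drain all scheduled queues: a COOL_DOWN event is enqueued to each queue and
 * the scheduler is polled until that event comes back, freeing any remaining
 * test events on the way. */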
120 static void clear_sched_queues(test_globals_t *globals)
121 {
122  odp_event_t ev;
123  odp_buffer_t buf;
124  test_event_t *event;
125  int i, j;
126  odp_queue_t fromq;
127 
128  /* Allocate the cool_down event. */
129  buf = odp_buffer_alloc(globals->pool);
130  if (buf == ODP_BUFFER_INVALID)
131  ODPH_ABORT("Buffer alloc failed.\n");
132 
133  event = odp_buffer_addr(buf);
134  event->type = COOL_DOWN;
135  ev = odp_buffer_to_event(buf);
136 
137  for (i = 0; i < NUM_PRIOS; i++) {
138  for (j = 0; j < globals->args.prio[i].queues; j++) {
139  /* Enqueue cool_down event on each queue. */
140  if (odp_queue_enq(globals->queue[i][j], ev))
141  ODPH_ABORT("Queue enqueue failed.\n");
142 
143  /* Invoke scheduler until cool_down event has been
144  * received. */
145  while (1) {
146  ev = odp_schedule(NULL, ODP_SCHED_WAIT);
147  buf = odp_buffer_from_event(ev);
148  event = odp_buffer_addr(buf);
149  if (event->type == COOL_DOWN)
150  break;
151  odp_event_free(ev);
152  }
153  }
154  }
155 
156  /* Free the cool_down event. */
157  odp_event_free(ev);
158 
159  /* Call odp_schedule() to trigger a release of any scheduler context. */
160  ev = odp_schedule(&fromq, ODP_SCHED_NO_WAIT);
161  if (ev != ODP_EVENT_INVALID)
162  ODPH_ABORT("Queue %" PRIu64 " not empty.\n",
163  odp_queue_to_u64(fromq));
164 }
165 
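/* Pre-fill the queues of one priority level with events. The first
 * num_samples events are marked WARM_UP (they later become SAMPLE events),
 * the rest are plain TRAFFIC events. With div_events the total event count is
 * divided across the queues, otherwise every queue gets the full count. */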
180 static int enqueue_events(int prio, int num_queues, int num_events,
181  int num_samples, odp_bool_t div_events,
182  test_globals_t *globals)
183 {
184  odp_buffer_t buf[num_events + num_samples];
185  odp_event_t ev[num_events + num_samples];
186  odp_queue_t queue;
187  test_event_t *event;
188  int i, j, ret;
189  int enq_events;
190  int events_per_queue;
191  int tot_events;
192  int rdy_events = 0;
193 
194  tot_events = num_events + num_samples;
195 
196  if (!num_queues || !tot_events)
197  return 0;
198 
199  events_per_queue = tot_events;
200  if (div_events)
201  events_per_queue = (tot_events + num_queues - 1) / num_queues;
202 
203  for (i = 0; i < num_queues; i++) {
204  queue = globals->queue[prio][i];
205 
206  ret = odp_buffer_alloc_multi(globals->pool, buf,
207  events_per_queue);
208  if (ret != events_per_queue) {
209  ODPH_ERR("Buffer alloc failed. Try increasing EVENT_POOL_SIZE.\n");
210  ret = ret < 0 ? 0 : ret;
211  odp_buffer_free_multi(buf, ret);
212  return -1;
213  }
214  for (j = 0; j < events_per_queue; j++) {
215  if (!odp_buffer_is_valid(buf[j])) {
216  ODPH_ERR("Buffer alloc failed\n");
217  odp_buffer_free_multi(buf, events_per_queue);
218  return -1;
219  }
220 
221  event = odp_buffer_addr(buf[j]);
222  memset(event, 0, sizeof(test_event_t));
223 
224  /* Latency isn't measured from the first processing
225  * rounds. */
226  if (num_samples > 0) {
227  event->type = WARM_UP;
228  event->warm_up_rounds = 0;
229  num_samples--;
230  } else {
231  event->type = TRAFFIC;
232  }
233  event->src_idx[prio] = i;
234  event->prio = prio;
235  ev[j] = odp_buffer_to_event(buf[j]);
236  }
237 
238  enq_events = 0;
239  do {
240  ret = odp_queue_enq_multi(queue, &ev[enq_events],
241  events_per_queue -
242  enq_events);
243  if (ret < 0) {
244  ODPH_ERR("Queue enqueue failed.\n");
245  return -1;
246  }
247  enq_events += ret;
248  } while (enq_events < events_per_queue);
249 
250  rdy_events += events_per_queue;
251  if (div_events && rdy_events >= tot_events)
252  return 0;
253  }
254  return 0;
255 }
256 
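/* Print per-thread and combined latency statistics for both priorities and,
 * when result export is enabled, write the averages, minimums and maximums as
 * a CSV record. */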
262 static int output_results(test_globals_t *globals)
263 {
264  test_stat_t *lat;
265  odp_schedule_sync_t stype;
266  test_stat_t total;
267  test_args_t *args;
268  uint64_t avg;
269  unsigned int i, j;
270 
271  args = &globals->args;
272  stype = globals->args.sync_type;
273 
274  printf("\n%s queue scheduling latency\n",
275  (stype == ODP_SCHED_SYNC_ATOMIC) ? "ATOMIC" :
276  ((stype == ODP_SCHED_SYNC_ORDERED) ? "ORDERED" : "PARALLEL"));
277 
278  printf(" Forwarding mode: %s\n",
279  (args->forward_mode == EVENT_FORWARD_RAND) ? "random" :
280  ((args->forward_mode == EVENT_FORWARD_INC) ? "incremental" :
281  "none"));
282 
283  printf(" LO_PRIO queues: %i\n", args->prio[LO_PRIO].queues);
284  if (args->prio[LO_PRIO].events_per_queue)
285  printf(" LO_PRIO event per queue: %i\n",
286  args->prio[LO_PRIO].events);
287  else
288  printf(" LO_PRIO events: %i\n", args->prio[LO_PRIO].events);
289 
290  printf(" LO_PRIO sample events: %i\n", args->prio[LO_PRIO].sample_events);
291 
292  printf(" HI_PRIO queues: %i\n", args->prio[HI_PRIO].queues);
293  if (args->prio[HI_PRIO].events_per_queue)
294  printf(" HI_PRIO event per queue: %i\n\n",
295  args->prio[HI_PRIO].events);
296  else
297  printf(" HI_PRIO events: %i\n", args->prio[HI_PRIO].events);
298 
299  printf(" HI_PRIO sample events: %i\n\n", args->prio[HI_PRIO].sample_events);
300 
301  if (globals->common_options.is_export) {
302  if (test_common_write("high priority Avg (ns),high priority Min (ns),"
303  "high priority Max (ns),low priority Avg (ns),"
304  "low priority Min (ns),low priority Max (ns)\n")) {
305  ODPH_ERR("Export failed\n");
306  test_common_write_term();
307  return -1;
308  }
309  }
310 
311  for (i = 0; i < NUM_PRIOS; i++) {
312  memset(&total, 0, sizeof(test_stat_t));
313  total.min = UINT64_MAX;
314 
315  printf("%s priority\n"
316  "Thread Avg[ns] Min[ns] Max[ns] Samples Total Max idx\n"
317  "-----------------------------------------------------------------------\n",
318  i == HI_PRIO ? "HIGH" : "LOW");
319  for (j = 1; j <= args->cpu_count; j++) {
320  lat = &globals->core_stat[j].prio[i];
321 
322  if (lat->sample_events == 0) {
323  printf("%-8d N/A\n", j);
324  continue;
325  }
326 
327  if (lat->max > total.max)
328  total.max = lat->max;
329  if (lat->min < total.min)
330  total.min = lat->min;
331  total.tot += lat->tot;
332  total.sample_events += lat->sample_events;
333  total.events += lat->events;
334 
335  avg = lat->events ? lat->tot / lat->sample_events : 0;
336  printf("%-8d %-10" PRIu64 " %-10" PRIu64 " "
337  "%-10" PRIu64 " %-10" PRIu64 " %-10" PRIu64 " %-10" PRIu64 "\n",
338  j, avg, lat->min, lat->max, lat->sample_events,
339  lat->events, lat->max_idx);
340  }
341  printf("-----------------------------------------------------------------------\n");
342  if (total.sample_events == 0) {
343  printf("Total N/A\n\n");
344  continue;
345  }
346  avg = total.events ? total.tot / total.sample_events : 0;
347  printf("Total %-10" PRIu64 " %-10" PRIu64 " %-10" PRIu64 " "
348  "%-10" PRIu64 " %-10" PRIu64 "\n\n", avg, total.min,
349  total.max, total.sample_events, total.events);
350 
351  if (globals->common_options.is_export) {
352  if (test_common_write("%" PRIu64 ",%" PRIu64 ",%" PRIu64 "%s",
353  avg, total.min, total.max, i == 0 ? "," : "")) {
354  ODPH_ERR("Export failed\n");
355  test_common_write_term();
356  return -1;
357  }
358  }
359  }
360 
361  if (globals->common_options.is_export)
362  test_common_write_term();
363 
364  return 0;
365 }
366 
367 static int join_groups(test_globals_t *globals, int thr)
368 {
369  odp_thrmask_t thrmask;
370  odp_schedule_group_t group;
371  int i, num;
372  int num_group = globals->args.num_group;
373 
374  if (num_group <= 0)
375  return 0;
376 
377  num = num_group;
378  if (globals->args.isolate)
379  num = 2 * num_group;
380 
381  odp_thrmask_zero(&thrmask);
382  odp_thrmask_set(&thrmask, thr);
383 
384  for (i = 0; i < num; i++) {
385  if (globals->args.isolate)
386  group = globals->group[i % 2][i / 2];
387  else
388  group = globals->group[0][i];
389 
390  if (odp_schedule_group_join(group, &thrmask)) {
391  ODPH_ERR("Group join failed %i (thr %i)\n", i, thr);
392  return -1;
393  }
394  }
395 
396  return 0;
397 }
398 
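/* Measurement loop run by every worker. Each SAMPLE event gets a fresh
 * odp_time_global_strict() timestamp just before it is enqueued, and the
 * latency is the difference between the time the scheduler delivers the event
 * and that stored timestamp. */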
416 static int test_schedule(int thr, test_globals_t *globals)
417 {
418  odp_time_t time;
419  odp_event_t ev;
420  odp_buffer_t buf;
421  odp_queue_t dst_queue;
422  uint64_t latency;
423  uint64_t i;
424  test_event_t *event;
425  test_stat_t *stats;
426  int dst_idx, change_queue;
427  int warm_up_rounds = globals->args.warm_up_rounds;
428  uint64_t test_rounds = globals->args.test_rounds * (uint64_t)1000000;
429 
430  memset(&globals->core_stat[thr], 0, sizeof(core_stat_t));
431  globals->core_stat[thr].prio[HI_PRIO].min = UINT64_MAX;
432  globals->core_stat[thr].prio[LO_PRIO].min = UINT64_MAX;
433 
434  change_queue = globals->args.forward_mode != EVENT_FORWARD_NONE ? 1 : 0;
435 
436  odp_barrier_wait(&globals->barrier);
437 
438  for (i = 0; i < test_rounds; i++) {
439  ev = odp_schedule(NULL, ODP_SCHED_WAIT);
440 
441  time = odp_time_global_strict();
442 
443  buf = odp_buffer_from_event(ev);
444  event = odp_buffer_addr(buf);
445 
446  stats = &globals->core_stat[thr].prio[event->prio];
447 
448  if (event->type == SAMPLE) {
449  latency = odp_time_to_ns(time) - odp_time_to_ns(event->time_stamp);
450 
451  if (latency > stats->max) {
452  stats->max = latency;
453  stats->max_idx = stats->sample_events;
454  }
455  if (latency < stats->min)
456  stats->min = latency;
457  stats->tot += latency;
458  stats->sample_events++;
459 
460  /* Move sample event to a different priority */
461  if (!globals->args.sample_per_prio &&
462  globals->args.prio[!event->prio].queues)
463  event->prio = !event->prio;
464  }
465 
466  if (odp_unlikely(event->type == WARM_UP)) {
467  event->warm_up_rounds++;
468  if (event->warm_up_rounds >= warm_up_rounds)
469  event->type = SAMPLE;
470  } else {
471  stats->events++;
472  }
473 
474  /* Move event to next queue if forwarding is enabled */
475  if (change_queue)
476  dst_idx = event->src_idx[event->prio] + 1;
477  else
478  dst_idx = event->src_idx[event->prio];
479  if (dst_idx >= globals->args.prio[event->prio].queues)
480  dst_idx = 0;
481  event->src_idx[event->prio] = dst_idx;
482  dst_queue = globals->queue[event->prio][dst_idx];
483 
484  if (event->type == SAMPLE)
485  event->time_stamp = odp_time_global_strict();
486 
487  if (odp_queue_enq(dst_queue, ev)) {
488  ODPH_ERR("[%i] Queue enqueue failed.\n", thr);
489  odp_event_free(ev);
490  return -1;
491  }
492  }
493 
494  /* Clear possible locally stored buffers */
495  odp_schedule_pause();
496 
497  while (1) {
498  odp_queue_t src_queue;
499 
500  ev = odp_schedule(&src_queue, ODP_SCHED_NO_WAIT);
501 
502  if (ev == ODP_EVENT_INVALID)
503  break;
504 
505  if (odp_queue_enq(src_queue, ev)) {
506  ODPH_ERR("[%i] Queue enqueue failed.\n", thr);
507  odp_event_free(ev);
508  return -1;
509  }
510  }
511 
512  odp_barrier_wait(&globals->barrier);
513 
514  if (thr == MAIN_THREAD) {
515  odp_schedule_resume();
516  clear_sched_queues(globals);
517  if (output_results(globals))
518  return -1;
519  }
520 
521  return 0;
522 }
523 
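/* Worker thread entry point: look up the shared test globals, join the
 * schedule groups, let the main thread enqueue the initial events, and then
 * run the measurement loop. */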
532 static int run_thread(void *arg ODP_UNUSED)
533 {
534  odp_shm_t shm;
535  test_globals_t *globals;
536  test_args_t *args;
537  int thr;
538 
539  thr = odp_thread_id();
540 
541  shm = odp_shm_lookup("test_globals");
542  globals = odp_shm_addr(shm);
543 
544  if (globals == NULL) {
545  ODPH_ERR("Shared mem lookup failed\n");
546  return -1;
547  }
548 
549  if (join_groups(globals, thr))
550  return -1;
551 
552  if (thr == MAIN_THREAD) {
553  args = &globals->args;
554 
555  if (enqueue_events(HI_PRIO, args->prio[HI_PRIO].queues,
556  args->prio[HI_PRIO].events, args->prio[HI_PRIO].sample_events,
557  !args->prio[HI_PRIO].events_per_queue,
558  globals))
559  return -1;
560 
561  if (enqueue_events(LO_PRIO, args->prio[LO_PRIO].queues,
562  args->prio[LO_PRIO].events, args->prio[LO_PRIO].sample_events,
563  !args->prio[LO_PRIO].events_per_queue,
564  globals))
565  return -1;
566  }
567 
568  if (test_schedule(thr, globals))
569  return -1;
570 
571  return 0;
572 }
573 
577 static void usage(void)
578 {
579  printf("\n"
580  "OpenDataPlane scheduler latency benchmark application.\n"
581  "\n"
582  "Usage: ./odp_sched_latency [options]\n"
583  "Optional OPTIONS:\n"
584  " -c, --count <number> CPU count, 0=all available, default=1\n"
585  " -d, --duration <number> Test duration in scheduling rounds (millions), default=10, min=1\n"
586  " -f, --forward-mode <mode> Selection of target queue\n"
587  " 0: Random (default)\n"
588  " 1: Incremental\n"
589  " 2: Use source queue\n"
590  " -g, --num_group <num> Number of schedule groups. Round robins queues into groups.\n"
591  " -1: SCHED_GROUP_WORKER\n"
592  " 0: SCHED_GROUP_ALL (default)\n"
593  " -i, --isolate <mode> Select if shared or isolated groups are used. Ignored when num_group <= 0.\n"
594  " 0: All queues share groups (default)\n"
595  " 1: Separate groups for high and low priority queues. Creates 2xnum_group groups.\n"
596  " -l, --lo-prio-queues <number> Number of low priority scheduled queues (default=64)\n"
597  " -t, --hi-prio-queues <number> Number of high priority scheduled queues (default=16)\n"
598  " -m, --lo-prio-events-per-queue <number> Number of events per low priority queue (default=32).\n"
599  " Does not include sample event.\n"
600  " -n, --hi-prio-events-per-queue <number> Number of events per high priority queue (default=0)\n"
601  " Does not include sample event.\n"
602  " -o, --lo-prio-events <number> Total number of low priority events. Overrides the\n"
603  " number of events per queue, does not include sample event.\n"
604  " -p, --hi-prio-events <number> Total number of high priority events. Overrides the\n"
605  " number of events per queue, does not include sample event.\n"
606  " -r --sample-per-prio Allocate a separate sample event for each priority. By default\n"
607  " a single sample event is used and its priority is changed after\n"
608  " each processing round.\n"
609  " -s, --sync Scheduled queues' sync type\n"
610  " 0: ODP_SCHED_SYNC_PARALLEL (default)\n"
611  " 1: ODP_SCHED_SYNC_ATOMIC\n"
612  " 2: ODP_SCHED_SYNC_ORDERED\n"
613  " -w, --warm-up <number> Number of warm-up rounds, default=100, min=1\n"
614  " -h, --help Display help and exit.\n\n");
615 }
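/* Illustrative invocation (an assumed example, not taken from the source):
 *   ./odp_sched_latency -c 4 -s 1 -d 5
 * would use four worker CPUs, ODP_SCHED_SYNC_ATOMIC queues and five million
 * scheduling rounds, as described by the options above. */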
616 
624 static void parse_args(int argc, char *argv[], test_args_t *args)
625 {
626  int opt;
627  int i;
628 
629  static const struct option longopts[] = {
630  {"count", required_argument, NULL, 'c'},
631  {"duration", required_argument, NULL, 'd'},
632  {"forward-mode", required_argument, NULL, 'f'},
633  {"num_group", required_argument, NULL, 'g'},
634  {"isolate", required_argument, NULL, 'i'},
635  {"lo-prio-queues", required_argument, NULL, 'l'},
636  {"hi-prio-queues", required_argument, NULL, 't'},
637  {"lo-prio-events-per-queue", required_argument, NULL, 'm'},
638  {"hi-prio-events-per-queue", required_argument, NULL, 'n'},
639  {"lo-prio-events", required_argument, NULL, 'o'},
640  {"hi-prio-events", required_argument, NULL, 'p'},
641  {"sync", required_argument, NULL, 's'},
642  {"warm-up", required_argument, NULL, 'w'},
643  {"sample-per-prio", no_argument, NULL, 'r'},
644  {"help", no_argument, NULL, 'h'},
645  {NULL, 0, NULL, 0}
646  };
647 
648  static const char *shortopts = "+c:d:f:g:i:l:t:m:n:o:p:s:w:rh";
649 
650  args->cpu_count = 1;
651  args->forward_mode = EVENT_FORWARD_RAND;
652  args->num_group = 0;
653  args->isolate = 0;
654  args->test_rounds = 10;
655  args->warm_up_rounds = 100;
656  args->sync_type = ODP_SCHED_SYNC_PARALLEL;
657  args->sample_per_prio = 0;
658  args->prio[LO_PRIO].queues = 64;
659  args->prio[HI_PRIO].queues = 16;
660  args->prio[LO_PRIO].events = 32;
661  args->prio[HI_PRIO].events = 0;
662  args->prio[LO_PRIO].events_per_queue = 1;
663  args->prio[HI_PRIO].events_per_queue = 0;
664  args->prio[LO_PRIO].sample_events = 0;
665  args->prio[HI_PRIO].sample_events = 1;
666 
667  while (1) {
668  opt = getopt_long(argc, argv, shortopts, longopts, NULL);
669 
670  if (opt == -1)
671  break; /* No more options */
672 
673  switch (opt) {
674  case 'c':
675  args->cpu_count = atoi(optarg);
676  break;
677  case 'd':
678  args->test_rounds = atoi(optarg);
679  break;
680  case 'f':
681  args->forward_mode = atoi(optarg);
682  break;
683  case 'g':
684  args->num_group = atoi(optarg);
685  break;
686  case 'i':
687  args->isolate = atoi(optarg);
688  break;
689  case 'l':
690  args->prio[LO_PRIO].queues = atoi(optarg);
691  break;
692  case 't':
693  args->prio[HI_PRIO].queues = atoi(optarg);
694  break;
695  case 'm':
696  args->prio[LO_PRIO].events = atoi(optarg);
697  args->prio[LO_PRIO].events_per_queue = 1;
698  break;
699  case 'n':
700  args->prio[HI_PRIO].events = atoi(optarg);
701  args->prio[HI_PRIO].events_per_queue = 1;
702  break;
703  case 'o':
704  args->prio[LO_PRIO].events = atoi(optarg);
705  args->prio[LO_PRIO].events_per_queue = 0;
706  break;
707  case 'p':
708  args->prio[HI_PRIO].events = atoi(optarg);
709  args->prio[HI_PRIO].events_per_queue = 0;
710  break;
711  case 's':
712  i = atoi(optarg);
713  if (i == 1)
714  args->sync_type = ODP_SCHED_SYNC_ATOMIC;
715  else if (i == 2)
716  args->sync_type = ODP_SCHED_SYNC_ORDERED;
717  else
718  args->sync_type = ODP_SCHED_SYNC_PARALLEL;
719  break;
720  case 'r':
721  args->sample_per_prio = 1;
722  break;
723  case 'w':
724  args->warm_up_rounds = atoi(optarg);
725  break;
726  case 'h':
727  usage();
728  exit(EXIT_SUCCESS);
729  break;
730 
731  default:
732  break;
733  }
734  }
735 
736  /* Make sure arguments are valid */
737  /* -1 for main thread */
738  if (args->cpu_count > ODP_THREAD_COUNT_MAX - 1)
739  args->cpu_count = ODP_THREAD_COUNT_MAX - 1;
740  if (args->prio[LO_PRIO].queues > MAX_QUEUES)
741  args->prio[LO_PRIO].queues = MAX_QUEUES;
742  if (args->prio[HI_PRIO].queues > MAX_QUEUES)
743  args->prio[HI_PRIO].queues = MAX_QUEUES;
744  if (args->test_rounds < 1)
745  args->test_rounds = 1;
746  if (!args->prio[HI_PRIO].queues && !args->prio[LO_PRIO].queues) {
747  printf("No queues configured\n");
748  usage();
749  exit(EXIT_FAILURE);
750  }
751  if (args->forward_mode > EVENT_FORWARD_NONE ||
752  args->forward_mode < EVENT_FORWARD_RAND) {
753  printf("Invalid forwarding mode\n");
754  usage();
755  exit(EXIT_FAILURE);
756  }
757 
758  if (args->num_group > MAX_GROUPS) {
759  ODPH_ERR("Too many groups. Max supported %i.\n", MAX_GROUPS);
760  exit(EXIT_FAILURE);
761  }
762 
763  if (args->prio[HI_PRIO].queues == 0 || args->sample_per_prio)
764  args->prio[LO_PRIO].sample_events = 1;
765 }
766 
767 static void randomize_queues(odp_queue_t queues[], uint32_t num, uint64_t *seed)
768 {
769  uint32_t i;
770 
771  for (i = 0; i < num; i++) {
772  uint32_t new_index;
773  odp_queue_t swap_queue;
774  odp_queue_t cur_queue = queues[i];
775 
776  odp_random_test_data((uint8_t *)&new_index, sizeof(new_index),
777  seed);
778  new_index = new_index % num;
779  swap_queue = queues[new_index];
780 
781  queues[new_index] = cur_queue;
782  queues[i] = swap_queue;
783  }
784 }
785 
786 static int create_groups(test_globals_t *globals, odp_schedule_group_t group[], int num)
787 {
788  odp_schedule_capability_t sched_capa;
789  odp_thrmask_t zeromask;
790  int i, j, max;
791 
792  if (num <= 0)
793  return 0;
794 
795  if (odp_schedule_capability(&sched_capa)) {
796  ODPH_ERR("Schedule capability failed\n");
797  return 0;
798  }
799 
800  max = sched_capa.max_groups - 3;
801  if (num > max) {
802  printf("Too many schedule groups %i (max %u)\n", num, max);
803  return 0;
804  }
805 
806  for (i = 0; i < NUM_PRIOS; i++)
807  for (j = 0; j < MAX_GROUPS; j++)
808  globals->group[i][j] = ODP_SCHED_GROUP_INVALID;
809 
810  odp_thrmask_zero(&zeromask);
811 
812  for (i = 0; i < num; i++) {
813  group[i] = odp_schedule_group_create("test_group", &zeromask);
814 
815  if (group[i] == ODP_SCHED_GROUP_INVALID) {
816  ODPH_ERR("Group create failed %i\n", i);
817  break;
818  }
819 
820  if (globals->args.isolate) {
821  globals->group[i % 2][i / 2] = group[i];
822  } else {
823  globals->group[0][i] = group[i];
824  globals->group[1][i] = group[i];
825  }
826  }
827 
828  return i;
829 }
830 
831 static int destroy_groups(odp_schedule_group_t group[], int num)
832 {
833  int i;
834 
835  if (num <= 0)
836  return 0;
837 
838  for (i = 0; i < num; i++) {
839  if (odp_schedule_group_destroy(group[i])) {
840  ODPH_ERR("Group destroy failed %i\n", i);
841  return -1;
842  }
843  }
844 
845  return 0;
846 }
847 
848 static int calc_queue_sizes(test_globals_t *globals, uint32_t queue_size[])
849 {
850  odp_schedule_capability_t capa;
851  test_args_t *args = &globals->args;
852  const uint32_t min_queue_size = 256;
853  uint32_t tot_queues = 0;
854 
855  if (odp_schedule_capability(&capa)) {
856  ODPH_ERR("Schedule capability failed\n");
857  return -1;
858  }
859 
860  for (int i = 0; i < NUM_PRIOS; i++) {
861  uint32_t events = args->prio[i].events;
862  int queues = args->prio[i].queues;
863 
864  if (!args->prio[i].events_per_queue && queues)
865  events = (events + queues - 1) / queues;
866 
867  /* Events may stack up if forwarding is enabled */
868  if (args->forward_mode != EVENT_FORWARD_NONE)
869  events *= queues;
870 
871  /* Reserve room for sample event */
872  events++;
873 
874  queue_size[i] = ODPH_MAX(events, min_queue_size);
875 
876  if (capa.max_queue_size && queue_size[i] > capa.max_queue_size) {
877  ODPH_ERR("Warn: queues may not be able to store all events (required size "
878  "%" PRIu32 ", max supported %" PRIu32 ")\n", queue_size[i],
879  capa.max_queue_size);
880  queue_size[i] = capa.max_queue_size;
881  }
882  tot_queues += queues;
883  }
884 
885  if (tot_queues > capa.max_queues) {
886  ODPH_ERR("Requested %" PRIu32 " queues, max %" PRIu32 " supported\n",
887  tot_queues, capa.max_queues);
888  return -1;
889  }
890 
891  return 0;
892 }
893 
894 static int create_queues(test_globals_t *globals)
895 {
896  odp_queue_param_t param;
897  test_args_t *args = &globals->args;
898  int num_group = args->num_group;
899  uint32_t queue_size[NUM_PRIOS];
900 
901  if (calc_queue_sizes(globals, queue_size))
902  return -1;
903 
904  odp_queue_param_init(&param);
905  param.type = ODP_QUEUE_TYPE_SCHED;
906  param.sched.sync = args->sync_type;
907 
908  for (int i = 0; i < NUM_PRIOS; i++) {
909  char name[] = "sched_XX_YY";
910  odp_queue_t queue;
911  odp_schedule_group_t grp = num_group < 0 ? ODP_SCHED_GROUP_WORKER :
912  ODP_SCHED_GROUP_ALL;
913  const int prio = i == HI_PRIO ? odp_schedule_max_prio() :
914  odp_schedule_min_prio();
915 
916  param.sched.prio = prio;
917  param.size = queue_size[i];
918 
919  /* Replace XX and YY in name to differentiate queues */
920  name[6] = '0' + (prio / 10);
921  name[7] = '0' + prio - (10 * (prio / 10));
922 
923  for (int j = 0; j < args->prio[i].queues; j++) {
924  name[9] = '0' + j / 10;
925  name[10] = '0' + j - 10 * (j / 10);
926 
927  /* Round robin queues into groups */
928  if (num_group > 0)
929  grp = globals->group[i][j % num_group];
930 
931  param.sched.group = grp;
932 
933  queue = odp_queue_create(name, &param);
934 
935  if (queue == ODP_QUEUE_INVALID) {
936  ODPH_ERR("Scheduled queue create failed\n");
937  return -1;
938  }
939 
940  globals->queue[i][j] = queue;
941  }
942  if (args->forward_mode == EVENT_FORWARD_RAND) {
943  uint64_t seed = i;
944 
945  randomize_queues(globals->queue[i], args->prio[i].queues, &seed);
946  }
947  }
948  return 0;
949 }
950 
954 int main(int argc, char *argv[])
955 {
956  odp_instance_t instance;
957  odp_init_t init_param;
958  odph_helper_options_t helper_options;
959  odph_thread_common_param_t thr_common;
960  odph_thread_param_t thr_param;
961  odp_cpumask_t cpumask;
962  odp_pool_capability_t pool_capa;
963  odp_pool_param_t params;
964  test_globals_t *globals;
965  test_args_t args;
966  char cpumaskstr[ODP_CPUMASK_STR_SIZE];
967  uint32_t pool_size;
968  int i, j, ret;
969  int num_group, tot_group;
970  odp_schedule_group_t group[2 * MAX_GROUPS];
971  odph_thread_t thread_tbl[ODP_THREAD_COUNT_MAX];
972  int err = 0;
973  int num_workers = 0;
974  odp_shm_t shm = ODP_SHM_INVALID;
975  odp_pool_t pool = ODP_POOL_INVALID;
976  test_common_options_t common_options;
977 
978  printf("\nODP scheduling latency benchmark starts\n\n");
979 
980  /* Let helper collect its own arguments (e.g. --odph_proc) */
981  argc = odph_parse_options(argc, argv);
982  if (odph_options(&helper_options)) {
983  ODPH_ERR("Error: reading ODP helper options failed.\n");
984  exit(EXIT_FAILURE);
985  }
986 
987  argc = test_common_parse_options(argc, argv);
988  if (test_common_options(&common_options)) {
989  ODPH_ERR("Error: reading test options failed\n");
990  exit(EXIT_FAILURE);
991  }
992 
993  odp_init_param_init(&init_param);
994  init_param.mem_model = helper_options.mem_model;
995 
996  memset(&args, 0, sizeof(args));
997  parse_args(argc, argv, &args);
998 
999  /* ODP global init */
1000  if (odp_init_global(&instance, &init_param, NULL)) {
1001  ODPH_ERR("ODP global init failed.\n");
1002  exit(EXIT_FAILURE);
1003  }
1004 
1005  /*
1006  * Init this thread. It also makes ODP calls when
1007  * setting up resources for worker threads.
1008  */
1009  if (odp_init_local(instance, ODP_THREAD_CONTROL)) {
1010  ODPH_ERR("ODP local init failed.\n");
1011  exit(EXIT_FAILURE);
1012  }
1013 
1014  odp_sys_info_print();
1015 
1016  num_group = args.num_group;
1017 
1018  tot_group = 0;
1019  if (num_group > 0)
1020  tot_group = args.isolate ? 2 * num_group : num_group;
1021 
1022  /* Get default worker cpumask */
1023  if (args.cpu_count)
1024  num_workers = args.cpu_count;
1025 
1026  num_workers = odp_cpumask_default_worker(&cpumask, num_workers);
1027  args.cpu_count = num_workers;
1028 
1029  (void)odp_cpumask_to_str(&cpumask, cpumaskstr, sizeof(cpumaskstr));
1030 
1031  printf("Test options:\n");
1032  printf(" Worker threads: %i\n", num_workers);
1033  printf(" First CPU: %i\n", odp_cpumask_first(&cpumask));
1034  printf(" CPU mask: %s\n", cpumaskstr);
1035  printf(" Test rounds: %iM\n", args.test_rounds);
1036  printf(" Warm-up rounds: %i\n", args.warm_up_rounds);
1037  printf(" Isolated groups: %i\n", args.isolate);
1038  printf(" Number of groups: %i\n", num_group);
1039  printf(" Created groups: %i\n", tot_group);
1040  printf("\n");
1041 
1042  shm = odp_shm_reserve("test_globals", sizeof(test_globals_t), ODP_CACHE_LINE_SIZE, 0);
1043  if (shm == ODP_SHM_INVALID) {
1044  ODPH_ERR("Shared memory reserve failed.\n");
1045  err = -1;
1046  goto error;
1047  }
1048 
1049  globals = odp_shm_addr(shm);
1050  memset(globals, 0, sizeof(test_globals_t));
1051  memcpy(&globals->args, &args, sizeof(test_args_t));
1052 
1053  globals->common_options = common_options;
1054 
1055  odp_schedule_config(NULL);
1056 
1057  /*
1058  * Create event pool
1059  */
1060  if (odp_pool_capability(&pool_capa)) {
1061  ODPH_ERR("pool capa failed\n");
1062  err = -1;
1063  goto error;
1064  }
1065 
1066  pool_size = EVENT_POOL_SIZE;
1067  if (pool_capa.buf.max_num && pool_capa.buf.max_num < EVENT_POOL_SIZE)
1068  pool_size = pool_capa.buf.max_num;
1069 
1070  odp_pool_param_init(&params);
1071  params.buf.size = sizeof(test_event_t);
1072  params.buf.align = 0;
1073  params.buf.num = pool_size;
1074  params.type = ODP_POOL_BUFFER;
1075 
1076  pool = odp_pool_create("event_pool", &params);
1077 
1078  if (pool == ODP_POOL_INVALID) {
1079  ODPH_ERR("Pool create failed.\n");
1080  err = -1;
1081  goto error;
1082  }
1083  globals->pool = pool;
1084 
1085  /* Create groups */
1086  ret = create_groups(globals, group, tot_group);
1087  if (ret != tot_group) {
1088  ODPH_ERR("Group create failed.\n");
1089  tot_group = ret;
1090  err = -1;
1091  goto error;
1092  }
1093 
1094  if (create_queues(globals)) {
1095  ODPH_ERR("Creating test queues failed.\n");
1096  err = -1;
1097  goto error;
1098  }
1099 
1100  odp_barrier_init(&globals->barrier, num_workers);
1101 
1102  /* Create and launch worker threads */
1103  memset(thread_tbl, 0, sizeof(thread_tbl));
1104 
1105  odph_thread_common_param_init(&thr_common);
1106  thr_common.instance = instance;
1107  thr_common.cpumask = &cpumask;
1108  thr_common.share_param = 1;
1109 
1110  odph_thread_param_init(&thr_param);
1111  thr_param.start = run_thread;
1112  thr_param.arg = NULL;
1113  thr_param.thr_type = ODP_THREAD_WORKER;
1114 
1115  odph_thread_create(thread_tbl, &thr_common, &thr_param, num_workers);
1116 
1117  /* Wait for worker threads to terminate */
1118  if (odph_thread_join(thread_tbl, num_workers) != num_workers)
1119  err = -1;
1120 
1121  printf("ODP scheduling latency test complete\n\n");
1122 
1123  for (i = 0; i < NUM_PRIOS; i++) {
1124  odp_queue_t queue;
1125  int num_queues;
1126 
1127  num_queues = args.prio[i].queues;
1128 
1129  for (j = 0; j < num_queues; j++) {
1130  queue = globals->queue[i][j];
1131  if (odp_queue_destroy(queue)) {
1132  ODPH_ERR("Queue destroy failed [%i][%i]\n", i, j);
1133  err = -1;
1134  break;
1135  }
1136  }
1137  }
1138 
1139 error:
1140  if (destroy_groups(group, tot_group)) {
1141  ODPH_ERR("Group destroy failed\n");
1142  err = -1;
1143  }
1144 
1145  if (pool != ODP_POOL_INVALID) {
1146  if (odp_pool_destroy(pool)) {
1147  ODPH_ERR("Pool destroy failed\n");
1148  err = -1;
1149  }
1150  }
1151 
1152  if (shm != ODP_SHM_INVALID) {
1153  if (odp_shm_free(shm)) {
1154  ODPH_ERR("SHM destroy failed\n");
1155  err = -1;
1156  }
1157  }
1158 
1159  err += odp_term_local();
1160  err += odp_term_global(instance);
1161 
1162  return err;
1163 }
void odp_barrier_init(odp_barrier_t *barr, int count)
Initialize barrier with thread count.
void odp_barrier_wait(odp_barrier_t *barr)
Synchronize thread execution on barrier.
odp_event_t odp_buffer_to_event(odp_buffer_t buf)
Convert buffer handle to event.
odp_buffer_t odp_buffer_alloc(odp_pool_t pool)
Buffer alloc.
int odp_buffer_is_valid(odp_buffer_t buf)
Check that buffer is valid.
void * odp_buffer_addr(odp_buffer_t buf)
Buffer start address.
odp_buffer_t odp_buffer_from_event(odp_event_t ev)
Get buffer handle from event.
int odp_buffer_alloc_multi(odp_pool_t pool, odp_buffer_t buf[], int num)
Allocate multiple buffers.
#define ODP_BUFFER_INVALID
Invalid buffer.
void odp_buffer_free_multi(const odp_buffer_t buf[], int num)
Free multiple buffers.
#define ODP_ALIGNED_CACHE
Defines type/struct/variable to be cache line size aligned.
#define odp_unlikely(x)
Branch unlikely taken.
#define ODP_UNUSED
Intentionally unused variables of functions.
int odp_cpumask_default_worker(odp_cpumask_t *mask, int num)
Default CPU mask for worker threads.
int odp_cpumask_first(const odp_cpumask_t *mask)
Find first set CPU in mask.
int32_t odp_cpumask_to_str(const odp_cpumask_t *mask, char *str, int32_t size)
Format a string from CPU mask.
#define ODP_CPUMASK_STR_SIZE
The maximum number of characters needed to record any CPU mask as a string (output of odp_cpumask_to_str()).
void odp_event_free(odp_event_t event)
Free event.
#define ODP_EVENT_INVALID
Invalid event.
void odp_init_param_init(odp_init_t *param)
Initialize the odp_init_t to default values for all fields.
int odp_init_local(odp_instance_t instance, odp_thread_type_t thr_type)
Thread local ODP initialization.
int odp_init_global(odp_instance_t *instance, const odp_init_t *params, const odp_platform_init_t *platform_params)
Global ODP initialization.
int odp_term_local(void)
Thread local ODP termination.
int odp_term_global(odp_instance_t instance)
Global ODP termination.
uint64_t odp_instance_t
ODP instance ID.
odp_pool_t odp_pool_create(const char *name, const odp_pool_param_t *param)
Create a pool.
int odp_pool_capability(odp_pool_capability_t *capa)
Query pool capabilities.
void odp_pool_param_init(odp_pool_param_t *param)
Initialize pool params.
int odp_pool_destroy(odp_pool_t pool)
Destroy a pool previously created by odp_pool_create()
#define ODP_POOL_INVALID
Invalid pool.
@ ODP_POOL_BUFFER
Buffer pool.
int odp_queue_enq_multi(odp_queue_t queue, const odp_event_t events[], int num)
Enqueue multiple events to a queue.
void odp_queue_param_init(odp_queue_param_t *param)
Initialize queue params.
#define ODP_QUEUE_INVALID
Invalid queue.
int odp_queue_enq(odp_queue_t queue, odp_event_t ev)
Enqueue an event to a queue.
odp_queue_t odp_queue_create(const char *name, const odp_queue_param_t *param)
Queue create.
int odp_queue_destroy(odp_queue_t queue)
Destroy ODP queue.
uint64_t odp_queue_to_u64(odp_queue_t hdl)
Get printable value for an odp_queue_t.
@ ODP_QUEUE_TYPE_SCHED
Scheduled queue.
int32_t odp_random_test_data(uint8_t *buf, uint32_t len, uint64_t *seed)
Generate repeatable random data for testing purposes.
int odp_schedule_sync_t
Scheduler synchronization method.
#define ODP_SCHED_WAIT
Wait infinitely.
#define ODP_SCHED_SYNC_PARALLEL
Parallel scheduled queues.
int odp_schedule_group_t
Scheduler thread group.
int odp_schedule_group_join(odp_schedule_group_t group, const odp_thrmask_t *mask)
Join a schedule group.
#define ODP_SCHED_SYNC_ATOMIC
Atomic queue synchronization.
#define ODP_SCHED_SYNC_ORDERED
Ordered queue synchronization.
int odp_schedule_min_prio(void)
Minimum scheduling priority level.
#define ODP_SCHED_GROUP_WORKER
Group of all worker threads.
int odp_schedule_group_destroy(odp_schedule_group_t group)
Schedule group destroy.
#define ODP_SCHED_GROUP_INVALID
Invalid scheduler group.
#define ODP_SCHED_NO_WAIT
Do not wait.
void odp_schedule_pause(void)
Pause scheduling.
int odp_schedule_max_prio(void)
Maximum scheduling priority level.
int odp_schedule_config(const odp_schedule_config_t *config)
Global schedule configuration.
int odp_schedule_capability(odp_schedule_capability_t *capa)
Query scheduler capabilities.
odp_schedule_group_t odp_schedule_group_create(const char *name, const odp_thrmask_t *mask)
Schedule group create.
odp_event_t odp_schedule(odp_queue_t *from, uint64_t wait)
Schedule an event.
void odp_schedule_resume(void)
Resume scheduling.
#define ODP_SCHED_GROUP_ALL
Group of all threads.
odp_shm_t odp_shm_lookup(const char *name)
Lookup for a block of shared memory.
int odp_shm_free(odp_shm_t shm)
Free a contiguous block of shared memory.
#define ODP_SHM_INVALID
Invalid shared memory block.
void * odp_shm_addr(odp_shm_t shm)
Shared memory block address.
odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, uint32_t flags)
Reserve a contiguous block of shared memory.
bool odp_bool_t
Boolean type.
void odp_sys_info_print(void)
Print system info.
#define ODP_THREAD_COUNT_MAX
Maximum number of threads supported in build time.
void odp_thrmask_set(odp_thrmask_t *mask, int thr)
Add thread to mask.
int odp_thread_id(void)
Get thread identifier.
void odp_thrmask_zero(odp_thrmask_t *mask)
Clear entire thread mask.
@ ODP_THREAD_WORKER
Worker thread.
@ ODP_THREAD_CONTROL
Control thread.
uint64_t odp_time_to_ns(odp_time_t time)
Convert time to nanoseconds.
odp_time_t odp_time_global_strict(void)
Current global time (strict)
The OpenDataPlane API.
Global initialization parameters.
odp_mem_model_t mem_model
Application memory model.
struct odp_pool_capability_t::@121 buf
Buffer pool capabilities
uint32_t max_num
Maximum number of buffers of any size.
Pool parameters.
uint32_t num
Number of buffers in the pool.
uint32_t align
Minimum buffer alignment in bytes.
uint32_t size
Minimum buffer size in bytes.
odp_pool_type_t type
Pool type.
struct odp_pool_param_t::@125 buf
Parameters for buffer pools.
ODP Queue parameters.
odp_schedule_param_t sched
Scheduler parameters.
uint32_t size
Queue size.
odp_queue_type_t type
Queue type.
uint32_t max_groups
Maximum number of scheduling groups.
uint32_t max_queues
Maximum number of scheduled (ODP_BLOCKING) queues of the default size.
uint32_t max_queue_size
Maximum number of events a scheduled (ODP_BLOCKING) queue can store simultaneously.
odp_schedule_group_t group
Thread group.
odp_schedule_prio_t prio
Priority level.
odp_schedule_sync_t sync
Synchronization method.