API Reference Manual  1.46.0
odp_simple_pipeline.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright (c) 2019 Nokia
3  */
4 
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <getopt.h>
#include <signal.h>
#include <unistd.h>
#include <inttypes.h>

#include <odp_api.h>
#include <odp/helper/odph_api.h>
25 
/* Requested number of packets in the pool; main() caps it to the pool capability */
26 #define POOL_PKT_NUM 8192
/* Requested packet length and first segment length in bytes; capped likewise */
27 #define POOL_PKT_LEN 1536
/* Maximum number of packets/events handled by one receive, dequeue or send call */
28 #define MAX_PKT_BURST 32
29 /* Three threads required for RX, TX and statistics */
30 #define MAX_WORKERS (ODP_THREAD_COUNT_MAX - 3)
/* Requested size of each pipeline queue; main() caps it to the queue capability */
31 #define QUEUE_SIZE 1024
/* NOTE(review): not referenced by the code visible in this file */
32 #define MAX_PKTIOS 2
/* Value the extra-work hash result is compared against; a match only prints a message */
33 #define DUMMY_HASH 1234567890
34 
35 /*
 * Get rid of path in filename - only for unix-type paths using '/'.
 * The argument is evaluated more than once, so it must not have side effects.
 */
36 #define NO_PATH(file_name) (strrchr((file_name), '/') ? \
37  strrchr((file_name), '/') + 1 : (file_name))
38 
39 /*
 * Per-thread statistics.
 *
 * The union with 'padding' makes the type at least ODP_CACHE_LINE_SIZE bytes
 * and ODP_ALIGNED_CACHE aligns it to a cache line - presumably so that the
 * counters of different threads never share a cache line.
 */
40 typedef union ODP_ALIGNED_CACHE {
41  struct {
42  uint64_t pps; /* Packets per second; only filled in print_speed_stats() snapshots */
43  uint64_t rx_cnt; /* RX packets: packets/events taken in by the stage */
44  uint64_t tx_cnt; /* TX packets: packets/events passed on by the stage */
45  uint64_t rx_drops; /* Dropped packets on RX */
46  uint64_t tx_drops; /* Dropped packets on TX (enqueue or send did not accept them) */
47  } s;
48  uint8_t padding[ODP_CACHE_LINE_SIZE];
49 } stats_t;
50 
51 /*
 * Thread specific data: one entry per pipeline stage.
 * The RX stage only uses tx_queue and the TX stage only uses rx_queue.
 */
52 typedef struct thread_args_t {
53  odp_queue_t rx_queue; /* Queue the stage dequeues events from */
54  odp_queue_t tx_queue; /* Queue the stage enqueues events to */
55  stats_t stats; /* Counters updated by the stage, read by print_speed_stats() */
56 } thread_args_t;
57 
58 /* Parsed command line application arguments (filled in by parse_args()) */
59 typedef struct {
60  char **if_names; /* Array of pointers to interface names (point into if_str) */
61  odph_ethaddr_t dst_addr; /* Destination MAC address given with -r */
62  int accuracy; /* Statistics print interval in seconds (default 10) */
63  int extra_work; /* Add extra processing to worker stage (-e) */
64  int dst_change; /* Change destination eth address (default 1) */
65  int src_change; /* Change source eth address (default 1) */
66  int dst_set; /* Custom destination eth address given */
67  int time; /* Time in seconds to run; 0 means run until interrupted */
68  int num_workers; /* Number of pipeline worker stages */
69  char *if_str; /* Storage for interface names (heap copy of the -i argument) */
70 } appl_args_t;
71 
72 /* Global application data */
73 typedef struct {
75  /* Thread specific arguments */
76  thread_args_t thread[ODP_THREAD_COUNT_MAX];
77  /* Barriers to synchronize main and workers */
78  odp_barrier_t init_barrier;
79  odp_barrier_t term_barrier;
80  /* Pktio interfaces */
81  odp_pktio_t if0, if1;
82  odp_pktin_queue_t if0in, if1in;
83  odp_pktout_queue_t if0out, if1out;
84  odph_ethaddr_t src_addr; /* Source MAC address */
85  odph_ethaddr_t dst_addr; /* Destination MAC address */
86  odp_atomic_u32_t exit_threads;
87  /* Application (parsed) arguments */
88  appl_args_t appl;
89 } global_data_t;
90 
/* Global application data; main() points this at a reserved shared memory block */
91 static global_data_t *global;
92 
93 static void sig_handler(int signo ODP_UNUSED)
94 {
95  odp_atomic_store_u32(&global->exit_threads, 1);
96 }
97 
98 static odp_pktio_t create_pktio(const char *name, odp_pool_t pool,
99  odp_pktin_queue_t *pktin,
100  odp_pktout_queue_t *pktout)
101 {
102  odp_pktio_param_t pktio_param;
103  odp_pktin_queue_param_t in_param;
104  odp_pktout_queue_param_t out_param;
105  odp_pktio_t pktio;
106  odp_pktio_config_t config;
107 
108  odp_pktio_param_init(&pktio_param);
109 
110  pktio = odp_pktio_open(name, pool, &pktio_param);
111  if (pktio == ODP_PKTIO_INVALID) {
112  printf("Error: failed to open %s\n", name);
113  exit(1);
114  }
115 
116  odp_pktio_config_init(&config);
118  odp_pktio_config(pktio, &config);
119 
120  odp_pktin_queue_param_init(&in_param);
121  odp_pktout_queue_param_init(&out_param);
122 
123  in_param.op_mode = ODP_PKTIO_OP_MT_UNSAFE;
124 
125  if (odp_pktin_queue_config(pktio, &in_param)) {
126  printf("Error: failed to config input queue for %s\n", name);
127  exit(1);
128  }
129 
130  out_param.op_mode = ODP_PKTIO_OP_MT_UNSAFE;
131 
132  if (odp_pktout_queue_config(pktio, &out_param)) {
133  printf("Error: failed to config output queue for %s\n", name);
134  exit(1);
135  }
136 
137  if (odp_pktin_queue(pktio, pktin, 1) != 1) {
138  printf("Error: pktin queue query failed for %s\n", name);
139  exit(1);
140  }
141  if (odp_pktout_queue(pktio, pktout, 1) != 1) {
142  printf("Error: pktout queue query failed for %s\n", name);
143  exit(1);
144  }
145  return pktio;
146 }
147 
148 /*
149  * Fill packets' eth addresses and convert packets to events
150  *
151  * pkt_tbl Array of packets
152  * event_tbl[out] Array of events
153  * num Number of packets in the array
154  */
155 static inline unsigned int prep_events(odp_packet_t pkt_tbl[],
156  odp_event_t event_tbl[],
157  unsigned int num)
158 {
159  unsigned int i;
160  unsigned int events = 0;
161 
162  if (!global->appl.dst_change && !global->appl.src_change) {
163  odp_packet_to_event_multi(pkt_tbl, event_tbl, num);
164  return num;
165  }
166 
167  for (i = 0; i < num; ++i) {
168  odp_packet_t pkt = pkt_tbl[i];
169  odph_ethhdr_t *eth;
170 
171  odp_packet_prefetch(pkt, 0, ODPH_ETHHDR_LEN);
172 
173  if (odp_unlikely(!odp_packet_has_eth(pkt))) {
174  odp_packet_free(pkt);
175  continue;
176  }
177 
178  eth = odp_packet_data(pkt);
179 
180  if (global->appl.src_change)
181  eth->src = global->src_addr;
182 
183  if (global->appl.dst_change)
184  eth->dst = global->dst_addr;
185 
186  event_tbl[events++] = odp_packet_to_event(pkt);
187  }
188  return events;
189 }
190 
191 static inline int rx_thread(void *arg)
192 {
193  thread_args_t *thr_args = arg;
194  odp_event_t event_tbl[MAX_PKT_BURST];
195  odp_packet_t pkt_tbl[MAX_PKT_BURST];
196  odp_pktin_queue_t pktin_queue = global->if0in;
197  odp_queue_t out_queue = thr_args->tx_queue;
198  stats_t *stats = &thr_args->stats;
199  int pkts, events, sent, drops;
200 
201  odp_barrier_wait(&global->init_barrier);
202 
203  while (!odp_atomic_load_u32(&global->exit_threads)) {
204  pkts = odp_pktin_recv(pktin_queue, pkt_tbl, MAX_PKT_BURST);
205  if (odp_unlikely(pkts <= 0))
206  continue;
207 
208  stats->s.rx_cnt += pkts;
209 
210  events = prep_events(pkt_tbl, event_tbl, pkts);
211  drops = events - pkts;
212  if (odp_unlikely(drops))
213  stats->s.rx_drops += pkts - events;
214 
215  sent = odp_queue_enq_multi(out_queue, event_tbl, events);
216  if (odp_unlikely(sent < 0))
217  sent = 0;
218 
219  stats->s.tx_cnt += sent;
220 
221  drops = events - sent;
222  if (odp_unlikely(drops)) {
223  stats->s.tx_drops += drops;
224  odp_packet_free_multi(&pkt_tbl[sent], drops);
225  }
226  }
227 
228  /* Wait until pktio devices are stopped */
229  odp_barrier_wait(&global->term_barrier);
230 
231  return 0;
232 }
233 
234 static inline int tx_thread(void *arg)
235 {
236  thread_args_t *thr_args = arg;
237  odp_event_t event_tbl[MAX_PKT_BURST];
238  odp_packet_t pkt_tbl[MAX_PKT_BURST];
239  odp_queue_t rx_queue = thr_args->rx_queue;
240  odp_pktout_queue_t pktout_queue = global->if1out;
241  stats_t *stats = &thr_args->stats;
242  int events, sent, tx_drops;
243 
244  odp_barrier_wait(&global->init_barrier);
245 
246  while (!odp_atomic_load_u32(&global->exit_threads)) {
247  events = odp_queue_deq_multi(rx_queue, event_tbl,
248  MAX_PKT_BURST);
249  if (odp_unlikely(events <= 0))
250  continue;
251 
252  stats->s.rx_cnt += events;
253 
254  odp_packet_from_event_multi(pkt_tbl, event_tbl, events);
255 
256  sent = odp_pktout_send(pktout_queue, pkt_tbl, events);
257  if (odp_unlikely(sent < 0))
258  sent = 0;
259 
260  stats->s.tx_cnt += sent;
261 
262  tx_drops = events - sent;
263  if (odp_unlikely(tx_drops)) {
264  stats->s.tx_drops += tx_drops;
265  odp_packet_free_multi(&pkt_tbl[sent], tx_drops);
266  }
267  }
268 
269  /* Wait until pktio devices are stopped */
270  odp_barrier_wait(&global->term_barrier);
271 
272  /* Empty queue before exiting */
273  events = 1;
274  while (events > 0) {
275  events = odp_queue_deq_multi(rx_queue, event_tbl,
276  MAX_PKT_BURST);
277 
278  if (events > 0)
279  odp_event_free_multi(event_tbl, events);
280  }
281 
282  return 0;
283 }
284 
285 /*
286  * Work on packets
287  */
288 static inline void work_on_events(odp_event_t event_tbl[], unsigned int num)
289 {
290  unsigned int i;
291 
292  for (i = 0; i < num; i++) {
293  odp_packet_t pkt = odp_packet_from_event(event_tbl[i]);
294 
296  odp_packet_seg_len(pkt), 123) == DUMMY_HASH)
297  printf("Dummy hash match\n");
298  }
299 }
300 
301 static inline int worker_thread(void *arg ODP_UNUSED)
302 {
303  thread_args_t *thr_args = arg;
304  odp_event_t event_tbl[MAX_PKT_BURST];
305  stats_t *stats = &thr_args->stats;
306  odp_queue_t rx_queue = thr_args->rx_queue;
307  odp_queue_t tx_queue = thr_args->tx_queue;
308  int events, sent, tx_drops;
309  int extra_work = global->appl.extra_work;
310 
311  odp_barrier_wait(&global->init_barrier);
312 
313  while (!odp_atomic_load_u32(&global->exit_threads)) {
314  events = odp_queue_deq_multi(rx_queue, event_tbl,
315  MAX_PKT_BURST);
316 
317  if (odp_unlikely(events <= 0))
318  continue;
319 
320  stats->s.rx_cnt += events;
321 
322  if (extra_work)
323  work_on_events(event_tbl, events);
324 
325  sent = odp_queue_enq_multi(tx_queue, event_tbl, events);
326  if (odp_unlikely(sent < 0))
327  sent = 0;
328 
329  stats->s.tx_cnt += sent;
330 
331  tx_drops = events - sent;
332  if (odp_unlikely(tx_drops)) {
333  stats->s.tx_drops += tx_drops;
334  odp_event_free_multi(&event_tbl[sent], tx_drops);
335  }
336  }
337 
338  /* Wait until pktio devices are stopped */
339  odp_barrier_wait(&global->term_barrier);
340 
341  /* Empty queue before exiting */
342  events = 1;
343  while (events > 0) {
344  events = odp_queue_deq_multi(rx_queue, event_tbl,
345  MAX_PKT_BURST);
346 
347  if (events > 0)
348  odp_event_free_multi(event_tbl, events);
349  }
350 
351  return 0;
352 }
353 
354 static int setup_thread_masks(odp_cpumask_t *thr_mask_rx,
355  odp_cpumask_t *thr_mask_tx,
356  odp_cpumask_t *thr_mask_workers,
357  int num_workers)
358 {
359  odp_cpumask_t cpumask;
360  int num_threads = 0;
361  int i, cpu;
362 
363  if (num_workers > MAX_WORKERS) {
364  printf("Worker count limited to MAX_WORKERS define (=%d)\n",
365  MAX_WORKERS);
366  num_workers = MAX_WORKERS;
367  }
368 
369  /* Two threads required for RX and TX*/
370  num_threads = num_workers + 2;
371 
372  num_workers = odp_cpumask_default_worker(&cpumask, num_threads);
373  if (num_workers != num_threads) {
374  printf("Error: Not enough available CPU cores: %d/%d\n",
375  num_workers, num_threads);
376  exit(1);
377  }
378 
379  odp_cpumask_zero(thr_mask_rx);
380  odp_cpumask_zero(thr_mask_tx);
381  odp_cpumask_zero(thr_mask_workers);
382 
383  cpu = odp_cpumask_first(&cpumask);
384  for (i = 0; i < num_threads; i++) {
385  if (i == 0)
386  odp_cpumask_set(thr_mask_rx, cpu);
387  else if (i == 1)
388  odp_cpumask_set(thr_mask_tx, cpu);
389  else
390  odp_cpumask_set(thr_mask_workers, cpu);
391  cpu = odp_cpumask_next(&cpumask, cpu);
392  }
393 
394  return num_threads;
395 }
396 
397 /*
398  * Print statistics
399  *
400  * num_workers Number of worker threads
401  * thr_stats Pointers to stats storage
402  * duration Number of seconds to loop in
403  * timeout Number of seconds for stats calculation
404  */
405 static int print_speed_stats(int num_workers, stats_t **thr_stats,
406  int duration, int timeout)
407 {
408  uint64_t total_pkts = 0;
409  uint64_t pkts_prev = 0;
410  uint64_t maximum_pps = 0;
411  stats_t thr_stats_prev[num_workers];
412  int i;
413  int elapsed = 0;
414  int stats_enabled = 1;
415  int loop_forever = (duration == 0);
416 
417  memset(thr_stats_prev, 0, sizeof(thr_stats_prev));
418 
419  if (timeout <= 0) {
420  stats_enabled = 0;
421  timeout = 1;
422  }
423 
424  /* Wait for all threads to be ready*/
425  odp_barrier_wait(&global->init_barrier);
426 
427  do {
428  uint64_t total_rx_drops = 0;
429  uint64_t total_tx_drops = 0;
430  uint64_t pps;
431 
432  sleep(timeout);
433 
434  for (i = 0; i < num_workers; i++) {
435  uint64_t rx_cnt = thr_stats[i]->s.rx_cnt;
436  uint64_t tx_cnt = thr_stats[i]->s.tx_cnt;
437  uint64_t rx_drops = thr_stats[i]->s.rx_drops;
438  uint64_t tx_drops = thr_stats[i]->s.tx_drops;
439 
440  /* Count only transmitted packets */
441  if (i == (num_workers - 1))
442  total_pkts = tx_cnt;
443 
444  total_rx_drops += rx_drops;
445  total_tx_drops += tx_drops;
446 
447  pps = (tx_cnt - thr_stats_prev[i].s.tx_cnt) / timeout;
448  thr_stats_prev[i].s.pps = pps;
449  thr_stats_prev[i].s.rx_cnt = rx_cnt;
450  thr_stats_prev[i].s.tx_cnt = tx_cnt;
451  thr_stats_prev[i].s.rx_drops = rx_drops;
452  thr_stats_prev[i].s.tx_drops = tx_drops;
453  }
454  if (stats_enabled) {
455  printf("----------------------------------------\n");
456  for (i = 0; i < num_workers; i++) {
457  if (i == 0)
458  printf("RX thread: ");
459  else if (i == (num_workers - 1))
460  printf("TX thread: ");
461  else
462  printf("Worker %d: ", i - 1);
463 
464  printf("%" PRIu64 " pps, "
465  "%" PRIu64 " rx drops, "
466  "%" PRIu64 " tx drops\n",
467  thr_stats_prev[i].s.pps,
468  thr_stats_prev[i].s.rx_drops,
469  thr_stats_prev[i].s.tx_drops);
470  }
471  pps = (total_pkts - pkts_prev) / timeout;
472  if (pps > maximum_pps)
473  maximum_pps = pps;
474  printf("TOTAL: %" PRIu64 " pps, "
475  "%" PRIu64 " rx drops, "
476  "%" PRIu64 " tx drops, "
477  "%" PRIu64 " max pps\n",
478  pps, total_rx_drops, total_tx_drops,
479  maximum_pps);
480 
481  pkts_prev = total_pkts;
482  }
483  elapsed += timeout;
484  } while (!odp_atomic_load_u32(&global->exit_threads) && (loop_forever ||
485  (elapsed < duration)));
486 
487  if (stats_enabled)
488  printf("TEST RESULT: %" PRIu64 " maximum packets per second.\n",
489  maximum_pps);
490 
491  return total_pkts > 0 ? 0 : -1;
492 }
493 
494 /*
495  * Print system and application info
496  */
497 static void print_info(char *progname, appl_args_t *appl_args)
498 {
500 
501  printf("Running ODP appl: \"%s\"\n"
502  "-----------------\n"
503  "Using IFs: %s %s\n"
504  "Worker stages: %d\n"
505  "Extra work: %d\n\n",
506  progname, appl_args->if_names[0], appl_args->if_names[1],
507  appl_args->num_workers, appl_args->extra_work);
508 
509  fflush(NULL);
510 }
511 
/*
 * Print usage information
 *
 * progname  Program name or path; only the part after the last '/' is shown
 */
static void usage(char *progname)
{
	const char *name = NO_PATH(progname);

	/* Introduction and example; the only part that needs formatting */
	printf("\n"
	       "OpenDataPlane simple pipeline example application.\n"
	       "\n"
	       "Usage: %s [options]\n"
	       "\n"
	       " E.g. %s -i eth0,eth1 -e -w 3\n\n"
	       " ---- ---- ---- ---- ----\n"
	       " | RX | -> | W1 | -> | W2 | -> | W3 | -> | TX |\n"
	       " ---- ---- ---- ---- ----\n\n"
	       " In the above example,\n"
	       " each application stage is executed by a separate CPU thread and the stages\n"
	       " are connected using plain queues. The RX stage receives packets from eth0 and\n"
	       " enqueues them to the first worker stage (W1). The workers stages calculate\n"
	       " CRC-32C over packet data. After the final worker stage (W3) has processed\n"
	       " packets they are enqueued to the TX stage, which transmits the packets out\n"
	       " from interface eth1.\n"
	       "\n", name, name);

	/* Option reference; plain text without conversion specifiers */
	fputs("Mandatory OPTIONS:\n"
	      " -i, --interface <name> Two eth interfaces (comma-separated, no spaces)\n"
	      "\n"
	      "Optional OPTIONS:\n"
	      " -a, --accuracy <sec> Time in seconds get print statistics\n"
	      " (default is 10 seconds).\n"
	      " -d, --dst_change <arg> 0: Don't change packets' dst eth addresses\n"
	      " 1: Change packets' dst eth addresses (default)\n"
	      " -s, --src_change <arg> 0: Don't change packets' src eth addresses\n"
	      " 1: Change packets' src eth addresses (default)\n"
	      " -r, --dst_addr <addr> Destination address\n"
	      " Requires also the -d flag to be set\n"
	      " -t, --time <sec> Time in seconds to run\n"
	      " -w, --workers <num> Number of worker stages (default 0)\n"
	      " -e, --extra-work Calculate CRC-32C over packet data in worker stage\n"
	      " -h, --help Display help and exit\n\n"
	      "\n", stdout);
}
553 
554 /*
555  * Parse and store the command line arguments
556  *
557  * argc Argument count
558  * argv Argument vector
559  * appl_args[out] Storage for application arguments
560  */
561 static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
562 {
563  char *token;
564  size_t len;
565  int opt;
566  int i;
567  int if_count = 0;
568  static const struct option longopts[] = {
569  {"accuracy", required_argument, NULL, 'a'},
570  {"extra-work", no_argument, NULL, 'e'},
571  {"dst_addr", required_argument, NULL, 'r'},
572  {"dst_change", required_argument, NULL, 'd'},
573  {"src_change", required_argument, NULL, 's'},
574  {"interface", required_argument, NULL, 'i'},
575  {"time", required_argument, NULL, 't'},
576  {"workers", required_argument, NULL, 'w'},
577  {"help", no_argument, NULL, 'h'},
578  {NULL, 0, NULL, 0}
579  };
580 
581  static const char *shortopts = "+a:d:er:s:t:i:w:h";
582 
583  appl_args->accuracy = 10; /* get and print pps stats second */
584  appl_args->dst_change = 1; /* change eth dst address by default */
585  appl_args->src_change = 1; /* change eth src address by default */
586  appl_args->time = 0; /* loop forever if time to run is 0 */
587  appl_args->extra_work = 0;
588 
589  while (1) {
590  opt = getopt_long(argc, argv, shortopts, longopts, NULL);
591 
592  if (opt == -1)
593  break; /* No more options */
594 
595  switch (opt) {
596  case 'a':
597  appl_args->accuracy = atoi(optarg);
598  break;
599  case 'd':
600  appl_args->dst_change = atoi(optarg);
601  break;
602  case 'e':
603  appl_args->extra_work = 1;
604  break;
605  case 'r':
606  len = strlen(optarg);
607  if (len == 0) {
608  usage(argv[0]);
609  exit(EXIT_FAILURE);
610  }
611  len += 1; /* add room for '\0' */
612 
613  if (odph_eth_addr_parse(&appl_args->dst_addr,
614  optarg) != 0) {
615  printf("invalid MAC address\n");
616  usage(argv[0]);
617  exit(EXIT_FAILURE);
618  }
619 
620  appl_args->dst_set = 1;
621 
622  break;
623  case 's':
624  appl_args->src_change = atoi(optarg);
625  break;
626  case 't':
627  appl_args->time = atoi(optarg);
628  break;
629  case 'i':
630  len = strlen(optarg);
631  if (len == 0) {
632  usage(argv[0]);
633  exit(EXIT_FAILURE);
634  }
635  len += 1; /* add room for '\0' */
636 
637  appl_args->if_str = malloc(len);
638  if (appl_args->if_str == NULL) {
639  usage(argv[0]);
640  exit(EXIT_FAILURE);
641  }
642 
643  /* count the number of tokens separated by ',' */
644  strcpy(appl_args->if_str, optarg);
645  for (token = strtok(appl_args->if_str, ","), i = 0;
646  token != NULL;
647  token = strtok(NULL, ","), i++)
648  ;
649 
650  if_count = i;
651 
652  if (if_count != 2) {
653  usage(argv[0]);
654  exit(EXIT_FAILURE);
655  }
656 
657  /* allocate storage for the if names */
658  appl_args->if_names = calloc(if_count, sizeof(char *));
659 
660  /* store the if names (reset names string) */
661  strcpy(appl_args->if_str, optarg);
662  for (token = strtok(appl_args->if_str, ","), i = 0;
663  token != NULL; token = strtok(NULL, ","), i++) {
664  appl_args->if_names[i] = token;
665  }
666  break;
667  case 'w':
668  appl_args->num_workers = atoi(optarg);
669  break;
670  case 'h':
671  usage(argv[0]);
672  exit(EXIT_SUCCESS);
673  break;
674  default:
675  break;
676  }
677  }
678 
679  if (if_count != 2) {
680  usage(argv[0]);
681  exit(EXIT_FAILURE);
682  }
683 
684  optind = 1; /* reset 'extern optind' from the getopt lib */
685 }
686 
687 int main(int argc, char **argv)
688 {
689  odp_cpumask_t thr_mask_rx;
690  odp_cpumask_t thr_mask_tx;
691  odp_cpumask_t thr_mask_worker;
692  odp_init_t init_param;
693  odp_instance_t instance;
694  odp_pool_t pool;
695  odp_pool_capability_t pool_capa;
696  odp_pool_param_t pool_param;
697  odp_queue_capability_t queue_capa;
698  odp_queue_param_t queue_param;
699  odp_shm_t shm;
700  odph_helper_options_t helper_options;
701  odph_thread_t thr_tbl[ODP_THREAD_COUNT_MAX];
702  odph_thread_param_t thr_param[ODP_THREAD_COUNT_MAX];
703  odph_thread_common_param_t thr_common;
704  odph_ethaddr_t new_addr;
705  stats_t *stats[ODP_THREAD_COUNT_MAX];
706  thread_args_t *thr_args;
707  uint32_t pkt_len, seg_len, pkt_num;
708  int num_threads, num_workers;
709  int i;
710  int ret;
711 
712  /* Let helper collect its own arguments (e.g. --odph_proc) */
713  argc = odph_parse_options(argc, argv);
714  if (odph_options(&helper_options)) {
715  printf("Error: reading ODP helper options failed.\n");
716  exit(EXIT_FAILURE);
717  }
718 
719  odp_init_param_init(&init_param);
720  init_param.mem_model = helper_options.mem_model;
721 
722  if (odp_init_global(&instance, &init_param, NULL)) {
723  printf("Error: ODP global init failed.\n");
724  exit(1);
725  }
726 
727  if (odp_init_local(instance, ODP_THREAD_CONTROL)) {
728  printf("Error: ODP local init failed.\n");
729  exit(1);
730  }
731 
732  /* Reserve memory for global data */
733  shm = odp_shm_reserve("simple_pipeline", sizeof(global_data_t),
734  ODP_CACHE_LINE_SIZE, 0);
735  if (shm == ODP_SHM_INVALID) {
736  printf("Error: shared mem reserve failed.\n");
737  exit(EXIT_FAILURE);
738  }
739 
740  global = odp_shm_addr(shm);
741  if (global == NULL) {
742  printf("Error: shared mem alloc failed.\n");
743  exit(EXIT_FAILURE);
744  }
745 
746  memset(global, 0, sizeof(global_data_t));
747  odp_atomic_init_u32(&global->exit_threads, 0);
748 
749  signal(SIGINT, sig_handler);
750 
751  /* Parse and store the application arguments */
752  parse_args(argc, argv, &global->appl);
753 
754  num_threads = setup_thread_masks(&thr_mask_rx, &thr_mask_tx,
755  &thr_mask_worker,
756  global->appl.num_workers);
757  num_workers = num_threads - 2;
758 
759  /* Print both system and application information */
760  print_info(NO_PATH(argv[0]), &global->appl);
761 
762  /* Create queues for pipeline */
763  if (odp_queue_capability(&queue_capa)) {
764  printf("Error: reading queue capability failed.\n");
765  exit(EXIT_FAILURE);
766  }
767  if (queue_capa.plain.max_num < (unsigned int)num_threads) {
768  printf("Error: insufficient number of queues supported.\n");
769  exit(EXIT_FAILURE);
770  }
771 
772  odp_queue_param_init(&queue_param);
773  queue_param.type = ODP_QUEUE_TYPE_PLAIN;
774  queue_param.enq_mode = ODP_QUEUE_OP_MT_UNSAFE;
775  queue_param.deq_mode = ODP_QUEUE_OP_MT_UNSAFE;
776  queue_param.size = QUEUE_SIZE;
777  if (queue_capa.plain.max_size &&
778  queue_param.size > queue_capa.plain.max_size)
779  queue_param.size = queue_capa.plain.max_size;
780  for (i = 0; i < num_threads; i++) {
781  odp_queue_t queue = odp_queue_create("plain_queue",
782  &queue_param);
783 
784  if (queue == ODP_QUEUE_INVALID) {
785  printf("Error: queue create failed.\n");
786  exit(EXIT_FAILURE);
787  }
788  global->queue[i] = queue;
789  }
790 
791  /* Create packet pool */
792  if (odp_pool_capability(&pool_capa)) {
793  printf("Error: reading pool capability failed.\n");
794  exit(EXIT_FAILURE);
795  }
796 
797  pkt_len = POOL_PKT_LEN;
798  seg_len = POOL_PKT_LEN;
799  pkt_num = POOL_PKT_NUM;
800 
801  if (pool_capa.pkt.max_len && pkt_len > pool_capa.pkt.max_len)
802  pkt_len = pool_capa.pkt.max_len;
803 
804  if (pool_capa.pkt.max_seg_len && seg_len > pool_capa.pkt.max_seg_len)
805  seg_len = pool_capa.pkt.max_seg_len;
806 
807  if (pool_capa.pkt.max_num && pkt_num > pool_capa.pkt.max_num)
808  pkt_num = pool_capa.pkt.max_num;
809 
810  odp_pool_param_init(&pool_param);
811  pool_param.pkt.seg_len = seg_len;
812  pool_param.pkt.len = pkt_len;
813  pool_param.pkt.num = pkt_num;
814  pool_param.type = ODP_POOL_PACKET;
815 
816  pool = odp_pool_create("packet pool", &pool_param);
817  if (pool == ODP_POOL_INVALID) {
818  printf("Error: packet pool create failed.\n");
819  exit(1);
820  }
821 
822  global->if0 = create_pktio(global->appl.if_names[0], pool,
823  &global->if0in, &global->if0out);
824  global->if1 = create_pktio(global->appl.if_names[1], pool,
825  &global->if1in, &global->if1out);
826 
827  /* Save TX interface Ethernet address */
828  if (odp_pktio_mac_addr(global->if1, global->src_addr.addr,
829  ODPH_ETHADDR_LEN) != ODPH_ETHADDR_LEN) {
830  printf("Error: TX interface Ethernet address unknown\n");
831  exit(EXIT_FAILURE);
832  }
833 
834  /* Save destination Ethernet address */
835  if (global->appl.dst_change) {
836  /* 02:00:00:00:00:XX */
837  memset(&new_addr, 0, sizeof(odph_ethaddr_t));
838  if (global->appl.dst_set) {
839  memcpy(&new_addr, &global->appl.dst_addr,
840  sizeof(odph_ethaddr_t));
841  } else {
842  new_addr.addr[0] = 0x02;
843  new_addr.addr[5] = 1;
844  }
845  global->dst_addr = new_addr;
846  }
847 
848  if (odp_pktio_start(global->if0)) {
849  printf("Error: unable to start input interface\n");
850  exit(1);
851  }
852  if (odp_pktio_start(global->if1)) {
853  printf("Error: unable to start output interface\n");
854  exit(1);
855  }
856 
857  odp_barrier_init(&global->init_barrier, num_threads + 1);
858  odp_barrier_init(&global->term_barrier, num_threads + 1);
859 
860  for (i = 0; i < num_threads; i++)
861  stats[i] = &global->thread[i].stats;
862 
863  memset(thr_tbl, 0, sizeof(thr_tbl));
864  odph_thread_common_param_init(&thr_common);
865 
866  thr_common.instance = instance;
867 
868  /* RX thread */
869  thr_args = &global->thread[0];
870  thr_args->tx_queue = global->queue[0];
871  odph_thread_param_init(&thr_param[0]);
872  thr_param[0].start = rx_thread;
873  thr_param[0].arg = thr_args;
874  thr_param[0].thr_type = ODP_THREAD_WORKER;
875  thr_common.cpumask = &thr_mask_rx;
876  odph_thread_create(thr_tbl, &thr_common, thr_param, 1);
877 
878  /* Worker threads */
879  for (i = 0; i < num_workers; i++) {
880  thr_args = &global->thread[i + 1];
881  thr_args->rx_queue = global->queue[i];
882  thr_args->tx_queue = global->queue[i + 1];
883 
884  odph_thread_param_init(&thr_param[i]);
885  thr_param[i].start = worker_thread;
886  thr_param[i].arg = thr_args;
887  thr_param[i].thr_type = ODP_THREAD_WORKER;
888  }
889 
890  if (num_workers) {
891  thr_common.cpumask = &thr_mask_worker;
892  odph_thread_create(&thr_tbl[1], &thr_common, thr_param,
893  num_workers);
894  }
895 
896  /* TX thread */
897  thr_args = &global->thread[num_threads - 1];
898  thr_args->rx_queue = global->queue[num_workers];
899  odph_thread_param_init(&thr_param[0]);
900  thr_param[0].start = tx_thread;
901  thr_param[0].arg = thr_args;
902  thr_param[0].thr_type = ODP_THREAD_WORKER;
903  thr_common.cpumask = &thr_mask_tx;
904  odph_thread_create(&thr_tbl[num_threads - 1], &thr_common, thr_param,
905  1);
906 
907  ret = print_speed_stats(num_threads, stats, global->appl.time,
908  global->appl.accuracy);
909 
910  if (odp_pktio_stop(global->if0)) {
911  printf("Error: failed to stop interface %s\n", argv[1]);
912  exit(EXIT_FAILURE);
913  }
914  if (odp_pktio_stop(global->if1)) {
915  printf("Error: failed to stop interface %s\n", argv[2]);
916  exit(EXIT_FAILURE);
917  }
918 
919  odp_atomic_store_u32(&global->exit_threads, 1);
920  odp_barrier_wait(&global->term_barrier);
921 
922  odph_thread_join(thr_tbl, num_threads);
923 
924  free(global->appl.if_names);
925  free(global->appl.if_str);
926 
927  if (odp_pktio_close(global->if0)) {
928  printf("Error: failed to close interface %s\n", argv[1]);
929  exit(EXIT_FAILURE);
930  }
931  if (odp_pktio_close(global->if1)) {
932  printf("Error: failed to close interface %s\n", argv[2]);
933  exit(EXIT_FAILURE);
934  }
935 
936  for (i = 0; i < num_threads; i++) {
937  if (odp_queue_destroy(global->queue[i])) {
938  printf("Error: failed to destroy queue %d\n", i);
939  exit(EXIT_FAILURE);
940  }
941  }
942 
943  if (odp_pool_destroy(pool)) {
944  printf("Error: pool destroy\n");
945  exit(EXIT_FAILURE);
946  }
947 
948  if (odp_shm_free(shm)) {
949  printf("Error: shm free global data\n");
950  exit(EXIT_FAILURE);
951  }
952 
953  if (odp_term_local()) {
954  printf("Error: term local\n");
955  exit(EXIT_FAILURE);
956  }
957 
958  if (odp_term_global(instance)) {
959  printf("Error: term global\n");
960  exit(EXIT_FAILURE);
961  }
962 
963  return ret;
964 }
void odp_atomic_init_u32(odp_atomic_u32_t *atom, uint32_t val)
Initialize atomic uint32 variable.
uint32_t odp_atomic_load_u32(odp_atomic_u32_t *atom)
Load value of atomic uint32 variable.
void odp_atomic_store_u32(odp_atomic_u32_t *atom, uint32_t val)
Store value to atomic uint32 variable.
void odp_barrier_init(odp_barrier_t *barr, int count)
Initialize barrier with thread count.
void odp_barrier_wait(odp_barrier_t *barr)
Synchronize thread execution on barrier.
#define ODP_ALIGNED_CACHE
Defines type/struct/variable to be cache line size aligned.
#define odp_unlikely(x)
Branch unlikely taken.
Definition: spec/hints.h:64
#define ODP_UNUSED
Intentionally unused variables of functions.
Definition: spec/hints.h:54
void odp_cpumask_set(odp_cpumask_t *mask, int cpu)
Add CPU to mask.
int odp_cpumask_default_worker(odp_cpumask_t *mask, int num)
Default CPU mask for worker threads.
int odp_cpumask_first(const odp_cpumask_t *mask)
Find first set CPU in mask.
int odp_cpumask_next(const odp_cpumask_t *mask, int cpu)
Find next set CPU in mask.
void odp_cpumask_zero(odp_cpumask_t *mask)
Clear entire CPU mask.
void odp_event_free_multi(const odp_event_t event[], int num)
Free multiple events.
uint32_t odp_hash_crc32c(const void *data, uint32_t data_len, uint32_t init_val)
Calculate CRC-32C.
void odp_init_param_init(odp_init_t *param)
Initialize the odp_init_t to default values for all fields.
int odp_init_local(odp_instance_t instance, odp_thread_type_t thr_type)
Thread local ODP initialization.
int odp_init_global(odp_instance_t *instance, const odp_init_t *params, const odp_platform_init_t *platform_params)
Global ODP initialization.
int odp_term_local(void)
Thread local ODP termination.
int odp_term_global(odp_instance_t instance)
Global ODP termination.
uint64_t odp_instance_t
ODP instance ID.
int odp_pktio_mac_addr(odp_pktio_t pktio, void *mac_addr, int size)
Get the default MAC address of a packet IO interface.
void odp_pktin_queue_param_init(odp_pktin_queue_param_t *param)
Initialize packet input queue parameters.
void odp_pktio_param_init(odp_pktio_param_t *param)
Initialize pktio params.
int odp_pktio_close(odp_pktio_t pktio)
Close a packet IO interface.
int odp_pktout_queue(odp_pktio_t pktio, odp_pktout_queue_t queues[], int num)
Direct packet output queues.
void odp_pktio_config_init(odp_pktio_config_t *config)
Initialize packet IO configuration options.
odp_pktio_t odp_pktio_open(const char *name, odp_pool_t pool, const odp_pktio_param_t *param)
Open a packet IO interface.
int odp_pktio_config(odp_pktio_t pktio, const odp_pktio_config_t *config)
Configure packet IO interface options.
int odp_pktio_start(odp_pktio_t pktio)
Start packet receive and transmit.
#define ODP_PKTIO_INVALID
Invalid packet IO handle.
int odp_pktin_queue(odp_pktio_t pktio, odp_pktin_queue_t queues[], int num)
Direct packet input queues.
void odp_pktout_queue_param_init(odp_pktout_queue_param_t *param)
Initialize packet output queue parameters.
int odp_pktio_stop(odp_pktio_t pktio)
Stop packet receive and transmit.
int odp_pktin_recv(odp_pktin_queue_t queue, odp_packet_t packets[], int num)
Receive packets directly from an interface input queue.
int odp_pktout_send(odp_pktout_queue_t queue, const odp_packet_t packets[], int num)
Send packets directly to an interface output queue.
int odp_pktin_queue_config(odp_pktio_t pktio, const odp_pktin_queue_param_t *param)
Configure packet input queues.
int odp_pktout_queue_config(odp_pktio_t pktio, const odp_pktout_queue_param_t *param)
Configure packet output queues.
@ ODP_PKTIO_OP_MT_UNSAFE
Not multithread safe operation.
void odp_packet_from_event_multi(odp_packet_t pkt[], const odp_event_t ev[], int num)
Convert multiple packet events to packet handles.
odp_event_t odp_packet_to_event(odp_packet_t pkt)
Convert packet handle to event.
void odp_packet_to_event_multi(const odp_packet_t pkt[], odp_event_t ev[], int num)
Convert multiple packet handles to events.
uint32_t odp_packet_seg_len(odp_packet_t pkt)
Packet data length following the data pointer.
void odp_packet_prefetch(odp_packet_t pkt, uint32_t offset, uint32_t len)
Packet data prefetch.
void * odp_packet_data(odp_packet_t pkt)
Packet data pointer.
int odp_packet_has_eth(odp_packet_t pkt)
Check for Ethernet header.
odp_packet_t odp_packet_from_event(odp_event_t ev)
Get packet handle from event.
void odp_packet_free(odp_packet_t pkt)
Free packet.
void odp_packet_free_multi(const odp_packet_t pkt[], int num)
Free multiple packets.
@ ODP_PROTO_LAYER_L2
Layer L2 protocols (Ethernet, VLAN, etc)
odp_pool_t odp_pool_create(const char *name, const odp_pool_param_t *param)
Create a pool.
int odp_pool_capability(odp_pool_capability_t *capa)
Query pool capabilities.
void odp_pool_param_init(odp_pool_param_t *param)
Initialize pool params.
int odp_pool_destroy(odp_pool_t pool)
Destroy a pool previously created by odp_pool_create()
#define ODP_POOL_INVALID
Invalid pool.
@ ODP_POOL_PACKET
Packet pool.
int odp_queue_enq_multi(odp_queue_t queue, const odp_event_t events[], int num)
Enqueue multiple events to a queue.
void odp_queue_param_init(odp_queue_param_t *param)
Initialize queue params.
int odp_queue_capability(odp_queue_capability_t *capa)
Query queue capabilities.
#define ODP_QUEUE_INVALID
Invalid queue.
odp_queue_t odp_queue_create(const char *name, const odp_queue_param_t *param)
Queue create.
int odp_queue_destroy(odp_queue_t queue)
Destroy ODP queue.
int odp_queue_deq_multi(odp_queue_t queue, odp_event_t events[], int num)
Dequeue multiple events from a queue.
@ ODP_QUEUE_TYPE_PLAIN
Plain queue.
@ ODP_QUEUE_OP_MT_UNSAFE
Not multithread safe operation.
int odp_shm_free(odp_shm_t shm)
Free a contiguous block of shared memory.
#define ODP_SHM_INVALID
Invalid shared memory block.
void * odp_shm_addr(odp_shm_t shm)
Shared memory block address.
odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, uint32_t flags)
Reserve a contiguous block of shared memory.
void odp_sys_info_print(void)
Print system info.
#define ODP_THREAD_COUNT_MAX
Maximum number of threads supported in build time.
@ ODP_THREAD_WORKER
Worker thread.
@ ODP_THREAD_CONTROL
Control thread.
The OpenDataPlane API.
Global initialization parameters.
odp_mem_model_t mem_model
Application memory model.
Packet input queue parameters.
odp_pktio_op_mode_t op_mode
Operation mode.
Packet IO configuration options.
odp_pktio_parser_config_t parser
Packet input parser configuration.
Packet IO parameters.
odp_proto_layer_t layer
Protocol parsing level in packet input.
Packet output queue parameters.
odp_pktio_op_mode_t op_mode
Operation mode.
struct odp_pool_capability_t::@122 pkt
Packet pool capabilities
uint32_t max_num
Maximum number of buffers of any size.
uint32_t max_seg_len
Maximum packet segment data length in bytes.
uint32_t max_len
Maximum packet data length in bytes.
Pool parameters.
uint32_t num
Number of buffers in the pool.
odp_pool_type_t type
Pool type.
uint32_t len
Minimum length of 'num' packets.
uint32_t seg_len
Minimum number of packet data bytes that can be stored in the first segment of a newly allocated packet.
struct odp_pool_param_t::@126 pkt
Parameters for packet pools.
uint32_t max_size
Maximum number of events a plain (ODP_BLOCKING) queue can store simultaneously.
uint32_t max_num
Maximum number of plain (ODP_BLOCKING) queues of the default size.
struct odp_queue_capability_t::@140 plain
Plain queue capabilities.
ODP Queue parameters.
odp_queue_op_mode_t enq_mode
Enqueue mode.
uint32_t size
Queue size.
odp_queue_type_t type
Queue type.
odp_queue_op_mode_t deq_mode
Dequeue mode.