24 #include <odp/helper/odph_api.h>
26 #define POOL_PKT_NUM 8192
27 #define POOL_PKT_LEN 1536
28 #define MAX_PKT_BURST 32
30 #define MAX_WORKERS (ODP_THREAD_COUNT_MAX - 3)
31 #define QUEUE_SIZE 1024
33 #define DUMMY_HASH 1234567890
36 #define NO_PATH(file_name) (strrchr((file_name), '/') ? \
37 strrchr((file_name), '/') + 1 : (file_name))
48 uint8_t padding[ODP_CACHE_LINE_SIZE];
52 typedef struct thread_args_t {
61 odph_ethaddr_t dst_addr;
84 odph_ethaddr_t src_addr;
85 odph_ethaddr_t dst_addr;
91 static global_data_t *global;
112 printf(
"Error: failed to open %s\n", name);
126 printf(
"Error: failed to config input queue for %s\n", name);
133 printf(
"Error: failed to config output queue for %s\n", name);
138 printf(
"Error: pktin queue query failed for %s\n", name);
142 printf(
"Error: pktout queue query failed for %s\n", name);
155 static inline unsigned int prep_events(
odp_packet_t pkt_tbl[],
160 unsigned int events = 0;
162 if (!global->appl.dst_change && !global->appl.src_change) {
167 for (i = 0; i < num; ++i) {
180 if (global->appl.src_change)
181 eth->src = global->src_addr;
183 if (global->appl.dst_change)
184 eth->dst = global->dst_addr;
191 static inline int rx_thread(
void *arg)
193 thread_args_t *thr_args = arg;
198 stats_t *stats = &thr_args->stats;
199 int pkts, events, sent, drops;
208 stats->s.rx_cnt += pkts;
210 events = prep_events(pkt_tbl, event_tbl, pkts);
211 drops = events - pkts;
213 stats->s.rx_drops += pkts - events;
219 stats->s.tx_cnt += sent;
221 drops = events - sent;
223 stats->s.tx_drops += drops;
234 static inline int tx_thread(
void *arg)
236 thread_args_t *thr_args = arg;
241 stats_t *stats = &thr_args->stats;
242 int events, sent, tx_drops;
252 stats->s.rx_cnt += events;
260 stats->s.tx_cnt += sent;
262 tx_drops = events - sent;
264 stats->s.tx_drops += tx_drops;
288 static inline void work_on_events(
odp_event_t event_tbl[],
unsigned int num)
292 for (i = 0; i < num; i++) {
297 printf(
"Dummy hash match\n");
301 static inline int worker_thread(
void *arg
ODP_UNUSED)
303 thread_args_t *thr_args = arg;
305 stats_t *stats = &thr_args->stats;
308 int events, sent, tx_drops;
309 int extra_work = global->appl.extra_work;
320 stats->s.rx_cnt += events;
323 work_on_events(event_tbl, events);
329 stats->s.tx_cnt += sent;
331 tx_drops = events - sent;
333 stats->s.tx_drops += tx_drops;
363 if (num_workers > MAX_WORKERS) {
364 printf(
"Worker count limited to MAX_WORKERS define (=%d)\n",
366 num_workers = MAX_WORKERS;
370 num_threads = num_workers + 2;
373 if (num_workers != num_threads) {
374 printf(
"Error: Not enough available CPU cores: %d/%d\n",
375 num_workers, num_threads);
384 for (i = 0; i < num_threads; i++) {
405 static int print_speed_stats(
int num_workers, stats_t **thr_stats,
406 int duration,
int timeout)
408 uint64_t total_pkts = 0;
409 uint64_t pkts_prev = 0;
410 uint64_t maximum_pps = 0;
411 stats_t thr_stats_prev[num_workers];
414 int stats_enabled = 1;
415 int loop_forever = (duration == 0);
417 memset(thr_stats_prev, 0,
sizeof(thr_stats_prev));
428 uint64_t total_rx_drops = 0;
429 uint64_t total_tx_drops = 0;
434 for (i = 0; i < num_workers; i++) {
435 uint64_t rx_cnt = thr_stats[i]->s.rx_cnt;
436 uint64_t tx_cnt = thr_stats[i]->s.tx_cnt;
437 uint64_t rx_drops = thr_stats[i]->s.rx_drops;
438 uint64_t tx_drops = thr_stats[i]->s.tx_drops;
441 if (i == (num_workers - 1))
444 total_rx_drops += rx_drops;
445 total_tx_drops += tx_drops;
447 pps = (tx_cnt - thr_stats_prev[i].s.tx_cnt) / timeout;
448 thr_stats_prev[i].s.pps = pps;
449 thr_stats_prev[i].s.rx_cnt = rx_cnt;
450 thr_stats_prev[i].s.tx_cnt = tx_cnt;
451 thr_stats_prev[i].s.rx_drops = rx_drops;
452 thr_stats_prev[i].s.tx_drops = tx_drops;
455 printf(
"----------------------------------------\n");
456 for (i = 0; i < num_workers; i++) {
458 printf(
"RX thread: ");
459 else if (i == (num_workers - 1))
460 printf(
"TX thread: ");
462 printf(
"Worker %d: ", i - 1);
464 printf(
"%" PRIu64
" pps, "
465 "%" PRIu64
" rx drops, "
466 "%" PRIu64
" tx drops\n",
467 thr_stats_prev[i].s.pps,
468 thr_stats_prev[i].s.rx_drops,
469 thr_stats_prev[i].s.tx_drops);
471 pps = (total_pkts - pkts_prev) / timeout;
472 if (pps > maximum_pps)
474 printf(
"TOTAL: %" PRIu64
" pps, "
475 "%" PRIu64
" rx drops, "
476 "%" PRIu64
" tx drops, "
477 "%" PRIu64
" max pps\n",
478 pps, total_rx_drops, total_tx_drops,
481 pkts_prev = total_pkts;
485 (elapsed < duration)));
488 printf(
"TEST RESULT: %" PRIu64
" maximum packets per second.\n",
491 return total_pkts > 0 ? 0 : -1;
497 static void print_info(
char *progname, appl_args_t *appl_args)
501 printf(
"Running ODP appl: \"%s\"\n"
502 "-----------------\n"
504 "Worker stages: %d\n"
505 "Extra work: %d\n\n",
506 progname, appl_args->if_names[0], appl_args->if_names[1],
507 appl_args->num_workers, appl_args->extra_work);
515 static void usage(
char *progname)
518 "OpenDataPlane simple pipeline example application.\n"
520 "Usage: %s [options]\n"
522 " E.g. %s -i eth0,eth1 -e -w 3\n\n"
523 " ---- ---- ---- ---- ----\n"
524 " | RX | -> | W1 | -> | W2 | -> | W3 | -> | TX |\n"
525 " ---- ---- ---- ---- ----\n\n"
526 " In the above example,\n"
527 " each application stage is executed by a separate CPU thread and the stages\n"
528 " are connected using plain queues. The RX stage receives packets from eth0 and\n"
529 " enqueues them to the first worker stage (W1). The workers stages calculate\n"
530 " CRC-32C over packet data. After the final worker stage (W3) has processed\n"
531 " packets they are enqueued to the TX stage, which transmits the packets out\n"
532 " from interface eth1.\n"
534 "Mandatory OPTIONS:\n"
535 " -i, --interface <name> Two eth interfaces (comma-separated, no spaces)\n"
537 "Optional OPTIONS:\n"
538 " -a, --accuracy <sec> Time in seconds get print statistics\n"
539 " (default is 10 seconds).\n"
540 " -d, --dst_change <arg> 0: Don't change packets' dst eth addresses\n"
541 " 1: Change packets' dst eth addresses (default)\n"
542 " -s, --src_change <arg> 0: Don't change packets' src eth addresses\n"
543 " 1: Change packets' src eth addresses (default)\n"
544 " -r, --dst_addr <addr> Destination address\n"
545 " Requires also the -d flag to be set\n"
546 " -t, --time <sec> Time in seconds to run\n"
547 " -w, --workers <num> Number of worker stages (default 0)\n"
548 " -e, --extra-work Calculate CRC-32C over packet data in worker stage\n"
549 " -h, --help Display help and exit\n\n"
550 "\n", NO_PATH(progname), NO_PATH(progname)
561 static void parse_args(
int argc,
char *argv[], appl_args_t *appl_args)
568 static const struct option longopts[] = {
569 {
"accuracy", required_argument, NULL,
'a'},
570 {
"extra-work", no_argument, NULL,
'e'},
571 {
"dst_addr", required_argument, NULL,
'r'},
572 {
"dst_change", required_argument, NULL,
'd'},
573 {
"src_change", required_argument, NULL,
's'},
574 {
"interface", required_argument, NULL,
'i'},
575 {
"time", required_argument, NULL,
't'},
576 {
"workers", required_argument, NULL,
'w'},
577 {
"help", no_argument, NULL,
'h'},
581 static const char *shortopts =
"+a:d:er:s:t:i:w:h";
583 appl_args->accuracy = 10;
584 appl_args->dst_change = 1;
585 appl_args->src_change = 1;
587 appl_args->extra_work = 0;
590 opt = getopt_long(argc, argv, shortopts, longopts, NULL);
597 appl_args->accuracy = atoi(optarg);
600 appl_args->dst_change = atoi(optarg);
603 appl_args->extra_work = 1;
606 len = strlen(optarg);
613 if (odph_eth_addr_parse(&appl_args->dst_addr,
615 printf(
"invalid MAC address\n");
620 appl_args->dst_set = 1;
624 appl_args->src_change = atoi(optarg);
627 appl_args->time = atoi(optarg);
630 len = strlen(optarg);
637 appl_args->if_str = malloc(len);
638 if (appl_args->if_str == NULL) {
644 strcpy(appl_args->if_str, optarg);
645 for (token = strtok(appl_args->if_str,
","), i = 0;
647 token = strtok(NULL,
","), i++)
658 appl_args->if_names = calloc(if_count,
sizeof(
char *));
661 strcpy(appl_args->if_str, optarg);
662 for (token = strtok(appl_args->if_str,
","), i = 0;
663 token != NULL; token = strtok(NULL,
","), i++) {
664 appl_args->if_names[i] = token;
668 appl_args->num_workers = atoi(optarg);
687 int main(
int argc,
char **argv)
700 odph_helper_options_t helper_options;
703 odph_thread_common_param_t thr_common;
704 odph_ethaddr_t new_addr;
706 thread_args_t *thr_args;
707 uint32_t pkt_len, seg_len, pkt_num;
708 int num_threads, num_workers;
713 argc = odph_parse_options(argc, argv);
714 if (odph_options(&helper_options)) {
715 printf(
"Error: reading ODP helper options failed.\n");
720 init_param.
mem_model = helper_options.mem_model;
723 printf(
"Error: ODP global init failed.\n");
728 printf(
"Error: ODP local init failed.\n");
734 ODP_CACHE_LINE_SIZE, 0);
736 printf(
"Error: shared mem reserve failed.\n");
741 if (global == NULL) {
742 printf(
"Error: shared mem alloc failed.\n");
746 memset(global, 0,
sizeof(global_data_t));
749 signal(SIGINT, sig_handler);
752 parse_args(argc, argv, &global->appl);
754 num_threads = setup_thread_masks(&thr_mask_rx, &thr_mask_tx,
756 global->appl.num_workers);
757 num_workers = num_threads - 2;
760 print_info(NO_PATH(argv[0]), &global->appl);
764 printf(
"Error: reading queue capability failed.\n");
767 if (queue_capa.
plain.
max_num < (
unsigned int)num_threads) {
768 printf(
"Error: insufficient number of queues supported.\n");
776 queue_param.
size = QUEUE_SIZE;
780 for (i = 0; i < num_threads; i++) {
785 printf(
"Error: queue create failed.\n");
788 global->queue[i] = queue;
793 printf(
"Error: reading pool capability failed.\n");
797 pkt_len = POOL_PKT_LEN;
798 seg_len = POOL_PKT_LEN;
799 pkt_num = POOL_PKT_NUM;
812 pool_param.
pkt.
len = pkt_len;
813 pool_param.
pkt.
num = pkt_num;
818 printf(
"Error: packet pool create failed.\n");
822 global->if0 = create_pktio(global->appl.if_names[0], pool,
823 &global->if0in, &global->if0out);
824 global->if1 = create_pktio(global->appl.if_names[1], pool,
825 &global->if1in, &global->if1out);
829 ODPH_ETHADDR_LEN) != ODPH_ETHADDR_LEN) {
830 printf(
"Error: TX interface Ethernet address unknown\n");
835 if (global->appl.dst_change) {
837 memset(&new_addr, 0,
sizeof(odph_ethaddr_t));
838 if (global->appl.dst_set) {
839 memcpy(&new_addr, &global->appl.dst_addr,
840 sizeof(odph_ethaddr_t));
842 new_addr.addr[0] = 0x02;
843 new_addr.addr[5] = 1;
845 global->dst_addr = new_addr;
849 printf(
"Error: unable to start input interface\n");
853 printf(
"Error: unable to start output interface\n");
860 for (i = 0; i < num_threads; i++)
861 stats[i] = &global->thread[i].stats;
863 memset(thr_tbl, 0,
sizeof(thr_tbl));
864 odph_thread_common_param_init(&thr_common);
866 thr_common.instance = instance;
869 thr_args = &global->thread[0];
870 thr_args->tx_queue = global->queue[0];
871 odph_thread_param_init(&thr_param[0]);
872 thr_param[0].start = rx_thread;
873 thr_param[0].arg = thr_args;
875 thr_common.cpumask = &thr_mask_rx;
876 odph_thread_create(thr_tbl, &thr_common, thr_param, 1);
879 for (i = 0; i < num_workers; i++) {
880 thr_args = &global->thread[i + 1];
881 thr_args->rx_queue = global->queue[i];
882 thr_args->tx_queue = global->queue[i + 1];
884 odph_thread_param_init(&thr_param[i]);
885 thr_param[i].start = worker_thread;
886 thr_param[i].arg = thr_args;
891 thr_common.cpumask = &thr_mask_worker;
892 odph_thread_create(&thr_tbl[1], &thr_common, thr_param,
897 thr_args = &global->thread[num_threads - 1];
898 thr_args->rx_queue = global->queue[num_workers];
899 odph_thread_param_init(&thr_param[0]);
900 thr_param[0].start = tx_thread;
901 thr_param[0].arg = thr_args;
903 thr_common.cpumask = &thr_mask_tx;
904 odph_thread_create(&thr_tbl[num_threads - 1], &thr_common, thr_param,
907 ret = print_speed_stats(num_threads, stats, global->appl.time,
908 global->appl.accuracy);
911 printf(
"Error: failed to stop interface %s\n", argv[1]);
915 printf(
"Error: failed to stop interface %s\n", argv[2]);
922 odph_thread_join(thr_tbl, num_threads);
924 free(global->appl.if_names);
925 free(global->appl.if_str);
928 printf(
"Error: failed to close interface %s\n", argv[1]);
932 printf(
"Error: failed to close interface %s\n", argv[2]);
936 for (i = 0; i < num_threads; i++) {
938 printf(
"Error: failed to destroy queue %d\n", i);
944 printf(
"Error: pool destroy\n");
949 printf(
"Error: shm free global data\n");
954 printf(
"Error: term local\n");
959 printf(
"Error: term global\n");
void odp_atomic_init_u32(odp_atomic_u32_t *atom, uint32_t val)
Initialize atomic uint32 variable.
uint32_t odp_atomic_load_u32(odp_atomic_u32_t *atom)
Load value of atomic uint32 variable.
void odp_atomic_store_u32(odp_atomic_u32_t *atom, uint32_t val)
Store value to atomic uint32 variable.
void odp_barrier_init(odp_barrier_t *barr, int count)
Initialize barrier with thread count.
void odp_barrier_wait(odp_barrier_t *barr)
Synchronize thread execution on barrier.
#define ODP_ALIGNED_CACHE
Defines type/struct/variable to be cache line size aligned.
#define odp_unlikely(x)
Branch unlikely taken.
#define ODP_UNUSED
Marks a variable or function parameter as intentionally unused.
void odp_cpumask_set(odp_cpumask_t *mask, int cpu)
Add CPU to mask.
int odp_cpumask_default_worker(odp_cpumask_t *mask, int num)
Default CPU mask for worker threads.
int odp_cpumask_first(const odp_cpumask_t *mask)
Find first set CPU in mask.
int odp_cpumask_next(const odp_cpumask_t *mask, int cpu)
Find next set CPU in mask.
void odp_cpumask_zero(odp_cpumask_t *mask)
Clear entire CPU mask.
void odp_event_free_multi(const odp_event_t event[], int num)
Free multiple events.
uint32_t odp_hash_crc32c(const void *data, uint32_t data_len, uint32_t init_val)
Calculate CRC-32C.
void odp_init_param_init(odp_init_t *param)
Initialize the odp_init_t to default values for all fields.
int odp_init_local(odp_instance_t instance, odp_thread_type_t thr_type)
Thread local ODP initialization.
int odp_init_global(odp_instance_t *instance, const odp_init_t *params, const odp_platform_init_t *platform_params)
Global ODP initialization.
int odp_term_local(void)
Thread local ODP termination.
int odp_term_global(odp_instance_t instance)
Global ODP termination.
uint64_t odp_instance_t
ODP instance ID.
int odp_pktio_mac_addr(odp_pktio_t pktio, void *mac_addr, int size)
Get the default MAC address of a packet IO interface.
void odp_pktin_queue_param_init(odp_pktin_queue_param_t *param)
Initialize packet input queue parameters.
void odp_pktio_param_init(odp_pktio_param_t *param)
Initialize pktio params.
int odp_pktio_close(odp_pktio_t pktio)
Close a packet IO interface.
int odp_pktout_queue(odp_pktio_t pktio, odp_pktout_queue_t queues[], int num)
Direct packet output queues.
void odp_pktio_config_init(odp_pktio_config_t *config)
Initialize packet IO configuration options.
odp_pktio_t odp_pktio_open(const char *name, odp_pool_t pool, const odp_pktio_param_t *param)
Open a packet IO interface.
int odp_pktio_config(odp_pktio_t pktio, const odp_pktio_config_t *config)
Configure packet IO interface options.
int odp_pktio_start(odp_pktio_t pktio)
Start packet receive and transmit.
#define ODP_PKTIO_INVALID
Invalid packet IO handle.
int odp_pktin_queue(odp_pktio_t pktio, odp_pktin_queue_t queues[], int num)
Direct packet input queues.
void odp_pktout_queue_param_init(odp_pktout_queue_param_t *param)
Initialize packet output queue parameters.
int odp_pktio_stop(odp_pktio_t pktio)
Stop packet receive and transmit.
int odp_pktin_recv(odp_pktin_queue_t queue, odp_packet_t packets[], int num)
Receive packets directly from an interface input queue.
int odp_pktout_send(odp_pktout_queue_t queue, const odp_packet_t packets[], int num)
Send packets directly to an interface output queue.
int odp_pktin_queue_config(odp_pktio_t pktio, const odp_pktin_queue_param_t *param)
Configure packet input queues.
int odp_pktout_queue_config(odp_pktio_t pktio, const odp_pktout_queue_param_t *param)
Configure packet output queues.
@ ODP_PKTIO_OP_MT_UNSAFE
Not multithread safe operation.
void odp_packet_from_event_multi(odp_packet_t pkt[], const odp_event_t ev[], int num)
Convert multiple packet events to packet handles.
odp_event_t odp_packet_to_event(odp_packet_t pkt)
Convert packet handle to event.
void odp_packet_to_event_multi(const odp_packet_t pkt[], odp_event_t ev[], int num)
Convert multiple packet handles to events.
uint32_t odp_packet_seg_len(odp_packet_t pkt)
Packet data length following the data pointer.
void odp_packet_prefetch(odp_packet_t pkt, uint32_t offset, uint32_t len)
Packet data prefetch.
void * odp_packet_data(odp_packet_t pkt)
Packet data pointer.
int odp_packet_has_eth(odp_packet_t pkt)
Check for Ethernet header.
odp_packet_t odp_packet_from_event(odp_event_t ev)
Get packet handle from event.
void odp_packet_free(odp_packet_t pkt)
Free packet.
void odp_packet_free_multi(const odp_packet_t pkt[], int num)
Free multiple packets.
@ ODP_PROTO_LAYER_L2
Layer L2 protocols (Ethernet, VLAN, etc.).
odp_pool_t odp_pool_create(const char *name, const odp_pool_param_t *param)
Create a pool.
int odp_pool_capability(odp_pool_capability_t *capa)
Query pool capabilities.
void odp_pool_param_init(odp_pool_param_t *param)
Initialize pool params.
int odp_pool_destroy(odp_pool_t pool)
Destroy a pool previously created by odp_pool_create().
#define ODP_POOL_INVALID
Invalid pool.
@ ODP_POOL_PACKET
Packet pool.
int odp_queue_enq_multi(odp_queue_t queue, const odp_event_t events[], int num)
Enqueue multiple events to a queue.
void odp_queue_param_init(odp_queue_param_t *param)
Initialize queue params.
int odp_queue_capability(odp_queue_capability_t *capa)
Query queue capabilities.
#define ODP_QUEUE_INVALID
Invalid queue.
odp_queue_t odp_queue_create(const char *name, const odp_queue_param_t *param)
Queue create.
int odp_queue_destroy(odp_queue_t queue)
Destroy ODP queue.
int odp_queue_deq_multi(odp_queue_t queue, odp_event_t events[], int num)
Dequeue multiple events from a queue.
@ ODP_QUEUE_TYPE_PLAIN
Plain queue.
@ ODP_QUEUE_OP_MT_UNSAFE
Not multithread safe operation.
int odp_shm_free(odp_shm_t shm)
Free a contiguous block of shared memory.
#define ODP_SHM_INVALID
Invalid shared memory block.
void * odp_shm_addr(odp_shm_t shm)
Shared memory block address.
odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, uint32_t flags)
Reserve a contiguous block of shared memory.
void odp_sys_info_print(void)
Print system info.
#define ODP_THREAD_COUNT_MAX
Maximum number of threads supported in build time.
@ ODP_THREAD_WORKER
Worker thread.
@ ODP_THREAD_CONTROL
Control thread.
Global initialization parameters.
odp_mem_model_t mem_model
Application memory model.
Packet input queue parameters.
odp_pktio_op_mode_t op_mode
Operation mode.
Packet IO configuration options.
odp_pktio_parser_config_t parser
Packet input parser configuration.
odp_proto_layer_t layer
Protocol parsing level in packet input.
Packet output queue parameters.
odp_pktio_op_mode_t op_mode
Operation mode.
struct odp_pool_capability_t::@122 pkt
Packet pool capabilities.
uint32_t max_num
Maximum number of buffers of any size.
uint32_t max_seg_len
Maximum packet segment data length in bytes.
uint32_t max_len
Maximum packet data length in bytes.
uint32_t num
Number of buffers in the pool.
odp_pool_type_t type
Pool type.
uint32_t len
Minimum length of 'num' packets.
uint32_t seg_len
Minimum number of packet data bytes that can be stored in the first segment of a newly allocated packet.
struct odp_pool_param_t::@126 pkt
Parameters for packet pools.
uint32_t max_size
Maximum number of events a plain (ODP_BLOCKING) queue can store simultaneously.
uint32_t max_num
Maximum number of plain (ODP_BLOCKING) queues of the default size.
struct odp_queue_capability_t::@140 plain
Plain queue capabilities.
odp_queue_op_mode_t enq_mode
Enqueue mode.
odp_queue_type_t type
Queue type.
odp_queue_op_mode_t deq_mode
Dequeue mode.