API Reference Manual  1.45.1
odp_pktio_ordered.c

Test application for ordered packet IO

/* SPDX-License-Identifier: BSD-3-Clause
* Copyright (c) 2016-2018 Linaro Limited
*/
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <getopt.h>
#include <unistd.h>
#include <errno.h>
#include <inttypes.h>
#include "dummy_crc.h"
#include <odp_api.h>
#include <odp/helper/odph_api.h>
/*
 * Hash mixing primitives used by calc_ipv4_5tuple_hash().
 *
 * NOTE(review): the rotation constants and the mix()/final() structure look
 * like Bob Jenkins' lookup3 hash - confirm the origin before relying on its
 * published properties.
 */
/* Rotate x left by k bits. Written for 32-bit unsigned operands; k must be in
 * the range 1..31 because a shift by (32 - k) == 32 would be undefined. */
#define rot(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
/* Mix the three lvalues a, b and c in place. Arguments are evaluated many
 * times, so pass plain variables only. */
#define mix(a, b, c) \
{ \
a -= c; a ^= rot(c, 4); c += b; \
b -= a; b ^= rot(a, 6); a += c; \
c -= b; c ^= rot(b, 8); b += a; \
a -= c; a ^= rot(c, 16); c += b; \
b -= a; b ^= rot(a, 19); a += c; \
c -= b; c ^= rot(b, 4); b += a; \
}
/* Final scrambling of a, b and c in place; the caller takes c as the hash
 * value. Arguments are evaluated many times, so pass plain variables only. */
#define final(a, b, c) \
{ \
c ^= b; c -= rot(b, 14); \
a ^= c; a -= rot(c, 11); \
b ^= a; b -= rot(a, 25); \
c ^= b; c -= rot(b, 16); \
a ^= c; a -= rot(c, 4); \
b ^= a; b -= rot(a, 14); \
c ^= b; c -= rot(b, 24); \
}
/* Constant added to every hash input word before mixing */
#define JHASH_GOLDEN_RATIO 0x9e3779b9
/* Maximum pool and queue size */
#define MAX_NUM_PKT (8 * 1024)
/* Maximum number of worker threads; sizes the per-thread stats and argument
 * arrays */
#define MAX_WORKERS (ODP_THREAD_COUNT_MAX - 1)
/* Packet length and first segment length requested from the packet pool */
#define PKT_POOL_BUF_SIZE 1856
/* Packet user area size in bytes; has to hold a flow_t */
#define PKT_UAREA_SIZE 32
/* Maximum number of events requested from the scheduler in one call */
#define MAX_PKT_BURST 32
/* Maximum number of input queues stored per interface */
#define MAX_QUEUES 32
/* Maximum number of packet IO interfaces */
#define MAX_PKTIOS 8
/* Maximum number of packet flows (and output queues stored per interface) */
#define MAX_FLOWS 128
ODP_STATIC_ASSERT(MAX_PKTIOS < MAX_FLOWS,
"MAX_FLOWS must be greater than MAX_PKTIOS\n");
/* Minimum packet length: Ethernet + IPv4 (without options) + UDP headers */
#define MIN_PACKET_LEN (ODPH_ETHHDR_LEN + ODPH_IPV4HDR_LEN + ODPH_UDPHDR_LEN)
/* Default number of input queues per interface */
#define DEF_NUM_RX_QUEUES 1
/* Default number of packet flows */
#define DEF_NUM_FLOWS 12
/* Default number of extra CRC rounds run on each input packet */
#define DEF_EXTRA_ROUNDS 15
/* Default statistics print interval in seconds */
#define DEF_STATS_INT 1
/* Strip the directory part from a path: yields the text after the last '/',
 * or the whole string when it contains no '/' */
#define NO_PATH(file_name) (strrchr((file_name), '/') ? \
strrchr((file_name), '/') + 1 : (file_name))
/* Packet input mode, selected with the -m command line option. The value 0
 * (SCHED_ORDERED) is what a zero-initialized appl_args_t ends up with. */
typedef enum pktin_mode_t {
/* Scheduled ordered input queues (-m 0 or any unrecognized value) */
SCHED_ORDERED = 0,
/* Scheduled atomic input queues (-m 1) */
SCHED_ATOMIC,
/* Scheduled parallel input queues (-m 2); sequence numbers are not checked */
SCHED_PARALLEL
} pktin_mode_t;
/* Parsed command line arguments */
typedef struct {
/* Requested worker CPU count (-c); 0 means all available */
unsigned int cpu_count;
/* Number of interfaces given with -i */
int if_count;
/* Number of destination MAC addresses given with -d (0 if none) */
int addr_count;
/* Requested number of input queues per interface (-r) */
int num_rx_q;
/* Number of packet flows (-f) */
int num_flows;
/* Number of extra CRC rounds per input packet (-e) */
int extra_rounds;
/* Interface names; the entries point into if_str */
char **if_names;
/* Destination MAC addresses parsed from -d */
odph_ethaddr_t addrs[MAX_PKTIOS];
/* Packet input mode (-m) */
pktin_mode_t in_mode;
/* Run time in seconds (-t); 0 means run forever */
int time;
/* Statistics print interval in seconds (-a) */
int accuracy;
/* Heap copy of the -i argument, split in place into if_names */
char *if_str;
/* Non-zero when promiscuous mode was requested (-P) */
int promisc_mode;
} appl_args_t;
/* Queue context attached to both packet input queues and flow queues */
typedef struct {
/* Non-zero for a packet input queue, zero for a flow queue; run_worker()
 * uses it to choose between process_input() and process_flow() */
odp_bool_t input_queue;
/* Index of the interface the queue belongs to */
uint64_t idx;
/* Sequence counters. process_input() indexes them by flow index,
 * process_flow() by source interface index. */
uint64_t seq[MAX_FLOWS];
} qcontext_t;
/* Per-packet flow data, stored in the packet user area */
typedef struct {
/* Sequence number assigned at packet input */
uint64_t seq;
/* Result of the dummy CRC calculation done as extra input work */
uint32_t crc;
/* Flow index derived from the packet's 5-tuple hash */
uint16_t idx;
/* Index of the interface the packet was received on */
uint8_t src_idx;
/* Index of the interface the packet is sent out from */
uint8_t dst_idx;
} flow_t;
ODP_STATIC_ASSERT(sizeof(flow_t) <= PKT_UAREA_SIZE,
"Flow data doesn't fit in the packet user area\n");
/* Per-worker statistics. The union with a cache line sized byte array and the
 * cache line alignment give every worker's counters their own cache line. */
typedef union ODP_ALIGNED_CACHE {
struct {
/* Number of packets forwarded by the worker */
uint64_t packets;
/* Packets rejected at input (required headers not found) */
uint64_t rx_drops;
/* Packets that could not be enqueued or sent */
uint64_t tx_drops;
/* Packets whose sequence number did not match the queue's counter */
uint64_t invalid_seq;
} s;
uint8_t padding[ODP_CACHE_LINE_SIZE];
} stats_t;
/* IPv4 5-tuple used as flow hash input. calc_flow_idx() fills the address and
 * port fields in CPU byte order and zeroes the padding. */
typedef struct {
/* Source IPv4 address */
uint32_t src_ip;
/* Destination IPv4 address */
uint32_t dst_ip;
/* Source UDP port */
uint16_t src_port;
/* Destination UDP port */
uint16_t dst_port;
/* IP protocol number */
uint8_t proto;
/* Padding */
uint8_t pad0;
/* Padding */
uint16_t pad1;
} ipv4_tuple5_t;
/* Pointers to the headers of one packet, filled in by packet_hdr() */
typedef struct {
/* Ethernet header */
odph_ethhdr_t *eth;
/* IPv4 header, located right after the Ethernet header */
odph_ipv4hdr_t *ipv4;
/* UDP header, located after the IPv4 header and its options */
odph_udphdr_t *udp;
} packet_hdr_t;
/* Argument passed to each worker thread */
typedef struct thread_args_t {
/* The worker's own statistics slot */
stats_t *stats;
} thread_args_t;
/* All application state; reserved from ODP shared memory by main() */
typedef struct {
/* Per-worker statistics */
stats_t stats[MAX_WORKERS];
/* Parsed command line arguments */
appl_args_t appl;
/* Per-worker thread arguments */
thread_args_t thread[MAX_WORKERS];
/* MAC address of each interface */
odph_ethaddr_t port_eth_addr[MAX_PKTIOS];
/* Destination MAC address written into packets, per interface */
odph_ethaddr_t dst_eth_addr[MAX_PKTIOS];
/* Forwarding table: destination interface index for each source interface */
int dst_port[MAX_PKTIOS];
/* Flow queues, indexed by [interface][flow] */
odp_queue_t fqueue[MAX_PKTIOS][MAX_FLOWS];
/* Contexts of the flow queues */
qcontext_t flow_qcontext[MAX_PKTIOS][MAX_FLOWS];
/* Contexts of the packet input queues */
qcontext_t input_qcontext[MAX_PKTIOS][MAX_QUEUES];
/* Per-interface handles and queue counts */
struct {
/* Interface handle; ODP_PKTIO_INVALID marks an unused slot */
odp_pktio_t pktio;
/* Direct packet output queues */
odp_pktout_queue_t pktout[MAX_FLOWS];
/* Packet input event queues */
odp_queue_t pktin[MAX_QUEUES];
/* Number of input queues actually created */
int num_rx_queue;
/* Number of output queues actually created */
int num_tx_queue;
} pktios[MAX_PKTIOS];
/* Start barrier shared by the workers and the main thread */
odp_barrier_t barrier;
/* Set to non-zero to make the workers leave their packet loop */
odp_atomic_u32_t exit_threads;
} args_t;
/* Global pointer to the shared application state */
static args_t *gbl_args;
/**
 * Lookup the destination port for a given packet
 *
 * Finds the index of the interface the packet was received on and returns the
 * forwarding table entry for it. Aborts the application when the packet's
 * input interface is not found in the interface table.
 *
 * @param pkt  ODP packet handle
 *
 * @return Destination interface index
 */
static inline int lookup_dest_port(odp_packet_t pkt)
{
	int i, src_idx;
	odp_pktio_t pktio_src;

	pktio_src = odp_packet_input(pkt);

	/* The table is terminated by the first ODP_PKTIO_INVALID entry. The
	 * explicit bound keeps the scan inside the array when all
	 * MAX_PKTIOS slots are in use and no terminator exists. */
	for (src_idx = -1, i = 0;
	     i < MAX_PKTIOS &&
	     gbl_args->pktios[i].pktio != ODP_PKTIO_INVALID; i++)
		if (gbl_args->pktios[i].pktio == pktio_src)
			src_idx = i;

	if (src_idx == -1)
		ODPH_ABORT("Failed to determine pktio input\n");

	return gbl_args->dst_port[src_idx];
}
/**
 * Map required packet headers
 *
 * Accepts only Ethernet / IPv4 / UDP packets whose headers are all located in
 * the first packet segment.
 *
 * @param pkt        Packet handle
 * @param[out] hdr   Packet header pointers, valid only on success
 *
 * @retval 0 on success
 * @retval -1 on failure (packet too short or not Ethernet/IPv4/UDP)
 */
static inline int packet_hdr(odp_packet_t pkt, packet_hdr_t *hdr)
{
	uint8_t *udp;
	uint16_t eth_type;
	uint8_t ihl;
	uint32_t seg_len = odp_packet_seg_len(pkt);

	if (odp_unlikely(seg_len < MIN_PACKET_LEN))
		return -1;

	if (odp_unlikely(!odp_packet_has_eth(pkt)))
		return -1;

	hdr->eth = odp_packet_l2_ptr(pkt, NULL);
	eth_type = odp_be_to_cpu_16(hdr->eth->type);
	if (odp_unlikely(eth_type != ODPH_ETHTYPE_IPV4))
		return -1;

	hdr->ipv4 = (odph_ipv4hdr_t *)(hdr->eth + 1);
	if (odp_unlikely(hdr->ipv4->proto != ODPH_IPPROTO_UDP))
		return -1;

	ihl = ODPH_IPV4HDR_IHL(hdr->ipv4->ver_ihl);
	if (odp_unlikely(ihl < ODPH_IPV4HDR_IHL_MIN))
		return -1;

	/* MIN_PACKET_LEN assumes an IPv4 header without options. With
	 * options the UDP header starts later, so verify it still fits into
	 * the first segment before handing out a pointer to it. */
	if (odp_unlikely(seg_len < (uint32_t)(ODPH_ETHHDR_LEN + (ihl * 4) +
					       ODPH_UDPHDR_LEN)))
		return -1;

	/* IHL counts 32-bit words */
	udp = (uint8_t *)hdr->ipv4 + (ihl * 4);

	hdr->udp = (odph_udphdr_t *)udp;

	return 0;
}
/**
 * Compute hash from a 5-tuple
 *
 * The protocol and both addresses seed the three state words, which are mixed
 * once. Both ports are then folded into the first word before the final
 * scrambling step.
 *
 * @param tuple  Tuple to hash
 *
 * @return 32-bit hash value (returned in a 64-bit type)
 */
static inline uint64_t calc_ipv4_5tuple_hash(ipv4_tuple5_t *tuple)
{
	uint32_t a = tuple->proto + JHASH_GOLDEN_RATIO;
	uint32_t b = tuple->src_ip + JHASH_GOLDEN_RATIO;
	uint32_t c = tuple->dst_ip + JHASH_GOLDEN_RATIO;
	uint32_t ports;

	mix(a, b, c);

	/* Source port in the upper and destination port in the lower half */
	ports = ((uint32_t)tuple->src_port << 16) + tuple->dst_port;
	a += ports + JHASH_GOLDEN_RATIO;

	final(a, b, c);

	return c;
}
/**
 * Compute packet flow index
 *
 * Builds the 5-tuple from the IPv4 and UDP headers (converted to CPU byte
 * order), hashes it and reduces the hash modulo the configured flow count.
 *
 * @param hdr  Packet headers
 *
 * @return Flow index in the range 0 .. num_flows - 1
 */
static inline uint64_t calc_flow_idx(packet_hdr_t *hdr)
{
	const odph_ipv4hdr_t *ip = hdr->ipv4;
	const odph_udphdr_t *l4 = hdr->udp;
	ipv4_tuple5_t key;

	key.src_ip = odp_be_to_cpu_32(ip->src_addr);
	key.dst_ip = odp_be_to_cpu_32(ip->dst_addr);
	key.src_port = odp_be_to_cpu_16(l4->src_port);
	key.dst_port = odp_be_to_cpu_16(l4->dst_port);
	key.proto = ip->proto;
	key.pad0 = 0;
	key.pad1 = 0;

	return calc_ipv4_5tuple_hash(&key) % gbl_args->appl.num_flows;
}
/**
 * Fill packet's Ethernet addresses according to destination port
 *
 * @param hdr       Packet headers
 * @param dst_port  Index into the per-interface address tables, which hold
 *                  MAX_PKTIOS entries
 */
static inline void fill_eth_addrs(packet_hdr_t *hdr, int dst_port)
{
	odph_ethhdr_t *eth = hdr->eth;

	eth->dst = gbl_args->dst_eth_addr[dst_port];
	eth->src = gbl_args->port_eth_addr[dst_port];
}
/**
 * Process flow queue
 *
 * Checks the sequence number of every packet against the flow queue's
 * counter for the packet's source interface (skipped in parallel mode) and
 * sends the packet out through its destination interface.
 *
 * @param ev_tbl    Array of events
 * @param num       Number of events in the array
 * @param stats     Pointer for storing thread statistics
 * @param qcontext  Source queue context
 * @param pktout    Array of pktout queues, indexed by [interface][flow]
 */
static inline void process_flow(odp_event_t ev_tbl[], int num, stats_t *stats,
				qcontext_t *qcontext,
				odp_pktout_queue_t pktout[][MAX_FLOWS])
{
	odp_packet_t pkt;
	flow_t *flow;
	uint64_t queue_seq;
	int dst_if;
	int i;
	int sent;

	for (i = 0; i < num; i++) {
		pkt = odp_packet_from_event(ev_tbl[i]);

		flow = odp_packet_user_area(pkt);

		queue_seq = qcontext->seq[flow->src_idx];

		/* Check sequence number */
		if (gbl_args->appl.in_mode != SCHED_PARALLEL &&
		    odp_unlikely(flow->seq != queue_seq)) {
			printf("Invalid sequence number: packet_seq=%" PRIu64 ""
			       " queue_seq=%" PRIu64 ", src_if=%" PRIu8 ", "
			       "dst_if=%" PRIu8 ", flow=%" PRIu16 "\n",
			       flow->seq, queue_seq, flow->src_idx,
			       flow->dst_idx, flow->idx);
			/* Resynchronize so one gap is reported only once */
			qcontext->seq[flow->src_idx] = flow->seq + 1;
			stats->s.invalid_seq++;
		} else {
			qcontext->seq[flow->src_idx]++;
		}

		dst_if = flow->dst_idx;
		sent = odp_pktout_send(pktout[dst_if][flow->idx], &pkt, 1);

		if (odp_unlikely(sent != 1)) {
			stats->s.tx_drops++;
			/* An unsent packet is still owned by the caller */
			odp_packet_free(pkt);
		}
		stats->s.packets++;
	}
}
/**
 * Process input queue
 *
 * Classifies every received packet into a flow, rewrites its Ethernet
 * addresses, stamps it with a per-flow sequence number and enqueues it to the
 * flow queue of its destination interface. Packets without the required
 * headers are dropped.
 *
 * @param ev_tbl    Array of events; accepted events are compacted to the
 *                  front of the array
 * @param num       Number of events in the array
 * @param stats     Pointer for storing thread statistics
 * @param qcontext  Source queue context
 */
static inline void process_input(odp_event_t ev_tbl[], int num, stats_t *stats,
				 qcontext_t *qcontext)
{
	odp_packet_t pkt;
	flow_t *flow;
	flow_t *flow_tbl[MAX_PKT_BURST];
	int ret;
	int i, j;
	int pkts = 0;

	for (i = 0; i < num; i++) {
		packet_hdr_t hdr;
		int flow_idx;
		int dst_idx;

		pkt = odp_packet_from_event(ev_tbl[i]);

		odp_packet_prefetch(pkt, 0, MIN_PACKET_LEN);

		ret = packet_hdr(pkt, &hdr);
		if (odp_unlikely(ret)) {
			odp_packet_free(pkt);
			stats->s.rx_drops++;
			continue;
		}

		flow_idx = calc_flow_idx(&hdr);
		dst_idx = lookup_dest_port(pkt);

		/* The address tables are indexed by interface, not by flow */
		fill_eth_addrs(&hdr, dst_idx);

		flow = odp_packet_user_area(pkt);
		flow->idx = flow_idx;
		flow->src_idx = qcontext->idx;
		flow->dst_idx = dst_idx;

		/* Keep events and flows at the same index after drops.
		 * pkts <= i, so no unprocessed event is overwritten. */
		flow_tbl[pkts] = flow;
		ev_tbl[pkts] = ev_tbl[i];

		/* Simulate "fat pipe" processing by generating extra work */
		for (j = 0; j < gbl_args->appl.extra_rounds; j++)
			flow->crc = dummy_hash_crc32c(odp_packet_data(pkt),
						      odp_packet_len(pkt), 0);
		pkts++;
	}

	if (odp_unlikely(!pkts))
		return;

	/* Set sequence numbers. In ordered mode several threads may process
	 * events of the same queue concurrently, so the counters are updated
	 * under the queue's ordered lock. */
	if (gbl_args->appl.in_mode == SCHED_ORDERED)
		odp_schedule_order_lock(0);

	for (i = 0; i < pkts; i++) {
		flow = flow_tbl[i];
		flow->seq = qcontext->seq[flow->idx]++;
	}

	if (gbl_args->appl.in_mode == SCHED_ORDERED)
		odp_schedule_order_unlock(0);

	for (i = 0; i < pkts; i++) {
		flow = flow_tbl[i];
		ret = odp_queue_enq(gbl_args->fqueue[flow->dst_idx][flow->idx],
				    ev_tbl[i]);

		if (odp_unlikely(ret != 0)) {
			ODPH_ERR("odp_queue_enq() failed\n");
			stats->s.tx_drops++;
			odp_event_free(ev_tbl[i]);
		} else {
			stats->s.packets++;
		}
	}
}
static int run_worker(void *arg)
{
odp_event_t ev_tbl[MAX_PKT_BURST];
odp_queue_t queue;
odp_pktout_queue_t pktout[MAX_PKTIOS][MAX_FLOWS];
qcontext_t *qcontext;
thread_args_t *thr_args = arg;
stats_t *stats = thr_args->stats;
int pkts;
int i, j;
memset(pktout, 0, sizeof(pktout));
for (i = 0; i < gbl_args->appl.if_count; i++) {
for (j = 0; j < gbl_args->appl.num_flows; j++) {
pktout[i][j] = gbl_args->pktios[i].pktout[j %
gbl_args->pktios[i].num_tx_queue];
}
}
odp_barrier_wait(&gbl_args->barrier);
/* Loop packets */
while (!odp_atomic_load_u32(&gbl_args->exit_threads)) {
pkts = odp_schedule_multi(&queue, ODP_SCHED_NO_WAIT, ev_tbl,
MAX_PKT_BURST);
if (pkts <= 0)
continue;
qcontext = odp_queue_context(queue);
if (qcontext->input_queue)
process_input(ev_tbl, pkts, stats, qcontext);
else
process_flow(ev_tbl, pkts, stats, qcontext, pktout);
}
/* Free remaining events in queues */
while (1) {
ev = odp_schedule(NULL,
if (ev == ODP_EVENT_INVALID)
break;
}
return 0;
}
/**
 * Create a pktio handle and associate with input queues
 *
 * Opens the interface in scheduled input mode, configures its parser, the
 * optional promiscuous mode and the input/output queues, and stores the
 * resulting handles in gbl_args->pktios[idx]. The queue counts are reduced to
 * what the interface supports. On failure the interface is closed again.
 *
 * @param dev     Name of device to open
 * @param idx     Pktio index
 * @param num_rx  Number of input queues (at most MAX_QUEUES)
 * @param num_tx  Number of output queues (at most MAX_FLOWS)
 * @param pool    Pool to associate with device for packet RX/TX
 *
 * @retval 0 on success
 * @retval -1 on failure
 */
static int create_pktio(const char *dev, int idx, int num_rx, int num_tx,
			odp_pool_t pool)
{
	odp_pktio_t pktio;
	odp_pktio_param_t pktio_param;
	odp_pktio_capability_t capa;
	odp_pktio_config_t config;
	odp_pktin_queue_param_t pktin_param;
	odp_pktout_queue_param_t pktout_param;
	odp_pktio_op_mode_t mode_rx;
	odp_pktio_op_mode_t mode_tx;
	int i;

	/* The queue handles are stored in fixed-size per-interface tables */
	if (num_rx > MAX_QUEUES || num_tx > MAX_FLOWS) {
		ODPH_ERR("Error: too many queues requested %s\n", dev);
		return -1;
	}

	odp_pktio_param_init(&pktio_param);

	pktio_param.in_mode = ODP_PKTIN_MODE_SCHED;

	pktio = odp_pktio_open(dev, pool, &pktio_param);
	if (pktio == ODP_PKTIO_INVALID) {
		ODPH_ERR("Error: failed to open %s\n", dev);
		return -1;
	}

	printf("Created pktio %" PRIu64 " (%s)\n",
	       odp_pktio_to_u64(pktio), dev);

	if (odp_pktio_capability(pktio, &capa)) {
		ODPH_ERR("Error: capability query failed %s\n", dev);
		goto err_close;
	}

	/* Only the L2 parse result is used (see packet_hdr()) */
	odp_pktio_config_init(&config);
	config.parser.layer = ODP_PROTO_LAYER_L2;
	odp_pktio_config(pktio, &config);

	if (gbl_args->appl.promisc_mode && odp_pktio_promisc_mode(pktio) != 1) {
		if (!capa.set_op.op.promisc_mode) {
			ODPH_ERR("Error: promisc mode set not supported %s\n",
				 dev);
			goto err_close;
		}

		/* Enable promisc mode */
		if (odp_pktio_promisc_mode_set(pktio, true)) {
			ODPH_ERR("Error: promisc mode enable failed %s\n", dev);
			goto err_close;
		}
	}

	odp_pktin_queue_param_init(&pktin_param);
	odp_pktout_queue_param_init(&pktout_param);

	mode_tx = ODP_PKTIO_OP_MT;
	mode_rx = ODP_PKTIO_OP_MT;

	if (gbl_args->appl.in_mode == SCHED_ATOMIC) {
		pktin_param.queue_param.sched.sync = ODP_SCHED_SYNC_ATOMIC;
	} else if (gbl_args->appl.in_mode == SCHED_PARALLEL) {
		pktin_param.queue_param.sched.sync = ODP_SCHED_SYNC_PARALLEL;
	} else {
		pktin_param.queue_param.sched.sync = ODP_SCHED_SYNC_ORDERED;
		/* One ordered lock protects the sequence counters */
		pktin_param.queue_param.sched.lock_count = 1;
	}
	pktin_param.queue_param.sched.prio = odp_schedule_default_prio();
	pktin_param.queue_param.sched.group = ODP_SCHED_GROUP_ALL;

	if (num_rx > (int)capa.max_input_queues) {
		printf("Allocating %i shared input queues, %i requested\n",
		       capa.max_input_queues, num_rx);
		num_rx = capa.max_input_queues;
		mode_rx = ODP_PKTIO_OP_MT;
	}

	if (num_tx > (int)capa.max_output_queues) {
		printf("Allocating %i shared output queues, %i requested\n",
		       capa.max_output_queues, num_tx);
		num_tx = capa.max_output_queues;
		mode_tx = ODP_PKTIO_OP_MT;
	}

	pktin_param.hash_enable = (num_rx > 1) ? 1 : 0;
	pktin_param.hash_proto.proto.ipv4_udp = 1;
	pktin_param.num_queues = num_rx;
	pktin_param.op_mode = mode_rx;

	pktout_param.op_mode = mode_tx;
	pktout_param.num_queues = num_tx;

	if (odp_pktin_queue_config(pktio, &pktin_param)) {
		ODPH_ERR("Error: input queue config failed %s\n", dev);
		goto err_close;
	}

	if (odp_pktout_queue_config(pktio, &pktout_param)) {
		ODPH_ERR("Error: output queue config failed %s\n", dev);
		goto err_close;
	}

	if (odp_pktin_event_queue(pktio, gbl_args->pktios[idx].pktin,
				  num_rx) != num_rx) {
		ODPH_ERR("Error: pktin event queue query failed %s\n", dev);
		goto err_close;
	}

	/* Set queue contexts */
	for (i = 0; i < num_rx; i++) {
		gbl_args->input_qcontext[idx][i].idx = idx;
		gbl_args->input_qcontext[idx][i].input_queue = 1;

		if (odp_queue_context_set(gbl_args->pktios[idx].pktin[i],
					  &gbl_args->input_qcontext[idx][i],
					  sizeof(qcontext_t))) {
			ODPH_ERR("Error: pktin queue context set failed %s\n",
				 dev);
			goto err_close;
		}
	}

	if (odp_pktout_queue(pktio,
			     gbl_args->pktios[idx].pktout,
			     num_tx) != num_tx) {
		ODPH_ERR("Error: pktout queue query failed %s\n", dev);
		goto err_close;
	}

	printf("Created %i input and %i output queues on (%s)\n",
	       num_rx, num_tx, dev);

	gbl_args->pktios[idx].num_rx_queue = num_rx;
	gbl_args->pktios[idx].num_tx_queue = num_tx;
	gbl_args->pktios[idx].pktio = pktio;

	return 0;

err_close:
	/* Do not leave a half-configured interface open */
	odp_pktio_close(pktio);
	return -1;
}
/**
 * Print statistics
 *
 * Waits on the start barrier, then sums the per-worker counters once per
 * interval and, when statistics are enabled, prints the packet rate and the
 * drop counters.
 *
 * @param num_workers  Number of worker threads
 * @param thr_stats    Pointer to stats storage
 * @param duration     Number of seconds to loop in (0 = loop forever)
 * @param timeout      Number of seconds between prints; a value <= 0
 *                     disables printing and polls once per second
 *
 * @retval 0 when more than 100 packets were counted and no sequence errors
 *         were seen
 * @retval -1 otherwise
 */
static int print_speed_stats(int num_workers, stats_t *thr_stats,
			     int duration, int timeout)
{
	const int forever = (duration == 0);
	uint64_t total = 0;
	uint64_t prev_total = 0;
	uint64_t peak = 0;
	uint64_t rate;
	uint64_t rx_lost, tx_lost, bad_seq;
	int seconds = 0;
	int verbose = 1;
	int w;

	if (timeout <= 0) {
		verbose = 0;
		timeout = 1;
	}

	/* Wait for all threads to be ready*/
	odp_barrier_wait(&gbl_args->barrier);

	for (;;) {
		total = 0;
		rx_lost = 0;
		tx_lost = 0;
		bad_seq = 0;

		sleep(timeout);

		/* Counters are cumulative, so every pass recomputes totals */
		for (w = 0; w < num_workers; w++) {
			const stats_t *st = &thr_stats[w];

			total += st->s.packets;
			rx_lost += st->s.rx_drops;
			tx_lost += st->s.tx_drops;
			bad_seq += st->s.invalid_seq;
		}

		if (verbose) {
			rate = (total - prev_total) / timeout;
			if (rate > peak)
				peak = rate;
			printf("%" PRIu64 " pps, %" PRIu64 " max pps, ", rate,
			       peak);

			printf("%" PRIu64 " rx drops, %" PRIu64 " tx drops, ",
			       rx_lost, tx_lost);

			printf("%" PRIu64 " invalid seq\n", bad_seq);

			prev_total = total;
		}

		seconds += timeout;

		if (!forever && seconds >= duration)
			break;
	}

	if (verbose)
		printf("TEST RESULT: %" PRIu64 " maximum packets per second.\n",
		       peak);

	if (total > 100 && !bad_seq)
		return 0;

	return -1;
}
/**
 * Find the destination port for a given input port
 *
 * With an even interface count the ports are paired (0 <-> 1, 2 <-> 3, ...).
 * With an odd count every port forwards to the next one and the last port
 * wraps around to port 0.
 *
 * @param port  Input port index
 *
 * @return Destination port index
 */
static int find_dest_port(int port)
{
	const int count = gbl_args->appl.if_count;

	if (count % 2 != 0) {
		/* Odd number of ports */
		return (port == count - 1) ? 0 : port + 1;
	}

	/* Even number of ports */
	if (port % 2 == 0)
		return port + 1;

	return port - 1;
}
/**
 * Initialize port forwarding table
 *
 * Stores the destination port of every configured interface.
 */
static void init_forwarding_tbl(void)
{
	int port = 0;

	while (port < gbl_args->appl.if_count) {
		gbl_args->dst_port[port] = find_dest_port(port);
		port++;
	}
}
/**
 * Print usage information
 *
 * @param progname  Program name; any leading directory part is not printed
 */
static void usage(char *progname)
{
	const char *name = NO_PATH(progname);

	printf("\n"
	       "OpenDataPlane ordered pktio application.\n"
	       "\n"
	       "Usage: %s OPTIONS\n"
	       " E.g. %s -i eth0,eth1\n"
	       " In the above example,\n"
	       " eth0 will send pkts to eth1 and vice versa\n"
	       "\n"
	       "Mandatory OPTIONS:\n"
	       " -i, --interface Eth interfaces (comma-separated, no spaces)\n"
	       " Interface count min 1, max %i\n"
	       "\n"
	       "Optional OPTIONS:\n"
	       " -m, --mode Packet input mode\n"
	       " 0: Scheduled ordered queues (default)\n"
	       " 1: Scheduled atomic queues\n"
	       " 2: Scheduled parallel queues (packet order not maintained)\n"
	       " -r, --num_rx_q Number of RX queues per interface\n"
	       " -f, --num_flows Number of packet flows\n"
	       " -e, --extra_input <number> Number of extra input processing rounds\n"
	       " -c, --count <number> CPU count, 0=all available, default=1\n"
	       " -t, --time <number> Time in seconds to run.\n"
	       " -a, --accuracy <number> Statistics print interval in seconds\n"
	       " (default is 1 second).\n"
	       " -d, --dst_addr Destination addresses (comma-separated, no spaces)\n"
	       " -P, --promisc_mode Enable promiscuous mode.\n"
	       " -h, --help Display help and exit.\n\n"
	       "\n", name, name, MAX_PKTIOS);
}
/**
 * Parse and store the command line arguments
 *
 * Fills in defaults first and then applies the given options. Prints usage
 * and exits the process on invalid or missing mandatory arguments.
 *
 * The counters if_count and addr_count and the input mode are not given a
 * default here, so the caller has to pass a zero-initialized structure.
 *
 * @param argc       Argument count
 * @param argv       Argument vector
 * @param appl_args  Pointer to store application arguments; if_str and
 *                   if_names are heap allocated and owned by the caller
 */
static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
{
	int opt;
	int long_index;
	char *token;
	char *addr_str;
	size_t len;
	int i;
	static const struct option longopts[] = {
		{"count", required_argument, NULL, 'c'},
		{"time", required_argument, NULL, 't'},
		{"accuracy", required_argument, NULL, 'a'},
		{"interface", required_argument, NULL, 'i'},
		{"mode", required_argument, NULL, 'm'},
		{"dst_addr", required_argument, NULL, 'd'},
		{"num_rx_q", required_argument, NULL, 'r'},
		{"num_flows", required_argument, NULL, 'f'},
		{"extra_input", required_argument, NULL, 'e'},
		{"promisc_mode", no_argument, NULL, 'P'},
		{"help", no_argument, NULL, 'h'},
		{NULL, 0, NULL, 0}
	};

	static const char *shortopts = "+c:t:a:i:m:d:r:f:e:Ph";

	appl_args->time = 0; /* loop forever if time to run is 0 */
	appl_args->accuracy = DEF_STATS_INT;
	appl_args->cpu_count = 1; /* use one worker by default */
	appl_args->num_rx_q = DEF_NUM_RX_QUEUES;
	appl_args->num_flows = DEF_NUM_FLOWS;
	appl_args->extra_rounds = DEF_EXTRA_ROUNDS;
	appl_args->promisc_mode = 0;

	while (1) {
		opt = getopt_long(argc, argv, shortopts, longopts, &long_index);

		if (opt == -1)
			break;	/* No more options */

		switch (opt) {
		case 'c':
			appl_args->cpu_count = atoi(optarg);
			break;
		case 't':
			appl_args->time = atoi(optarg);
			break;
		case 'a':
			appl_args->accuracy = atoi(optarg);
			break;
		/* parse destination MAC addresses */
		case 'd':
			len = strlen(optarg);
			if (len == 0) {
				usage(argv[0]);
				exit(EXIT_FAILURE);
			}
			len += 1;	/* add room for '\0' */

			addr_str = malloc(len);
			if (addr_str == NULL) {
				usage(argv[0]);
				exit(EXIT_FAILURE);
			}

			/* store the mac addresses names */
			strcpy(addr_str, optarg);
			for (token = strtok(addr_str, ","), i = 0;
			     token != NULL; token = strtok(NULL, ","), i++) {
				if (i >= MAX_PKTIOS) {
					printf("too many MAC addresses\n");
					usage(argv[0]);
					exit(EXIT_FAILURE);
				}
				if (odph_eth_addr_parse(&appl_args->addrs[i],
							token) != 0) {
					printf("invalid MAC address\n");
					usage(argv[0]);
					exit(EXIT_FAILURE);
				}
			}
			appl_args->addr_count = i;
			if (appl_args->addr_count < 1) {
				usage(argv[0]);
				exit(EXIT_FAILURE);
			}
			free(addr_str);
			break;
		/* parse packet-io interface names */
		case 'i':
			len = strlen(optarg);
			if (len == 0) {
				usage(argv[0]);
				exit(EXIT_FAILURE);
			}
			len += 1;	/* add room for '\0' */

			appl_args->if_str = malloc(len);
			if (appl_args->if_str == NULL) {
				usage(argv[0]);
				exit(EXIT_FAILURE);
			}

			/* count the number of tokens separated by ',' */
			strcpy(appl_args->if_str, optarg);
			for (token = strtok(appl_args->if_str, ","), i = 0;
			     token != NULL;
			     token = strtok(NULL, ","), i++)
				;

			appl_args->if_count = i;

			if (appl_args->if_count < 1 ||
			    appl_args->if_count > MAX_PKTIOS) {
				usage(argv[0]);
				exit(EXIT_FAILURE);
			}

			/* allocate storage for the if names */
			appl_args->if_names =
			    calloc(appl_args->if_count, sizeof(char *));
			if (appl_args->if_names == NULL) {
				usage(argv[0]);
				exit(EXIT_FAILURE);
			}

			/* store the if names (reset names string) */
			strcpy(appl_args->if_str, optarg);
			for (token = strtok(appl_args->if_str, ","), i = 0;
			     token != NULL; token = strtok(NULL, ","), i++) {
				appl_args->if_names[i] = token;
			}
			break;
		case 'm':
			i = atoi(optarg);
			if (i == 1)
				appl_args->in_mode = SCHED_ATOMIC;
			else if (i == 2)
				appl_args->in_mode = SCHED_PARALLEL;
			else
				appl_args->in_mode = SCHED_ORDERED;
			break;
		case 'r':
			appl_args->num_rx_q = atoi(optarg);
			break;
		case 'f':
			appl_args->num_flows = atoi(optarg);
			break;
		case 'e':
			appl_args->extra_rounds = atoi(optarg);
			break;
		case 'P':
			appl_args->promisc_mode = 1;
			break;
		case 'h':
			usage(argv[0]);
			exit(EXIT_SUCCESS);
			break;
		default:
			break;
		}
	}

	if (appl_args->num_flows > MAX_FLOWS) {
		printf("Too many flows requested %d, max: %d\n",
		       appl_args->num_flows, MAX_FLOWS);
		exit(EXIT_FAILURE);
	}

	/* Input queue handles and contexts are stored in MAX_QUEUES sized
	 * tables */
	if (appl_args->num_rx_q > MAX_QUEUES) {
		printf("Too many RX queues requested %d, max: %d\n",
		       appl_args->num_rx_q, MAX_QUEUES);
		exit(EXIT_FAILURE);
	}

	/* Negative counts are rejected too: the flow count is used as a
	 * modulus and both counts size queue tables */
	if (appl_args->if_count == 0 || appl_args->num_flows < 1 ||
	    appl_args->num_rx_q < 1) {
		usage(argv[0]);
		exit(EXIT_FAILURE);
	}
	if (appl_args->addr_count != 0 &&
	    appl_args->addr_count != appl_args->if_count) {
		printf("Number of destination addresses differs from number"
		       " of interfaces\n");
		usage(argv[0]);
		exit(EXIT_FAILURE);
	}

	optind = 1;		/* reset 'extern optind' from the getopt lib */
}
/**
 * Print system and application info
 *
 * Prints the ODP system information first, followed by the interfaces and
 * options the application runs with.
 *
 * @param progname   Program name to print
 * @param appl_args  Parsed application arguments
 */
static void print_info(char *progname, appl_args_t *appl_args)
{
	int i;

	odp_sys_info_print();

	printf("%s options\n"
	       "-------------------------\n"
	       "IF-count: %i\n"
	       "Using IFs: ",
	       progname, appl_args->if_count);
	for (i = 0; i < appl_args->if_count; ++i)
		printf(" %s", appl_args->if_names[i]);
	printf("\n"
	       "Input queues: %d\n"
	       "Mode: %s\n"
	       "Flows: %d\n"
	       "Extra rounds: %d\n"
	       "Promisc mode: %s\n", appl_args->num_rx_q,
	       (appl_args->in_mode == SCHED_ATOMIC) ? "PKTIN_SCHED_ATOMIC" :
	       (appl_args->in_mode == SCHED_PARALLEL ? "PKTIN_SCHED_PARALLEL" :
	       "PKTIN_SCHED_ORDERED"), appl_args->num_flows,
	       appl_args->extra_rounds, appl_args->promisc_mode ?
	       "enabled" : "disabled");
	/* Flush now so the text is not interleaved with later output */
	fflush(NULL);
}
/**
 * Initialize the global application state
 *
 * Zeroes the whole structure, initializes the exit flag to 0 and marks every
 * interface slot and input queue handle as invalid.
 *
 * @param args  State to initialize
 */
static void gbl_args_init(args_t *args)
{
	int p, q;

	memset(args, 0, sizeof(*args));
	odp_atomic_init_u32(&args->exit_threads, 0);

	for (p = 0; p < MAX_PKTIOS; p++) {
		odp_queue_t *pktin = args->pktios[p].pktin;

		args->pktios[p].pktio = ODP_PKTIO_INVALID;

		for (q = 0; q < MAX_QUEUES; q++)
			pktin[q] = ODP_QUEUE_INVALID;
	}
}
int main(int argc, char *argv[])
{
odp_cpumask_t cpumask;
odp_instance_t instance;
odp_init_t init_param;
odp_pool_t pool;
odp_shm_t shm;
odp_schedule_config_t schedule_config;
odph_ethaddr_t new_addr;
odph_helper_options_t helper_options;
odph_thread_t thread_tbl[MAX_WORKERS];
odph_thread_common_param_t thr_common;
odph_thread_param_t thr_param[MAX_WORKERS];
stats_t *stats;
char cpumaskstr[ODP_CPUMASK_STR_SIZE];
int i, j;
int if_count;
int ret;
int num_workers;
uint32_t queue_size, pool_size;
/* Let helper collect its own arguments (e.g. --odph_proc) */
argc = odph_parse_options(argc, argv);
if (odph_options(&helper_options)) {
ODPH_ERR("Error: reading ODP helper options failed.\n");
exit(EXIT_FAILURE);
}
odp_init_param_init(&init_param);
init_param.mem_model = helper_options.mem_model;
/* Init ODP before calling anything else */
if (odp_init_global(&instance, &init_param, NULL)) {
ODPH_ERR("Error: ODP global init failed.\n");
exit(EXIT_FAILURE);
}
/* Init this thread */
ODPH_ERR("Error: ODP local init failed.\n");
exit(EXIT_FAILURE);
}
if (odp_schedule_capability(&schedule_capa)) {
printf("Error: Schedule capa failed.\n");
return -1;
}
if (odp_pool_capability(&pool_capa)) {
ODPH_ERR("Error: Pool capa failed\n");
exit(EXIT_FAILURE);
}
/* Reserve memory for args from shared mem */
shm = odp_shm_reserve("shm_args", sizeof(args_t),
ODP_CACHE_LINE_SIZE, 0);
if (shm == ODP_SHM_INVALID) {
ODPH_ERR("Error: shared mem reserve failed.\n");
exit(EXIT_FAILURE);
}
gbl_args = odp_shm_addr(shm);
if (gbl_args == NULL) {
ODPH_ERR("Error: shared mem alloc failed.\n");
exit(EXIT_FAILURE);
}
gbl_args_init(gbl_args);
/* Parse and store the application arguments */
parse_args(argc, argv, &gbl_args->appl);
odp_schedule_config_init(&schedule_config);
odp_schedule_config(&schedule_config);
if (gbl_args->appl.in_mode == SCHED_ORDERED) {
/* At least one ordered lock required */
if (schedule_capa.max_ordered_locks < 1) {
ODPH_ERR("Error: Ordered locks not available.\n");
exit(EXIT_FAILURE);
}
}
/* Print both system and application information */
print_info(NO_PATH(argv[0]), &gbl_args->appl);
num_workers = MAX_WORKERS;
if (gbl_args->appl.cpu_count && gbl_args->appl.cpu_count < MAX_WORKERS)
num_workers = gbl_args->appl.cpu_count;
/* Get default worker cpumask */
num_workers = odp_cpumask_default_worker(&cpumask, num_workers);
(void)odp_cpumask_to_str(&cpumask, cpumaskstr, sizeof(cpumaskstr));
if_count = gbl_args->appl.if_count;
printf("Num worker threads: %i\n", num_workers);
printf("First CPU: %i\n", odp_cpumask_first(&cpumask));
printf("CPU mask: %s\n\n", cpumaskstr);
pool_size = MAX_NUM_PKT;
if (pool_capa.pkt.max_num && pool_capa.pkt.max_num < MAX_NUM_PKT)
pool_size = pool_capa.pkt.max_num;
queue_size = MAX_NUM_PKT;
if (schedule_config.queue_size &&
schedule_config.queue_size < MAX_NUM_PKT)
queue_size = schedule_config.queue_size;
/* Pool should not be larger than queue, otherwise queue enqueues at
* packet input may fail. */
if (pool_size > queue_size)
pool_size = queue_size;
/* Create packet pool */
params.pkt.seg_len = PKT_POOL_BUF_SIZE;
params.pkt.len = PKT_POOL_BUF_SIZE;
params.pkt.num = pool_size;
params.pkt.uarea_size = PKT_UAREA_SIZE;
pool = odp_pool_create("packet pool", &params);
if (pool == ODP_POOL_INVALID) {
ODPH_ERR("Error: packet pool create failed.\n");
exit(EXIT_FAILURE);
}
init_forwarding_tbl();
for (i = 0; i < if_count; ++i) {
const char *dev = gbl_args->appl.if_names[i];
int num_rx, num_tx;
num_rx = gbl_args->appl.num_rx_q;
num_tx = gbl_args->appl.num_flows;
if (create_pktio(dev, i, num_rx, num_tx, pool))
exit(EXIT_FAILURE);
/* Save interface ethernet address */
if (odp_pktio_mac_addr(gbl_args->pktios[i].pktio,
gbl_args->port_eth_addr[i].addr,
ODPH_ETHADDR_LEN) != ODPH_ETHADDR_LEN) {
ODPH_ERR("Error: interface ethernet address unknown\n");
exit(EXIT_FAILURE);
}
odp_pktio_print(gbl_args->pktios[i].pktio);
/* Save destination eth address */
/* 02:00:00:00:00:XX */
memset(&new_addr, 0, sizeof(odph_ethaddr_t));
if (gbl_args->appl.addr_count) {
memcpy(&new_addr, &gbl_args->appl.addrs[i],
sizeof(odph_ethaddr_t));
} else {
new_addr.addr[0] = 0x02;
new_addr.addr[5] = i;
}
gbl_args->dst_eth_addr[i] = new_addr;
}
gbl_args->pktios[i].pktio = ODP_PKTIO_INVALID;
/* Allocate the same number of flows to each interface */
for (i = 0; i < if_count; i++) {
if (odp_pktio_capability(gbl_args->pktios[i].pktio, &capa)) {
ODPH_ERR("Error: pktio capability failed.\n");
exit(EXIT_FAILURE);
}
if ((uint32_t)gbl_args->appl.num_flows > capa.max_output_queues)
gbl_args->appl.num_flows = capa.max_output_queues;
}
/* Create atomic queues for packet tagging */
for (i = 0; i < if_count; i++) {
for (j = 0; j < gbl_args->appl.num_flows; j++) {
odp_queue_t queue;
char qname[ODP_QUEUE_NAME_LEN];
snprintf(qname, sizeof(qname), "flow_%d_%d", i, j);
qparam.size = queue_size;
gbl_args->flow_qcontext[i][j].idx = i;
gbl_args->flow_qcontext[i][j].input_queue = 0;
qparam.context = &gbl_args->flow_qcontext[i][j];
qparam.context_len = sizeof(qcontext_t);
queue = odp_queue_create(qname, &qparam);
if (queue == ODP_QUEUE_INVALID) {
ODPH_ERR("Error: flow queue create failed.\n");
exit(EXIT_FAILURE);
}
gbl_args->fqueue[i][j] = queue;
}
}
memset(thread_tbl, 0, sizeof(thread_tbl));
stats = gbl_args->stats;
odp_barrier_init(&gbl_args->barrier, num_workers + 1);
/* Create worker threads */
odph_thread_common_param_init(&thr_common);
thr_common.instance = instance;
thr_common.cpumask = &cpumask;
for (i = 0; i < num_workers; ++i) {
gbl_args->thread[i].stats = &stats[i];
odph_thread_param_init(&thr_param[i]);
thr_param[i].start = run_worker;
thr_param[i].arg = &gbl_args->thread[i];
thr_param[i].thr_type = ODP_THREAD_WORKER;
}
odph_thread_create(thread_tbl, &thr_common, thr_param, num_workers);
/* Start packet receive and transmit */
for (i = 0; i < if_count; ++i) {
odp_pktio_t pktio;
pktio = gbl_args->pktios[i].pktio;
ret = odp_pktio_start(pktio);
if (ret) {
ODPH_ERR("Error: unable to start %s\n",
gbl_args->appl.if_names[i]);
exit(EXIT_FAILURE);
}
}
ret = print_speed_stats(num_workers, stats, gbl_args->appl.time,
gbl_args->appl.accuracy);
/* Stop receiving new packet */
for (i = 0; i < if_count; i++)
odp_pktio_stop(gbl_args->pktios[i].pktio);
odp_atomic_store_u32(&gbl_args->exit_threads, 1);
/* Master thread waits for other threads to exit */
odph_thread_join(thread_tbl, num_workers);
for (i = 0; i < if_count; i++) {
odp_pktio_close(gbl_args->pktios[i].pktio);
for (j = 0; j < gbl_args->appl.num_flows; j++)
odp_queue_destroy(gbl_args->fqueue[i][j]);
}
free(gbl_args->appl.if_names);
free(gbl_args->appl.if_str);
if (odp_pool_destroy(pool)) {
ODPH_ERR("Error: pool destroy\n");
exit(EXIT_FAILURE);
}
if (odp_shm_free(shm)) {
ODPH_ERR("Error: shm free\n");
exit(EXIT_FAILURE);
}
if (odp_term_local()) {
ODPH_ERR("Error: term local\n");
exit(EXIT_FAILURE);
}
if (odp_term_global(instance)) {
ODPH_ERR("Error: term global\n");
exit(EXIT_FAILURE);
}
return ret;
}
void odp_atomic_init_u32(odp_atomic_u32_t *atom, uint32_t val)
Initialize atomic uint32 variable.
uint32_t odp_atomic_load_u32(odp_atomic_u32_t *atom)
Load value of atomic uint32 variable.
void odp_atomic_store_u32(odp_atomic_u32_t *atom, uint32_t val)
Store value to atomic uint32 variable.
void odp_barrier_init(odp_barrier_t *barr, int count)
Initialize barrier with thread count.
void odp_barrier_wait(odp_barrier_t *barr)
Synchronize thread execution on barrier.
#define ODP_ALIGNED_CACHE
Defines type/struct/variable to be cache line size aligned.
#define odp_unlikely(x)
Branch unlikely taken.
Definition: spec/hints.h:64
uint16_t odp_be_to_cpu_16(odp_u16be_t be16)
Convert 16bit big endian to cpu native uint16_t.
uint32_t odp_be_to_cpu_32(odp_u32be_t be32)
Convert 32bit big endian to cpu native uint32_t.
int odp_cpumask_default_worker(odp_cpumask_t *mask, int num)
Default CPU mask for worker threads.
int odp_cpumask_first(const odp_cpumask_t *mask)
Find first set CPU in mask.
int32_t odp_cpumask_to_str(const odp_cpumask_t *mask, char *str, int32_t size)
Format a string from CPU mask.
#define ODP_CPUMASK_STR_SIZE
The maximum number of characters needed to record any CPU mask as a string (output of odp_cpumask_to_str()).
void odp_event_free(odp_event_t event)
Free event.
#define ODP_EVENT_INVALID
Invalid event.
void odp_init_param_init(odp_init_t *param)
Initialize the odp_init_t to default values for all fields.
#define ODP_STATIC_ASSERT(cond, msg)
Compile time assertion macro.
int odp_init_local(odp_instance_t instance, odp_thread_type_t thr_type)
Thread local ODP initialization.
int odp_init_global(odp_instance_t *instance, const odp_init_t *params, const odp_platform_init_t *platform_params)
Global ODP initialization.
int odp_term_local(void)
Thread local ODP termination.
int odp_term_global(odp_instance_t instance)
Global ODP termination.
uint64_t odp_instance_t
ODP instance ID.
int odp_pktio_mac_addr(odp_pktio_t pktio, void *mac_addr, int size)
Get the default MAC address of a packet IO interface.
void odp_pktin_queue_param_init(odp_pktin_queue_param_t *param)
Initialize packet input queue parameters.
void odp_pktio_param_init(odp_pktio_param_t *param)
Initialize pktio params.
int odp_pktio_promisc_mode(odp_pktio_t pktio)
Determine if promiscuous mode is enabled for a packet IO interface.
int odp_pktio_close(odp_pktio_t pktio)
Close a packet IO interface.
int odp_pktout_queue(odp_pktio_t pktio, odp_pktout_queue_t queues[], int num)
Direct packet output queues.
int odp_pktio_promisc_mode_set(odp_pktio_t pktio, odp_bool_t enable)
Set promiscuous mode.
void odp_pktio_config_init(odp_pktio_config_t *config)
Initialize packet IO configuration options.
int odp_pktin_event_queue(odp_pktio_t pktio, odp_queue_t queues[], int num)
Event queues for packet input.
odp_pktio_t odp_pktio_open(const char *name, odp_pool_t pool, const odp_pktio_param_t *param)
Open a packet IO interface.
int odp_pktio_config(odp_pktio_t pktio, const odp_pktio_config_t *config)
Configure packet IO interface options.
void odp_pktio_print(odp_pktio_t pktio)
Print pktio info to the console.
int odp_pktio_start(odp_pktio_t pktio)
Start packet receive and transmit.
#define ODP_PKTIO_INVALID
Invalid packet IO handle.
void odp_pktout_queue_param_init(odp_pktout_queue_param_t *param)
Initialize packet output queue parameters.
int odp_pktio_stop(odp_pktio_t pktio)
Stop packet receive and transmit.
int odp_pktio_capability(odp_pktio_t pktio, odp_pktio_capability_t *capa)
Query packet IO interface capabilities.
int odp_pktout_send(odp_pktout_queue_t queue, const odp_packet_t packets[], int num)
Send packets directly to an interface output queue.
odp_pktio_op_mode_t
Packet IO operation mode.
uint64_t odp_pktio_to_u64(odp_pktio_t pktio)
Get printable value for an odp_pktio_t.
int odp_pktin_queue_config(odp_pktio_t pktio, const odp_pktin_queue_param_t *param)
Configure packet input queues.
int odp_pktout_queue_config(odp_pktio_t pktio, const odp_pktout_queue_param_t *param)
Configure packet output queues.
@ ODP_PKTIO_OP_MT
Multithread safe operation.
@ ODP_PKTIN_MODE_SCHED
Packet input through scheduler and scheduled event queues.
uint32_t odp_packet_seg_len(odp_packet_t pkt)
Packet data length following the data pointer.
void odp_packet_prefetch(odp_packet_t pkt, uint32_t offset, uint32_t len)
Packet data prefetch.
void * odp_packet_data(odp_packet_t pkt)
Packet data pointer.
int odp_packet_has_eth(odp_packet_t pkt)
Check for Ethernet header.
void * odp_packet_user_area(odp_packet_t pkt)
User area address.
uint32_t odp_packet_len(odp_packet_t pkt)
Packet data length.
odp_packet_t odp_packet_from_event(odp_event_t ev)
Get packet handle from event.
void odp_packet_free(odp_packet_t pkt)
Free packet.
void * odp_packet_l2_ptr(odp_packet_t pkt, uint32_t *len)
Layer 2 start pointer.
odp_pktio_t odp_packet_input(odp_packet_t pkt)
Packet input interface.
@ ODP_PROTO_LAYER_L2
Layer L2 protocols (Ethernet, VLAN, etc)
odp_pool_t odp_pool_create(const char *name, const odp_pool_param_t *param)
Create a pool.
int odp_pool_capability(odp_pool_capability_t *capa)
Query pool capabilities.
void odp_pool_param_init(odp_pool_param_t *param)
Initialize pool params.
int odp_pool_destroy(odp_pool_t pool)
Destroy a pool previously created by odp_pool_create()
void odp_pool_print(odp_pool_t pool)
Print pool info.
#define ODP_POOL_INVALID
Invalid pool.
@ ODP_POOL_PACKET
Packet pool.
int odp_queue_context_set(odp_queue_t queue, void *context, uint32_t len)
Set queue context.
void odp_queue_param_init(odp_queue_param_t *param)
Initialize queue params.
#define ODP_QUEUE_INVALID
Invalid queue.
#define ODP_QUEUE_NAME_LEN
Maximum queue name length, including the null character.
void * odp_queue_context(odp_queue_t queue)
Get queue context.
int odp_queue_enq(odp_queue_t queue, odp_event_t ev)
Enqueue an event to a queue.
odp_queue_t odp_queue_create(const char *name, const odp_queue_param_t *param)
Queue create.
int odp_queue_destroy(odp_queue_t queue)
Destroy ODP queue.
@ ODP_QUEUE_TYPE_SCHED
Scheduled queue.
#define ODP_SCHED_SYNC_PARALLEL
Parallel scheduled queues.
int odp_schedule_multi(odp_queue_t *from, uint64_t wait, odp_event_t events[], int num)
Schedule multiple events.
void odp_schedule_config_init(odp_schedule_config_t *config)
Initialize schedule configuration options.
#define ODP_SCHED_SYNC_ATOMIC
Atomic queue synchronization.
#define ODP_SCHED_SYNC_ORDERED
Ordered queue synchronization.
void odp_schedule_order_unlock(uint32_t lock_index)
Release ordered context lock.
#define ODP_SCHED_NO_WAIT
Do not wait.
int odp_schedule_default_prio(void)
Default scheduling priority level.
int odp_schedule_config(const odp_schedule_config_t *config)
Global schedule configuration.
uint64_t odp_schedule_wait_time(uint64_t ns)
Schedule wait time.
int odp_schedule_capability(odp_schedule_capability_t *capa)
Query scheduler capabilities.
odp_event_t odp_schedule(odp_queue_t *from, uint64_t wait)
Schedule an event.
#define ODP_SCHED_GROUP_ALL
Group of all threads.
void odp_schedule_order_lock(uint32_t lock_index)
Acquire ordered context lock.
int odp_shm_free(odp_shm_t shm)
Free a contiguous block of shared memory.
#define ODP_SHM_INVALID
Invalid shared memory block.
void * odp_shm_addr(odp_shm_t shm)
Shared memory block address.
odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, uint32_t flags)
Reserve a contiguous block of shared memory.
bool odp_bool_t
Boolean type.
void odp_sys_info_print(void)
Print system info.
@ ODP_THREAD_WORKER
Worker thread.
@ ODP_THREAD_CONTROL
Control thread.
#define ODP_TIME_SEC_IN_NS
A second in nanoseconds.
The OpenDataPlane API.
Global initialization parameters.
odp_mem_model_t mem_model
Application memory model.
Packet input queue parameters.
uint32_t num_queues
Number of input queues to be created.
odp_pktio_op_mode_t op_mode
Operation mode.
odp_queue_param_t queue_param
Queue parameters.
odp_pktin_hash_proto_t hash_proto
Protocol field selection for hashing.
odp_bool_t hash_enable
Enable flow hashing.
Packet IO capabilities.
odp_pktio_set_op_t set_op
Supported set operations.
uint32_t max_input_queues
Maximum number of input queues.
uint32_t max_output_queues
Maximum number of output queues.
Packet IO configuration options.
odp_pktio_parser_config_t parser
Packet input parser configuration.
Packet IO parameters.
odp_pktin_mode_t in_mode
Packet input mode.
odp_proto_layer_t layer
Protocol parsing level in packet input.
Packet output queue parameters.
odp_pktio_op_mode_t op_mode
Operation mode.
uint32_t num_queues
Number of output queues to be created.
uint32_t max_num
Maximum number of buffers of any size.
struct odp_pool_capability_t::@119 pkt
Packet pool capabilities
Pool parameters.
uint32_t uarea_size
Minimum user area size in bytes.
uint32_t num
Number of buffers in the pool.
odp_pool_type_t type
Pool type.
struct odp_pool_param_t::@123 pkt
Parameters for packet pools.
uint32_t len
Minimum length of 'num' packets.
uint32_t seg_len
Minimum number of packet data bytes that can be stored in the first segment of a newly allocated packet.
ODP Queue parameters.
odp_schedule_param_t sched
Scheduler parameters.
uint32_t size
Queue size.
odp_queue_type_t type
Queue type.
uint32_t context_len
Queue context data length.
void * context
Queue context pointer.
uint32_t max_ordered_locks
Maximum number of ordered locks per queue.
Schedule configuration.
uint32_t queue_size
Maximum number of events required to be stored simultaneously in scheduled queue.
odp_schedule_group_t group
Thread group.
odp_schedule_prio_t prio
Priority level.
uint32_t lock_count
Ordered lock count for this queue.
odp_schedule_sync_t sync
Synchronization method.
uint32_t ipv4_udp
IPv4 addresses and UDP port numbers.
struct odp_pktin_hash_proto_t::@99 proto
Protocol header fields for hashing.
struct odp_pktio_set_op_t::@104 op
Operation flags.
uint32_t promisc_mode
Promiscuous mode.