API Reference Manual  1.45.0
odp_queue_perf.c

Performance test application for queue APIs

/* SPDX-License-Identifier: BSD-3-Clause
* Copyright (c) 2018 Linaro Limited
* Copyright (c) 2021-2023 Nokia
*/
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
#include <stdlib.h>
#include <getopt.h>
#include <odp_api.h>
#include <odp/helper/odph_api.h>
#define MAX_QUEUES (32 * 1024)
typedef struct test_options_t {
uint32_t num_queue;
uint32_t num_event;
uint32_t num_round;
uint32_t max_burst;
int single;
int num_cpu;
} test_options_t;
typedef struct test_stat_t {
uint64_t rounds;
uint64_t events;
uint64_t nsec;
uint64_t cycles;
uint64_t deq_retry;
uint64_t enq_retry;
} test_stat_t;
typedef struct test_global_t {
odp_barrier_t barrier;
test_options_t options;
odp_instance_t instance;
odp_shm_t shm;
odp_pool_t pool;
odp_queue_t queue[MAX_QUEUES];
odph_thread_t thread_tbl[ODP_THREAD_COUNT_MAX];
test_stat_t stat[ODP_THREAD_COUNT_MAX];
} test_global_t;
static void print_usage(void)
{
printf("\n"
"Plain queue performance test\n"
"\n"
"Usage: odp_queue_perf [options]\n"
"\n"
" -c, --num_cpu Number of worker threads. Default: 1\n"
" -q, --num_queue Number of queues. Default: 1\n"
" -e, --num_event Number of events per queue. Default: 1\n"
" -b, --burst_size Maximum number of events per operation. Default: 1\n"
" -r, --num_round Number of rounds\n"
" -l, --lockfree Lockfree queues\n"
" -w, --waitfree Waitfree queues\n"
" -s, --single Single producer, single consumer\n"
" -h, --help This help\n"
"\n");
}
static int parse_options(int argc, char *argv[], test_options_t *test_options)
{
int opt;
int long_index;
int ret = 0;
static const struct option longopts[] = {
{"num_cpu", required_argument, NULL, 'c'},
{"num_queue", required_argument, NULL, 'q'},
{"num_event", required_argument, NULL, 'e'},
{"burst_size", required_argument, NULL, 'b'},
{"num_round", required_argument, NULL, 'r'},
{"lockfree", no_argument, NULL, 'l'},
{"waitfree", no_argument, NULL, 'w'},
{"single", no_argument, NULL, 's'},
{"help", no_argument, NULL, 'h'},
{NULL, 0, NULL, 0}
};
static const char *shortopts = "+c:q:e:b:r:lwsh";
test_options->num_cpu = 1;
test_options->num_queue = 1;
test_options->num_event = 1;
test_options->max_burst = 1;
test_options->num_round = 1000;
test_options->nonblock = ODP_BLOCKING;
test_options->single = 0;
while (1) {
opt = getopt_long(argc, argv, shortopts, longopts, &long_index);
if (opt == -1)
break;
switch (opt) {
case 'c':
test_options->num_cpu = atoi(optarg);
break;
case 'q':
test_options->num_queue = atoi(optarg);
break;
case 'e':
test_options->num_event = atoi(optarg);
break;
case 'b':
test_options->max_burst = atoi(optarg);
break;
case 'r':
test_options->num_round = atoi(optarg);
break;
case 'l':
test_options->nonblock = ODP_NONBLOCKING_LF;
break;
case 'w':
test_options->nonblock = ODP_NONBLOCKING_WF;
break;
case 's':
test_options->single = 1;
break;
case 'h':
/* fall through */
default:
print_usage();
ret = -1;
break;
}
}
if (test_options->num_queue > MAX_QUEUES) {
printf("Too many queues %u. Test maximum %u.\n",
test_options->num_queue, MAX_QUEUES);
return -1;
}
return ret;
}
static int create_queues(test_global_t *global)
{
odp_pool_param_t pool_param;
odp_queue_param_t queue_param;
odp_pool_t pool;
uint32_t i, j, max_size, max_num;
test_options_t *test_options = &global->options;
odp_nonblocking_t nonblock = test_options->nonblock;
uint32_t num_queue = test_options->num_queue;
uint32_t num_event = test_options->num_event;
uint32_t num_round = test_options->num_round;
uint32_t tot_event = num_queue * num_event;
int ret = 0;
odp_queue_t *queue = global->queue;
odp_event_t event[tot_event];
printf("\nTesting %s queues\n",
nonblock == ODP_BLOCKING ? "NORMAL" :
(nonblock == ODP_NONBLOCKING_LF ? "LOCKFREE" :
(nonblock == ODP_NONBLOCKING_WF ? "WAITFREE" : "???")));
printf(" num rounds %u\n", num_round);
printf(" num queues %u\n", num_queue);
printf(" num events per queue %u\n", num_event);
printf(" max burst size %u\n", test_options->max_burst);
for (i = 0; i < num_queue; i++)
queue[i] = ODP_QUEUE_INVALID;
for (i = 0; i < tot_event; i++)
event[i] = ODP_EVENT_INVALID;
if (odp_queue_capability(&queue_capa)) {
printf("Error: Queue capa failed.\n");
return -1;
}
if (odp_pool_capability(&pool_capa)) {
printf("Error: Pool capa failed.\n");
return -1;
}
if (nonblock == ODP_BLOCKING) {
if (num_queue > queue_capa.plain.max_num) {
printf("Max queues supported %u\n",
queue_capa.plain.max_num);
return -1;
}
max_size = queue_capa.plain.max_size;
if (max_size && num_event > max_size) {
printf("Max queue size supported %u\n", max_size);
return -1;
}
} else if (nonblock == ODP_NONBLOCKING_LF) {
if (queue_capa.plain.lockfree.max_num == 0) {
printf("Lockfree queues not supported\n");
return -1;
}
if (num_queue > queue_capa.plain.lockfree.max_num) {
printf("Max lockfree queues supported %u\n",
queue_capa.plain.lockfree.max_num);
return -1;
}
max_size = queue_capa.plain.lockfree.max_size;
if (max_size && num_event > max_size) {
printf("Max lockfree queue size supported %u\n",
max_size);
return -1;
}
} else if (nonblock == ODP_NONBLOCKING_WF) {
if (queue_capa.plain.waitfree.max_num == 0) {
printf("Waitfree queues not supported\n");
return -1;
}
if (num_queue > queue_capa.plain.waitfree.max_num) {
printf("Max waitfree queues supported %u\n",
queue_capa.plain.waitfree.max_num);
return -1;
}
max_size = queue_capa.plain.waitfree.max_size;
if (max_size && num_event > max_size) {
printf("Max waitfree queue size supported %u\n",
max_size);
return -1;
}
} else {
printf("Error: Bad queue blocking type\n");
return -1;
}
max_num = pool_capa.buf.max_num;
if (max_num && tot_event > max_num) {
printf("Error: max events supported %u\n", max_num);
return -1;
}
odp_pool_param_init(&pool_param);
pool_param.type = ODP_POOL_BUFFER;
pool_param.buf.num = tot_event;
pool = odp_pool_create("queue perf pool", &pool_param);
if (pool == ODP_POOL_INVALID) {
printf("Error: Pool create failed.\n");
return -1;
}
global->pool = pool;
odp_queue_param_init(&queue_param);
queue_param.type = ODP_QUEUE_TYPE_PLAIN;
queue_param.nonblocking = nonblock;
queue_param.size = num_event;
if (test_options->single) {
}
for (i = 0; i < num_queue; i++) {
queue[i] = odp_queue_create(NULL, &queue_param);
if (queue[i] == ODP_QUEUE_INVALID) {
printf("Error: Queue create failed %u.\n", i);
return -1;
}
}
for (i = 0; i < tot_event; i++) {
if (event[i] == ODP_EVENT_INVALID) {
printf("Error: Event alloc failed %u.\n", i);
ret = -1;
goto free_events;
}
}
for (i = 0; i < num_queue; i++) {
for (j = 0; j < num_event; j++) {
uint32_t id = i * num_event + j;
if (odp_queue_enq(queue[i], event[id])) {
printf("Error: Queue enq failed %u/%u\n", i, j);
ret = -1;
goto free_events;
}
event[id] = ODP_EVENT_INVALID;
}
}
free_events:
/* Free events that were not stored into queues */
for (i = 0; i < tot_event; i++) {
if (event[i] != ODP_EVENT_INVALID)
odp_event_free(event[i]);
}
return ret;
}
static int destroy_queues(test_global_t *global)
{
uint32_t i, j;
int ret = 0;
test_options_t *test_options = &global->options;
uint32_t num_queue = test_options->num_queue;
uint32_t num_event = test_options->num_event;
odp_queue_t *queue = global->queue;
odp_pool_t pool = global->pool;
for (i = 0; i < num_queue; i++) {
if (queue[i] == ODP_QUEUE_INVALID) {
printf("Error: Invalid queue handle (i: %u).\n", i);
break;
}
for (j = 0; j < num_event; j++) {
ev = odp_queue_deq(queue[i]);
if (ev != ODP_EVENT_INVALID)
}
if (odp_queue_destroy(queue[i])) {
printf("Error: Queue destroy failed %u.\n", i);
ret = -1;
break;
}
}
if (pool != ODP_POOL_INVALID && odp_pool_destroy(pool)) {
printf("Error: Pool destroy failed.\n");
ret = -1;
}
return ret;
}
static int run_test(void *arg)
{
uint64_t c1, c2, cycles, nsec;
odp_time_t t1, t2;
uint32_t rounds;
int num_ev;
test_stat_t *stat;
test_global_t *global = arg;
test_options_t *test_options = &global->options;
odp_queue_t queue;
uint64_t num_deq_retry = 0;
uint64_t num_enq_retry = 0;
uint64_t events = 0;
uint32_t num_queue = test_options->num_queue;
uint32_t num_round = test_options->num_round;
int thr = odp_thread_id();
int ret = 0;
uint32_t i = 0;
uint32_t max_burst = test_options->max_burst;
odp_event_t ev[max_burst];
stat = &global->stat[thr];
/* Start all workers at the same time */
odp_barrier_wait(&global->barrier);
for (rounds = 0; rounds < num_round; rounds++) {
int num_enq = 0;
do {
queue = global->queue[i++];
if (i == num_queue)
i = 0;
num_ev = odp_queue_deq_multi(queue, ev, max_burst);
if (odp_unlikely(num_ev < 0))
ODPH_ABORT("odp_queue_deq_multi() failed\n");
if (odp_unlikely(num_ev == 0))
num_deq_retry++;
} while (num_ev == 0);
while (num_enq < num_ev) {
int num = odp_queue_enq_multi(queue, &ev[num_enq], num_ev - num_enq);
if (odp_unlikely(num < 0))
ODPH_ABORT("odp_queue_enq_multi() failed\n");
num_enq += num;
if (odp_unlikely(num_enq != num_ev))
num_enq_retry++;
}
events += num_ev;
}
nsec = odp_time_diff_ns(t2, t1);
cycles = odp_cpu_cycles_diff(c2, c1);
stat->rounds = rounds;
stat->events = events;
stat->nsec = nsec;
stat->cycles = cycles;
stat->deq_retry = num_deq_retry;
stat->enq_retry = num_enq_retry;
return ret;
}
static int start_workers(test_global_t *global)
{
odph_thread_common_param_t thr_common;
odph_thread_param_t thr_param;
odp_cpumask_t cpumask;
int ret;
test_options_t *test_options = &global->options;
int num_cpu = test_options->num_cpu;
ret = odp_cpumask_default_worker(&cpumask, num_cpu);
if (num_cpu && ret != num_cpu) {
printf("Error: Too many workers. Max supported %i\n.", ret);
return -1;
}
/* Zero: all available workers */
if (num_cpu == 0) {
num_cpu = ret;
test_options->num_cpu = num_cpu;
}
printf(" num workers %u\n\n", num_cpu);
odp_barrier_init(&global->barrier, num_cpu);
odph_thread_common_param_init(&thr_common);
thr_common.instance = global->instance;
thr_common.cpumask = &cpumask;
thr_common.share_param = 1;
odph_thread_param_init(&thr_param);
thr_param.start = run_test;
thr_param.arg = global;
thr_param.thr_type = ODP_THREAD_WORKER;
if (odph_thread_create(global->thread_tbl, &thr_common, &thr_param,
num_cpu) != num_cpu)
return -1;
return 0;
}
static void print_stat(test_global_t *global)
{
int i, num;
double rounds_ave, events_ave, nsec_ave, cycles_ave;
test_options_t *test_options = &global->options;
int num_cpu = test_options->num_cpu;
uint64_t rounds_sum = 0;
uint64_t events_sum = 0;
uint64_t nsec_sum = 0;
uint64_t cycles_sum = 0;
uint64_t deq_retry_sum = 0;
uint64_t enq_retry_sum = 0;
/* Averages */
for (i = 0; i < ODP_THREAD_COUNT_MAX; i++) {
rounds_sum += global->stat[i].rounds;
events_sum += global->stat[i].events;
nsec_sum += global->stat[i].nsec;
cycles_sum += global->stat[i].cycles;
deq_retry_sum += global->stat[i].deq_retry;
enq_retry_sum += global->stat[i].enq_retry;
}
if (rounds_sum == 0) {
printf("No results.\n");
return;
}
rounds_ave = rounds_sum / num_cpu;
events_ave = events_sum / num_cpu;
nsec_ave = nsec_sum / num_cpu;
cycles_ave = cycles_sum / num_cpu;
num = 0;
printf("RESULTS - per thread (Million events per sec):\n");
printf("----------------------------------------------\n");
printf(" 1 2 3 4 5 6 7 8 9 10");
for (i = 0; i < ODP_THREAD_COUNT_MAX; i++) {
if (global->stat[i].rounds) {
if ((num % 10) == 0)
printf("\n ");
printf("%6.1f ", (1000.0 * global->stat[i].events) /
global->stat[i].nsec);
num++;
}
}
printf("\n\n");
printf("RESULTS - per thread average (%i threads):\n", num_cpu);
printf("------------------------------------------\n");
printf(" duration: %.3f msec\n", nsec_ave / 1000000);
printf(" num cycles: %.3f M\n", cycles_ave / 1000000);
printf(" events per dequeue: %.3f\n",
events_ave / rounds_ave);
printf(" cycles per event: %.3f\n",
cycles_ave / events_ave);
printf(" dequeue retries: %" PRIu64 "\n", deq_retry_sum);
printf(" enqueue retries: %" PRIu64 "\n", enq_retry_sum);
printf(" events per sec: %.3f M\n\n",
(1000.0 * events_ave) / nsec_ave);
printf("TOTAL events per sec: %.3f M\n\n",
(1000.0 * events_sum) / nsec_ave);
}
int main(int argc, char **argv)
{
odph_helper_options_t helper_options;
odp_instance_t instance;
odp_init_t init;
odp_shm_t shm;
test_global_t *global;
/* Let helper collect its own arguments (e.g. --odph_proc) */
argc = odph_parse_options(argc, argv);
if (odph_options(&helper_options)) {
ODPH_ERR("Error: Reading ODP helper options failed.\n");
exit(EXIT_FAILURE);
}
/* List features not to be used */
init.not_used.feat.cls = 1;
init.not_used.feat.crypto = 1;
init.not_used.feat.ipsec = 1;
init.not_used.feat.timer = 1;
init.not_used.feat.tm = 1;
init.mem_model = helper_options.mem_model;
/* Init ODP before calling anything else */
if (odp_init_global(&instance, &init, NULL)) {
printf("Error: Global init failed.\n");
return -1;
}
/* Init this thread */
printf("Error: Local init failed.\n");
return -1;
}
shm = odp_shm_reserve("queue_perf_global", sizeof(test_global_t), ODP_CACHE_LINE_SIZE, 0);
if (shm == ODP_SHM_INVALID) {
ODPH_ERR("Error: Shared mem reserve failed.\n");
exit(EXIT_FAILURE);
}
global = odp_shm_addr(shm);
if (global == NULL) {
ODPH_ERR("Error: Shared mem alloc failed\n");
exit(EXIT_FAILURE);
}
memset(global, 0, sizeof(test_global_t));
if (parse_options(argc, argv, &global->options))
return -1;
global->instance = instance;
if (create_queues(global)) {
printf("Error: Create queues failed.\n");
goto destroy;
}
if (start_workers(global)) {
printf("Error: Test start failed.\n");
return -1;
}
/* Wait workers to exit */
odph_thread_join(global->thread_tbl, global->options.num_cpu);
print_stat(global);
destroy:
if (destroy_queues(global)) {
printf("Error: Destroy queues failed.\n");
return -1;
}
if (odp_shm_free(shm)) {
ODPH_ERR("Error: Shared mem free failed.\n");
exit(EXIT_FAILURE);
}
if (odp_term_local()) {
printf("Error: term local failed.\n");
return -1;
}
if (odp_term_global(instance)) {
printf("Error: term global failed.\n");
return -1;
}
return 0;
}
void odp_barrier_init(odp_barrier_t *barr, int count)
Initialize barrier with thread count.
void odp_barrier_wait(odp_barrier_t *barr)
Synchronize thread execution on barrier.
odp_event_t odp_buffer_to_event(odp_buffer_t buf)
Convert buffer handle to event.
odp_buffer_t odp_buffer_alloc(odp_pool_t pool)
Buffer alloc.
#define odp_unlikely(x)
Branch unlikely taken.
Definition: spec/hints.h:64
uint64_t odp_cpu_cycles_diff(uint64_t c2, uint64_t c1)
CPU cycle count difference.
uint64_t odp_cpu_cycles(void)
Current CPU cycle count.
int odp_cpumask_default_worker(odp_cpumask_t *mask, int num)
Default CPU mask for worker threads.
void odp_event_free(odp_event_t event)
Free event.
#define ODP_EVENT_INVALID
Invalid event.
void odp_init_param_init(odp_init_t *param)
Initialize the odp_init_t to default values for all fields.
int odp_init_local(odp_instance_t instance, odp_thread_type_t thr_type)
Thread local ODP initialization.
int odp_init_global(odp_instance_t *instance, const odp_init_t *params, const odp_platform_init_t *platform_params)
Global ODP initialization.
int odp_term_local(void)
Thread local ODP termination.
int odp_term_global(odp_instance_t instance)
Global ODP termination.
uint64_t odp_instance_t
ODP instance ID.
odp_pool_t odp_pool_create(const char *name, const odp_pool_param_t *param)
Create a pool.
int odp_pool_capability(odp_pool_capability_t *capa)
Query pool capabilities.
void odp_pool_param_init(odp_pool_param_t *param)
Initialize pool params.
int odp_pool_destroy(odp_pool_t pool)
Destroy a pool previously created by odp_pool_create()
#define ODP_POOL_INVALID
Invalid pool.
@ ODP_POOL_BUFFER
Buffer pool.
odp_nonblocking_t
Non-blocking level.
int odp_queue_enq_multi(odp_queue_t queue, const odp_event_t events[], int num)
Enqueue multiple events to a queue.
void odp_queue_param_init(odp_queue_param_t *param)
Initialize queue params.
int odp_queue_capability(odp_queue_capability_t *capa)
Query queue capabilities.
#define ODP_QUEUE_INVALID
Invalid queue.
odp_event_t odp_queue_deq(odp_queue_t queue)
Dequeue an event from a queue.
int odp_queue_enq(odp_queue_t queue, odp_event_t ev)
Enqueue an event to a queue.
odp_queue_t odp_queue_create(const char *name, const odp_queue_param_t *param)
Queue create.
int odp_queue_destroy(odp_queue_t queue)
Destroy ODP queue.
int odp_queue_deq_multi(odp_queue_t queue, odp_event_t events[], int num)
Dequeue multiple events from a queue.
@ ODP_NONBLOCKING_WF
Non-blocking and wait-free implementation.
@ ODP_BLOCKING
Blocking implementation.
@ ODP_NONBLOCKING_LF
Non-blocking and lock-free implementation.
@ ODP_QUEUE_TYPE_PLAIN
Plain queue.
@ ODP_QUEUE_OP_MT_UNSAFE
Not multithread safe operation.
int odp_shm_free(odp_shm_t shm)
Free a contiguous block of shared memory.
#define ODP_SHM_INVALID
Invalid shared memory block.
void * odp_shm_addr(odp_shm_t shm)
Shared memory block address.
odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, uint32_t flags)
Reserve a contiguous block of shared memory.
void odp_sys_info_print(void)
Print system info.
#define ODP_THREAD_COUNT_MAX
Maximum number of threads supported in build time.
int odp_thread_id(void)
Get thread identifier.
@ ODP_THREAD_WORKER
Worker thread.
odp_time_t odp_time_local(void)
Current local time.
uint64_t odp_time_diff_ns(odp_time_t t2, odp_time_t t1)
Time difference in nanoseconds.
The OpenDataPlane API.
Global initialization parameters.
odp_mem_model_t mem_model
Application memory model.
odp_feature_t not_used
Unused features.
struct odp_pool_capability_t::@118 buf
Buffer pool capabilities
uint32_t max_num
Maximum number of buffers of any size.
Pool parameters.
uint32_t num
Number of buffers in the pool.
struct odp_pool_param_t::@122 buf
Parameters for buffer pools.
odp_pool_type_t type
Pool type.
uint32_t max_size
Maximum number of events a plain (ODP_BLOCKING) queue can store simultaneously.
uint32_t max_num
Maximum number of plain (ODP_BLOCKING) queues of the default size.
struct odp_queue_capability_t::@137 plain
Plain queue capabilities.
struct odp_queue_capability_t::@137::@139 waitfree
Wait-free (ODP_NONBLOCKING_WF) implementation capabilities.
struct odp_queue_capability_t::@137::@138 lockfree
Lock-free (ODP_NONBLOCKING_LF) implementation capabilities.
ODP Queue parameters.
odp_queue_op_mode_t enq_mode
Enqueue mode.
uint32_t size
Queue size.
odp_queue_type_t type
Queue type.
odp_queue_op_mode_t deq_mode
Dequeue mode.
odp_nonblocking_t nonblocking
Non-blocking level.
struct odp_feature_t::@145 feat
Individual feature bits.
uint32_t tm
Traffic Manager APIs, e.g., odp_tm_xxx()
uint32_t crypto
Crypto APIs, e.g., odp_crypto_xxx()
uint32_t ipsec
IPsec APIs, e.g., odp_ipsec_xxx()
uint32_t timer
Timer APIs, e.g., odp_timer_xxx(), odp_timeout_xxx()
uint32_t cls
Classifier APIs, e.g., odp_cls_xxx(), odp_cos_xxx()
uint32_t schedule
Scheduler APIs, e.g., odp_schedule_xxx()
uint32_t compress
Compression APIs, e.g., odp_comp_xxx()