API Reference Manual  1.45.0
odp_ipfragreass.c

IPv4 lock-free fragmentation and reassembly example application

/* SPDX-License-Identifier: BSD-3-Clause
* Copyright (c) 2017-2018 Linaro Limited
*/
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <assert.h>
#include <odp/helper/odph_api.h>
#include "odp_ipfragreass_fragment.h"
#include "odp_ipfragreass_reassemble.h"
#include "odp_ipfragreass_helpers.h"
/* Number of original packets that main() generates, fragments and expects back. */
#define NUM_PACKETS 200
/* Upper bound on worker threads; sizes thread_stats[] and main()'s thread table. */
#define MAX_WORKERS 32
/* Number of odp_atomic_u128_t fraglist entries reserved in shared memory by init(). */
#define FRAGLISTS 16384
/* Minimum length run_worker() asserts for any fragment with "more fragments" set. */
#define MIN_MF_FRAG_SIZE 576
/* Largest packet length: pool pkt.len, packet generation limit and worker sanity bound. */
#define MAX_PKT_LEN 8192
/*
 * Per-packet fragment budget used to size MAX_FRAGS.
 * NOTE(review): presumably an upper bound on what fragment_ipv4_packet()
 * emits for one packet -- confirm against the fragmentation code.
 */
#define MAX_FRAGS_PER_PKT 6
/* Value init() passes as the packet pool's pkt.seg_len. */
#define POOL_MIN_SEG_LEN IP_HDR_LEN_MIN
/* Per-packet user area size; run_worker() stores a struct packet there. */
#define POOL_UAREA_SIZE sizeof(struct packet)
/* Capacity of main()'s fragment_buffer[] and basis for the pool's packet count. */
#define MAX_FRAGS (MAX_FRAGS_PER_PKT * NUM_PACKETS)
/* Queue filled with shuffled fragments by main() and drained by the workers. */
static odp_queue_t fragments;
/* Queue handed to the reassembly code; main() dequeues finished packets from it. */
static odp_queue_t reassembled_pkts;
/* Running total of reassembled packets; workers stop once it reaches NUM_PACKETS. */
static odp_atomic_u32_t packets_reassembled;
/* Per-thread count of fragments dequeued, one cache-line-aligned slot per worker. */
static struct ODP_ALIGNED_CACHE {
uint32_t frags;
} thread_stats[MAX_WORKERS];
/* Base address of the shared-memory fraglist array (FRAGLISTS entries). */
static odp_atomic_u128_t *fraglists;
/* Start barrier shared by the workers and the main thread. */
static odp_barrier_t barrier;
/**
 * Initialise ODP and every shared resource used by the example.
 *
 * Seeds the C PRNG, performs global and local ODP initialisation, creates the
 * packet pool, reserves and initialises the shared-memory fraglist array,
 * creates the "fragments" and "reassembled packets" queues, and prepares the
 * worker CPU mask and the start barrier. Any failure prints an error to
 * stderr and terminates the process with exit(1).
 *
 * @param[out]    instance      Receives the ODP instance handle
 * @param[out]    fragment_pool Receives the packet pool handle
 * @param[out]    shm           Receives the shared memory handle backing
 *                              the fraglists
 * @param[out]    cpumask       Receives the worker CPU mask
 * @param[in,out] num_workers   In: requested number of workers; out: the
 *                              number returned by
 *                              odp_cpumask_default_worker()
 */
static void init(odp_instance_t *instance, odp_pool_t *fragment_pool,
		 odp_shm_t *shm, odp_cpumask_t *cpumask, int *num_workers)
{
	unsigned int seed = time(NULL);
	int i;
	odp_pool_param_t pool_params;
	odp_queue_param_t frag_queue_params;
	odp_queue_param_t reass_queue_params;
	char cpumask_str[ODP_CPUMASK_STR_SIZE];
	union fraglist init_data;

	srand(seed);
	/* seed is unsigned, so print it with %u rather than %d */
	printf("= Seed: %u\n", seed);
	printf("= MTU: %d\n", MTU);

	/* ODP initialisation */
	if (odp_init_global(instance, NULL, NULL)) {
		fprintf(stderr, "ERROR: ODP global init failed.\n");
		exit(1);
	}
	if (odp_init_local(*instance, ODP_THREAD_CONTROL)) {
		fprintf(stderr, "ERROR: ODP local init failed.\n");
		exit(1);
	}

	/* Create a pool for packet storage */
	odp_pool_param_init(&pool_params);
	pool_params.pkt.seg_len = POOL_MIN_SEG_LEN;
	pool_params.pkt.len = MAX_PKT_LEN;
	pool_params.pkt.num = 2 * MAX_FRAGS + MAX_WORKERS;
	pool_params.pkt.uarea_size = POOL_UAREA_SIZE;
	pool_params.type = ODP_POOL_PACKET;
	*fragment_pool = odp_pool_create("packet pool", &pool_params);
	if (*fragment_pool == ODP_POOL_INVALID) {
		fprintf(stderr, "ERROR: packet pool create failed.\n");
		exit(1);
	}

	/* Reserve (and initialise) shared memory for reassembly fraglists */
	*shm = odp_shm_reserve("fraglists",
			       FRAGLISTS * sizeof(odp_atomic_u128_t),
			       ODP_CACHE_LINE_SIZE, 0);
	if (*shm == ODP_SHM_INVALID) {
		fprintf(stderr, "ERROR: odp_shm_reserve\n");
		exit(1);
	}
	fraglists = odp_shm_addr(*shm);
	if (fraglists == NULL) {
		fprintf(stderr, "ERROR: odp_shm_addr\n");
		exit(1);
	}
	init_fraglist(&init_data);
	for (i = 0; i < FRAGLISTS; ++i)
		odp_atomic_init_u128(&fraglists[i], init_data.raw);

	/*
	 * Explicitly initialise the shared progress counter instead of
	 * relying on static zero-initialisation of an ODP atomic.
	 */
	odp_atomic_init_u32(&packets_reassembled, 0);

	/* Create a queue for holding fragments */
	odp_queue_param_init(&frag_queue_params);
	frag_queue_params.type = ODP_QUEUE_TYPE_PLAIN;
	/* Only the main thread enqueues fragments, before workers start */
	frag_queue_params.enq_mode = ODP_QUEUE_OP_MT_UNSAFE;
	fragments = odp_queue_create("fragments", &frag_queue_params);
	if (fragments == ODP_QUEUE_INVALID) {
		fprintf(stderr, "ERROR: odp_queue_create\n");
		exit(1);
	}

	/* Create a queue for holding reassembled packets */
	odp_queue_param_init(&reass_queue_params);
	reass_queue_params.type = ODP_QUEUE_TYPE_PLAIN;
	/* Only the main thread dequeues, after the workers have been joined */
	reass_queue_params.deq_mode = ODP_QUEUE_OP_MT_UNSAFE;
	reassembled_pkts = odp_queue_create("reassembled packets",
					    &reass_queue_params);
	if (reassembled_pkts == ODP_QUEUE_INVALID) {
		fprintf(stderr, "ERROR: odp_queue_create\n");
		exit(1);
	}

	/* Set up worker threads */
	*num_workers = odp_cpumask_default_worker(cpumask, *num_workers);
	/* One extra barrier participant: the main thread releases the workers */
	odp_barrier_init(&barrier, *num_workers + 1);
	odp_cpumask_to_str(cpumask, cpumask_str, sizeof(cpumask_str));
	printf("= Workers: %d\n", *num_workers);
	printf("= CPU Mask: %s (first CPU: %d)\n\n", cpumask_str,
	       odp_cpumask_first(cpumask));
}
/**
 * Worker thread entry point: reassemble fragments from the shared queue.
 *
 * Waits on the start barrier, then repeatedly dequeues one fragment from
 * "fragments", sanity-checks it, and passes it to reassemble_ipv4_packets().
 * The loop ends once packets_reassembled reaches NUM_PACKETS or the fragment
 * queue is empty. Any fragments still queued afterwards are freed.
 *
 * @param arg Unused
 *
 * @return Always 0
 */
static int run_worker(void *arg ODP_UNUSED)
{
	/*
	 * NOTE(review): assumes worker thread IDs start at 1 and stay below
	 * MAX_WORKERS + 1, so this indexes thread_stats[] in bounds -- confirm.
	 */
	int threadno = odp_thread_id() - 1;
	int iterations = 0;
	odp_event_t ev;

	odp_barrier_wait(&barrier);
	while (odp_atomic_load_u32(&packets_reassembled) < NUM_PACKETS) {
		odp_packet_t pkt;
		odp_time_t timestamp;
		odph_ipv4hdr_t *hdr;
		struct packet *fragment;
		int reassembled;

		ev = odp_queue_deq(fragments);
		if (ev == ODP_EVENT_INVALID)
			break;
		assert(odp_event_type(ev) == ODP_EVENT_PACKET);
		++thread_stats[threadno].frags;

		pkt = odp_packet_from_event(ev);
		hdr = odp_packet_data(pkt);
		fragment = odp_packet_user_area(pkt);
		timestamp = odp_time_global();
		assert(fragment != NULL);
		/* Packet length must equal the IP payload plus IP header length */
		assert(odp_packet_len(pkt) == ipv4hdr_payload_len(*hdr)
					      + ipv4hdr_ihl(*hdr));
		/*
		 * Fragments with "more fragments" set must meet the minimum
		 * size and carry a payload that is a multiple of 8 bytes.
		 */
		assert(!ipv4hdr_more_fragments(*hdr) ||
		       (odp_packet_len(pkt) >= MIN_MF_FRAG_SIZE &&
			(odp_packet_len(pkt)
			 - ipv4hdr_ihl(*hdr)) % 8 == 0));
		assert(odp_packet_len(pkt) <= MAX_PKT_LEN);

		/* Fill in the per-packet bookkeeping kept in the user area */
		fragment->handle = pkt;
		fragment->prev = NULL;
		fragment->arrival.t = odp_time_to_ns(timestamp) / TS_RES_NS;

		reassembled = reassemble_ipv4_packets(fraglists, FRAGLISTS,
						      fragment, 1,
						      reassembled_pkts);
		if (reassembled > 0)
			odp_atomic_add_u32(&packets_reassembled, reassembled);

		/*
		 * Perform garbage collection of stale fragments every 50
		 * iterations. (In real applications, use a timer!)
		 */
		if (threadno == 0 && iterations++ > 50) {
			iterations = 0;
			garbage_collect_fraglists(fraglists, FRAGLISTS,
						  reassembled_pkts, 0);
		}
	}

	/* Free whatever is left in the queue so no packets leak from the pool */
	while ((ev = odp_queue_deq(fragments)) != ODP_EVENT_INVALID) {
		assert(odp_event_type(ev) == ODP_EVENT_PACKET);
		odp_packet_free(odp_packet_from_event(ev));
	}

	return 0;
}
/**
 * Example entry point: fragment, shuffle, reassemble and verify packets.
 *
 * Generates NUM_PACKETS UDP/IPv4 packets, keeps a copy of each, fragments
 * them, shuffles the fragments and enqueues them. Worker threads then
 * reassemble the packets, and the results are compared against the saved
 * originals before all resources are released.
 *
 * @return 0 on success, 1 if any set-up, processing or clean-up step fails
 */
int main(int argc ODP_UNUSED, char *argv[] ODP_UNUSED)
{
	odp_instance_t instance;
	odp_pool_t fragment_pool;
	odp_shm_t shm;
	odp_cpumask_t cpumask;
	odph_thread_t thread_tbl[MAX_WORKERS];
	odph_thread_common_param_t thr_common;
	odph_thread_param_t thr_param;
	odp_packet_t dequeued_pkts[NUM_PACKETS];
	odp_event_t ev;
	odp_u16be_t ip_id = 0;
	odp_packet_t orig_pkts[NUM_PACKETS];
	odp_packet_t fragment_buffer[MAX_FRAGS];
	int total_fragments = 0;
	int i;
	int num_workers = MAX_WORKERS;
	int reassembled;

	init(&instance, &fragment_pool, &shm, &cpumask, &num_workers);

	/* Packet generation & fragmentation */
	printf("\n= Fragmenting %d packets...\n", NUM_PACKETS);
	for (i = 0; i < NUM_PACKETS; ++i) {
		odp_packet_t packet;
		int num_fragments;

		packet = pack_udp_ipv4_packet(fragment_pool, ip_id++,
					      MAX_PKT_LEN,
					      MTU + IP_HDR_LEN_MAX + 1);
		if (packet == ODP_PACKET_INVALID) {
			fprintf(stderr, "ERROR: pack_udp_ipv4_packet\n");
			return 1;
		}

		/* Keep an untouched copy to verify the reassembly result */
		orig_pkts[i] = odp_packet_copy(packet, fragment_pool);
		if (orig_pkts[i] == ODP_PACKET_INVALID) {
			fprintf(stderr, "ERROR: odp_packet_copy\n");
			return 1;
		}

		if (fragment_ipv4_packet(packet,
					 &fragment_buffer[total_fragments],
					 &num_fragments)) {
			fprintf(stderr, "ERROR: fragment_ipv4_packet\n");
			return 1;
		}

		total_fragments += num_fragments;
	}

	/* Shuffle the fragments around so they aren't necessarily in order */
	printf("\n= Shuffling %d fragments...\n", total_fragments);
	shuffle(fragment_buffer, total_fragments);

	/* Insert the fragments into a queue for consumption */
	for (i = 0; i < total_fragments; ++i) {
		ev = odp_packet_to_event(fragment_buffer[i]);
		if (odp_queue_enq(fragments, ev) < 0) {
			fprintf(stderr, "ERROR: odp_queue_enq\n");
			return 1;
		}
	}

	/* Spawn the worker threads for reassembly */
	odph_thread_common_param_init(&thr_common);
	thr_common.instance = instance;
	thr_common.cpumask = &cpumask;
	thr_common.share_param = 1;

	odph_thread_param_init(&thr_param);
	thr_param.start = run_worker;
	thr_param.arg = 0;
	thr_param.thr_type = ODP_THREAD_WORKER;

	memset(thread_tbl, 0, sizeof(thread_tbl));
	/*
	 * The barrier was sized for num_workers + 1 participants, so a
	 * shortfall in created threads would block forever below.
	 */
	if (odph_thread_create(thread_tbl, &thr_common, &thr_param,
			       num_workers) != num_workers) {
		fprintf(stderr, "ERROR: odph_thread_create\n");
		return 1;
	}

	/* Go! */
	printf("\n= Starting reassembly...\n");
	odp_barrier_wait(&barrier);

	/* Wait for all threads to complete and output statistics */
	if (odph_thread_join(thread_tbl, num_workers) != num_workers) {
		fprintf(stderr, "ERROR: odph_thread_join\n");
		return 1;
	}
	for (i = 0; i < num_workers; ++i)
		printf("=== Thread %02d processed %3d fragments\n", i,
		       thread_stats[i].frags);

	/* Dequeue the reassembled packets */
	reassembled = 0;
	while ((ev = odp_queue_deq(reassembled_pkts)) != ODP_EVENT_INVALID) {
		assert(odp_event_type(ev) == ODP_EVENT_PACKET);
		/* Bound the array write even when asserts are compiled out */
		if (reassembled >= NUM_PACKETS) {
			fprintf(stderr,
				"ERROR: more than %d reassembled packets\n",
				NUM_PACKETS);
			return 1;
		}
		dequeued_pkts[reassembled++] = odp_packet_from_event(ev);
	}

	/* Check reassembled packets against the originals */
	printf("\n= Checking reassembled packets...\n");
	for (i = 0; i < reassembled; ++i) {
		int j = -1;
		int k;
		odp_packet_t packet = dequeued_pkts[i];
		uint32_t len = odp_packet_len(packet);
		odph_ipv4hdr_t hdr;
		odph_ipv4hdr_t reassembled_hdr;

		reassembled_hdr = *(odph_ipv4hdr_t *)odp_packet_data(packet);
		/*
		 * Search every original (not just the first "reassembled"
		 * ones), so a partial run still finds the right match.
		 */
		for (k = 0; k < NUM_PACKETS; ++k) {
			hdr = *(odph_ipv4hdr_t *)odp_packet_data(orig_pkts[k]);
			if (hdr.src_addr == reassembled_hdr.src_addr &&
			    hdr.dst_addr == reassembled_hdr.dst_addr &&
			    hdr.id == reassembled_hdr.id &&
			    hdr.proto == reassembled_hdr.proto) {
				/* At most one original may match */
				assert(j < 0);
				j = k;
			}
		}

		assert(j >= 0);
		assert(odp_packet_is_valid(packet));
		assert(len == odp_packet_len(orig_pkts[j]));
		assert(!packet_memcmp(orig_pkts[j], packet, 0, 0, len));
	}
	printf("=== Successfully reassembled %d of %d packets\n", reassembled,
	       NUM_PACKETS);
	assert(reassembled == NUM_PACKETS);
	printf("\n= Complete!\n");

	/* Free packets */
	for (i = 0; i < reassembled; ++i)
		odp_packet_free(dequeued_pkts[i]);
	for (i = 0; i < NUM_PACKETS; ++i)
		odp_packet_free(orig_pkts[i]);
	garbage_collect_fraglists(fraglists, FRAGLISTS, reassembled_pkts, 1);

	/*
	 * ODP cleanup and termination. The calls are made outside assert()
	 * so the resources are still released when NDEBUG is defined.
	 */
	if (odp_queue_destroy(fragments)) {
		fprintf(stderr, "ERROR: odp_queue_destroy (fragments)\n");
		return 1;
	}
	if (odp_queue_destroy(reassembled_pkts)) {
		fprintf(stderr,
			"ERROR: odp_queue_destroy (reassembled packets)\n");
		return 1;
	}
	if (odp_shm_free(shm)) {
		fprintf(stderr, "ERROR: odp_shm_free\n");
		return 1;
	}
	if (odp_pool_destroy(fragment_pool)) {
		fprintf(stderr,
			"ERROR: fragment_pool destruction failed\n");
		return 1;
	}
	if (odp_term_local()) {
		fprintf(stderr, "ERROR: odp_term_local\n");
		return 1;
	}
	if (odp_term_global(instance)) {
		fprintf(stderr, "ERROR: odp_term_global\n");
		return 1;
	}

	return 0;
}
void odp_atomic_add_u32(odp_atomic_u32_t *atom, uint32_t val)
Add to atomic uint32 variable.
uint32_t odp_atomic_load_u32(odp_atomic_u32_t *atom)
Load value of atomic uint32 variable.
void odp_atomic_init_u128(odp_atomic_u128_t *atom, odp_u128_t val)
Initialize atomic odp_u128_t variable.
void odp_barrier_init(odp_barrier_t *barr, int count)
Initialize barrier with thread count.
void odp_barrier_wait(odp_barrier_t *barr)
Synchronize thread execution on barrier.
#define ODP_ALIGNED_CACHE
Defines type/struct/variable to be cache line size aligned.
#define ODP_UNUSED
Marks variables or function parameters that are intentionally unused.
Definition: spec/hints.h:54
uint16_t odp_u16be_t
Unsigned 16-bit big-endian integer.
int odp_cpumask_default_worker(odp_cpumask_t *mask, int num)
Default CPU mask for worker threads.
int odp_cpumask_first(const odp_cpumask_t *mask)
Find first set CPU in mask.
int32_t odp_cpumask_to_str(const odp_cpumask_t *mask, char *str, int32_t size)
Format a string from CPU mask.
#define ODP_CPUMASK_STR_SIZE
The maximum number of characters needed to record any CPU mask as a string (output of odp_cpumask_to_str()).
odp_event_type_t odp_event_type(odp_event_t event)
Event type of an event.
#define ODP_EVENT_INVALID
Invalid event.
int odp_init_local(odp_instance_t instance, odp_thread_type_t thr_type)
Thread local ODP initialization.
int odp_init_global(odp_instance_t *instance, const odp_init_t *params, const odp_platform_init_t *platform_params)
Global ODP initialization.
int odp_term_local(void)
Thread local ODP termination.
int odp_term_global(odp_instance_t instance)
Global ODP termination.
uint64_t odp_instance_t
ODP instance ID.
odp_event_t odp_packet_to_event(odp_packet_t pkt)
Convert packet handle to event.
odp_packet_t odp_packet_copy(odp_packet_t pkt, odp_pool_t pool)
Full copy of a packet.
void * odp_packet_data(odp_packet_t pkt)
Packet data pointer.
void * odp_packet_user_area(odp_packet_t pkt)
User area address.
uint32_t odp_packet_len(odp_packet_t pkt)
Packet data length.
odp_packet_t odp_packet_from_event(odp_event_t ev)
Get packet handle from event.
void odp_packet_free(odp_packet_t pkt)
Free packet.
#define ODP_PACKET_INVALID
Invalid packet.
int odp_packet_is_valid(odp_packet_t pkt)
Check that packet is valid.
odp_pool_t odp_pool_create(const char *name, const odp_pool_param_t *param)
Create a pool.
void odp_pool_param_init(odp_pool_param_t *param)
Initialize pool params.
int odp_pool_destroy(odp_pool_t pool)
Destroy a pool previously created by odp_pool_create()
#define ODP_POOL_INVALID
Invalid pool.
@ ODP_POOL_PACKET
Packet pool.
void odp_queue_param_init(odp_queue_param_t *param)
Initialize queue params.
#define ODP_QUEUE_INVALID
Invalid queue.
odp_event_t odp_queue_deq(odp_queue_t queue)
Dequeue an event from a queue.
int odp_queue_enq(odp_queue_t queue, odp_event_t ev)
Enqueue an event to a queue.
odp_queue_t odp_queue_create(const char *name, const odp_queue_param_t *param)
Queue create.
int odp_queue_destroy(odp_queue_t queue)
Destroy ODP queue.
@ ODP_QUEUE_TYPE_PLAIN
Plain queue.
@ ODP_QUEUE_OP_MT_UNSAFE
Not multithread safe operation.
int odp_shm_free(odp_shm_t shm)
Free a contiguous block of shared memory.
#define ODP_SHM_INVALID
Invalid shared memory block.
void * odp_shm_addr(odp_shm_t shm)
Shared memory block address.
odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, uint32_t flags)
Reserve a contiguous block of shared memory.
int odp_thread_id(void)
Get thread identifier.
@ ODP_THREAD_WORKER
Worker thread.
@ ODP_THREAD_CONTROL
Control thread.
uint64_t odp_time_to_ns(odp_time_t time)
Convert time to nanoseconds.
odp_time_t odp_time_global(void)
Current global time.
Pool parameters.
uint32_t uarea_size
Minimum user area size in bytes.
uint32_t num
Number of buffers in the pool.
odp_pool_type_t type
Pool type.
struct odp_pool_param_t::@123 pkt
Parameters for packet pools.
uint32_t len
Minimum length of 'num' packets.
uint32_t seg_len
Minimum number of packet data bytes that can be stored in the first segment of a newly allocated packet.
ODP Queue parameters.
odp_queue_op_mode_t enq_mode
Enqueue mode.
odp_queue_type_t type
Queue type.
odp_queue_op_mode_t deq_mode
Dequeue mode.