API Reference Manual  1.46.0
odp_ipfragreass.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright (c) 2017-2018 Linaro Limited
3  */
4 
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <time.h>
16 #include <assert.h>
17 
18 #include <odp/helper/odph_api.h>
19 
20 #include "odp_ipfragreass_fragment.h"
21 #include "odp_ipfragreass_reassemble.h"
22 #include "odp_ipfragreass_helpers.h"
23 
24 #define NUM_PACKETS 200
25 #define MAX_WORKERS 32
26 #define FRAGLISTS 16384
28 #define MIN_MF_FRAG_SIZE 576
29 #define MAX_PKT_LEN 8192
30 #define MAX_FRAGS_PER_PKT 6
35 #define POOL_MIN_SEG_LEN IP_HDR_LEN_MIN
36 #define POOL_UAREA_SIZE sizeof(struct packet)
37 #define MAX_FRAGS (MAX_FRAGS_PER_PKT * NUM_PACKETS)
38 
40 static odp_queue_t fragments;
41 
43 static odp_queue_t reassembled_pkts;
44 
46 static odp_atomic_u32_t packets_reassembled;
47 
49 static struct ODP_ALIGNED_CACHE {
50  uint32_t frags;
51 } thread_stats[MAX_WORKERS];
52 
54 static odp_atomic_u128_t *fraglists;
55 
57 static odp_barrier_t barrier;
58 
68 static void init(odp_instance_t *instance, odp_pool_t *fragment_pool,
69  odp_shm_t *shm, odp_cpumask_t *cpumask, int *num_workers)
70 {
71  unsigned int seed = time(NULL);
72  int i;
73  odp_pool_param_t pool_params;
74  odp_queue_param_t frag_queue_params;
75  odp_queue_param_t reass_queue_params;
76  char cpumask_str[ODP_CPUMASK_STR_SIZE];
77  union fraglist init_data;
78 
79  srand(seed);
80  printf("= Seed: %d\n", seed);
81  printf("= MTU: %d\n", MTU);
82 
83  /* ODP initialisation */
84  if (odp_init_global(instance, NULL, NULL)) {
85  fprintf(stderr, "ERROR: ODP global init failed.\n");
86  exit(1);
87  }
88  if (odp_init_local(*instance, ODP_THREAD_CONTROL)) {
89  fprintf(stderr, "ERROR: ODP local init failed.\n");
90  exit(1);
91  }
92 
93  /* Create a pool for packet storage */
94  odp_pool_param_init(&pool_params);
95  pool_params.pkt.seg_len = POOL_MIN_SEG_LEN;
96  pool_params.pkt.len = MAX_PKT_LEN;
97  pool_params.pkt.num = 2 * MAX_FRAGS + MAX_WORKERS;
98  pool_params.pkt.uarea_size = POOL_UAREA_SIZE;
99  pool_params.type = ODP_POOL_PACKET;
100  *fragment_pool = odp_pool_create("packet pool", &pool_params);
101  if (*fragment_pool == ODP_POOL_INVALID) {
102  fprintf(stderr, "ERROR: packet pool create failed.\n");
103  exit(1);
104  }
105 
106  /* Reserve (and initialise) shared memory for reassembly fraglists */
107  *shm = odp_shm_reserve("fraglists",
108  FRAGLISTS * sizeof(odp_atomic_u128_t),
109  ODP_CACHE_LINE_SIZE, 0);
110  if (*shm == ODP_SHM_INVALID) {
111  fprintf(stderr, "ERROR: odp_shm_reserve\n");
112  exit(1);
113  }
114  fraglists = odp_shm_addr(*shm);
115  if (fraglists == NULL) {
116  fprintf(stderr, "ERROR: odp_shm_addr\n");
117  exit(1);
118  }
119 
120  init_fraglist(&init_data);
121  for (i = 0; i < FRAGLISTS; ++i)
122  odp_atomic_init_u128(&fraglists[i], init_data.raw);
123 
124  /* Create a queue for holding fragments */
125  odp_queue_param_init(&frag_queue_params);
126  frag_queue_params.type = ODP_QUEUE_TYPE_PLAIN;
127  frag_queue_params.enq_mode = ODP_QUEUE_OP_MT_UNSAFE;
128  fragments = odp_queue_create("fragments", &frag_queue_params);
129  if (fragments == ODP_QUEUE_INVALID) {
130  fprintf(stderr, "ERROR: odp_queue_create\n");
131  exit(1);
132  }
133 
134  /* Create a queue for holding reassembled packets */
135  odp_queue_param_init(&reass_queue_params);
136  reass_queue_params.type = ODP_QUEUE_TYPE_PLAIN;
137  reass_queue_params.deq_mode = ODP_QUEUE_OP_MT_UNSAFE;
138  reassembled_pkts = odp_queue_create("reassembled packets",
139  &reass_queue_params);
140  if (reassembled_pkts == ODP_QUEUE_INVALID) {
141  fprintf(stderr, "ERROR: odp_queue_create\n");
142  exit(1);
143  }
144 
145  /* Set up worker threads */
146  *num_workers = odp_cpumask_default_worker(cpumask, *num_workers);
147  odp_barrier_init(&barrier, *num_workers + 1);
148  odp_cpumask_to_str(cpumask, cpumask_str, sizeof(cpumask_str));
149  printf("= Workers: %d\n", *num_workers);
150  printf("= CPU Mask: %s (first CPU: %d)\n\n", cpumask_str,
151  odp_cpumask_first(cpumask));
152 }
153 
167 static int run_worker(void *arg ODP_UNUSED)
168 {
169  int threadno = odp_thread_id() - 1;
170  int iterations = 0;
171  odp_event_t ev;
172 
173  odp_barrier_wait(&barrier);
174  while (odp_atomic_load_u32(&packets_reassembled) < NUM_PACKETS) {
175  odp_packet_t pkt;
176  odp_time_t timestamp;
177  odph_ipv4hdr_t *hdr;
178  struct packet *fragment;
179  int reassembled;
180 
181  ev = odp_queue_deq(fragments);
182  if (ev == ODP_EVENT_INVALID)
183  break;
184  assert(odp_event_type(ev) == ODP_EVENT_PACKET);
185  ++thread_stats[threadno].frags;
186 
187  pkt = odp_packet_from_event(ev);
188  hdr = odp_packet_data(pkt);
189  fragment = odp_packet_user_area(pkt);
190  timestamp = odp_time_global();
191  assert(fragment != NULL);
192  assert(odp_packet_len(pkt) == ipv4hdr_payload_len(*hdr)
193  + ipv4hdr_ihl(*hdr));
194  assert(!ipv4hdr_more_fragments(*hdr) ||
195  (odp_packet_len(pkt) >= MIN_MF_FRAG_SIZE &&
196  (odp_packet_len(pkt)
197  - ipv4hdr_ihl(*hdr)) % 8 == 0));
198  assert(odp_packet_len(pkt) <= MAX_PKT_LEN);
199  fragment->handle = pkt;
200  fragment->prev = NULL;
201  fragment->arrival.t = odp_time_to_ns(timestamp) / TS_RES_NS;
202 
203  reassembled = reassemble_ipv4_packets(fraglists, FRAGLISTS,
204  fragment, 1,
205  reassembled_pkts);
206  if (reassembled > 0)
207  odp_atomic_add_u32(&packets_reassembled, reassembled);
208 
209  /*
210  * Perform garbage collection of stale fragments every 50
211  * iterations. (In real applications, use a timer!)
212  */
213  if (threadno == 0 && iterations++ > 50) {
214  iterations = 0;
215  garbage_collect_fraglists(fraglists, FRAGLISTS,
216  reassembled_pkts, 0);
217  }
218  }
219 
220  while ((ev = odp_queue_deq(fragments)) != ODP_EVENT_INVALID) {
221  assert(odp_event_type(ev) == ODP_EVENT_PACKET);
223  }
224 
225  return 0;
226 }
227 
231 int main(int argc ODP_UNUSED, char *argv[] ODP_UNUSED)
232 {
233  odp_instance_t instance;
234  odp_pool_t fragment_pool;
235  odp_shm_t shm;
236  odp_cpumask_t cpumask;
237  odph_thread_t thread_tbl[MAX_WORKERS];
238  odph_thread_common_param_t thr_common;
239  odph_thread_param_t thr_param;
240  odp_packet_t dequeued_pkts[NUM_PACKETS];
241  odp_event_t ev;
242  odp_u16be_t ip_id = 0;
243  odp_packet_t orig_pkts[NUM_PACKETS];
244  odp_packet_t fragment_buffer[MAX_FRAGS];
245  int total_fragments = 0;
246  int i;
247  int num_workers = MAX_WORKERS;
248  int reassembled;
249 
250  init(&instance, &fragment_pool, &shm, &cpumask, &num_workers);
251 
252  /* Packet generation & fragmentation */
253  printf("\n= Fragmenting %d packets...\n", NUM_PACKETS);
254  for (i = 0; i < NUM_PACKETS; ++i) {
255  odp_packet_t packet;
256  int num_fragments;
257 
258  packet = pack_udp_ipv4_packet(fragment_pool, ip_id++,
259  MAX_PKT_LEN,
260  MTU + IP_HDR_LEN_MAX + 1);
261  if (packet == ODP_PACKET_INVALID) {
262  fprintf(stderr, "ERROR: pack_udp_ipv4_packet\n");
263  return 1;
264  }
265 
266  orig_pkts[i] = odp_packet_copy(packet, fragment_pool);
267  if (orig_pkts[i] == ODP_PACKET_INVALID) {
268  fprintf(stderr, "ERROR: odp_packet_copy\n");
269  return 1;
270  }
271 
272  if (fragment_ipv4_packet(packet,
273  &fragment_buffer[total_fragments],
274  &num_fragments)) {
275  fprintf(stderr, "ERROR: fragment_ipv4_packet\n");
276  return 1;
277  }
278 
279  total_fragments += num_fragments;
280  }
281 
282  /* Shuffle the fragments around so they aren't necessarily in order */
283  printf("\n= Shuffling %d fragments...\n", total_fragments);
284  shuffle(fragment_buffer, total_fragments);
285 
286  /* Insert the fragments into a queue for consumption */
287  for (i = 0; i < total_fragments; ++i) {
288  ev = odp_packet_to_event(fragment_buffer[i]);
289 
290  if (odp_queue_enq(fragments, ev) < 0) {
291  fprintf(stderr, "ERROR: odp_queue_enq\n");
292  return 1;
293  }
294  }
295 
296  /* Spawn the worker threads for reassembly */
297  odph_thread_common_param_init(&thr_common);
298  thr_common.instance = instance;
299  thr_common.cpumask = &cpumask;
300  thr_common.share_param = 1;
301 
302  odph_thread_param_init(&thr_param);
303  thr_param.start = run_worker;
304  thr_param.arg = 0;
305  thr_param.thr_type = ODP_THREAD_WORKER;
306 
307  memset(thread_tbl, 0, sizeof(thread_tbl));
308  odph_thread_create(thread_tbl, &thr_common, &thr_param, num_workers);
309 
310  /* Go! */
311  printf("\n= Starting reassembly...\n");
312  odp_barrier_wait(&barrier);
313 
314  /* Wait for all threads to complete and output statistics */
315  odph_thread_join(thread_tbl, num_workers);
316  for (i = 0; i < num_workers; ++i)
317  printf("=== Thread %02d processed %3d fragments\n", i,
318  thread_stats[i].frags);
319 
320  /* Dequeue the reassembled packets */
321  for (reassembled = 0; (ev = odp_queue_deq(reassembled_pkts)) !=
322  ODP_EVENT_INVALID; ++reassembled) {
323  assert(reassembled < NUM_PACKETS);
324  assert(odp_event_type(ev) == ODP_EVENT_PACKET);
325  dequeued_pkts[reassembled] = odp_packet_from_event(ev);
326  }
327 
328  /* Check reassembled packets against the originals */
329  printf("\n= Checking reassembled packets...\n");
330  for (i = 0; i < reassembled; ++i) {
331  int j = -1;
332  int k;
333  odp_packet_t packet = dequeued_pkts[i];
334  uint32_t len = odp_packet_len(packet);
335  odph_ipv4hdr_t hdr;
336  odph_ipv4hdr_t reassembled_hdr;
337 
338  reassembled_hdr = *(odph_ipv4hdr_t *)odp_packet_data(packet);
339  for (k = 0; k < reassembled; ++k) {
340  hdr = *(odph_ipv4hdr_t *)odp_packet_data(orig_pkts[k]);
341  if (hdr.src_addr == reassembled_hdr.src_addr &&
342  hdr.dst_addr == reassembled_hdr.dst_addr &&
343  hdr.id == reassembled_hdr.id &&
344  hdr.proto == reassembled_hdr.proto) {
345  assert(j < 0);
346  j = k;
347  }
348  }
349  assert(j >= 0);
350 
351  assert(odp_packet_is_valid(packet));
352  assert(len == odp_packet_len(orig_pkts[j]));
353  assert(!packet_memcmp(orig_pkts[j], packet, 0, 0, len));
354  }
355  printf("=== Successfully reassembled %d of %d packets\n", reassembled,
356  NUM_PACKETS);
357  assert(reassembled == NUM_PACKETS);
358  printf("\n= Complete!\n");
359 
360  /* Free packets */
361  for (i = 0; i < reassembled; ++i)
362  odp_packet_free(dequeued_pkts[i]);
363  for (i = 0; i < NUM_PACKETS; ++i)
364  odp_packet_free(orig_pkts[i]);
365  garbage_collect_fraglists(fraglists, FRAGLISTS, reassembled_pkts, 1);
366 
367  /* ODP cleanup and termination */
368  assert(!odp_queue_destroy(fragments));
369  assert(!odp_queue_destroy(reassembled_pkts));
370  assert(!odp_shm_free(shm));
371  if (odp_pool_destroy(fragment_pool)) {
372  fprintf(stderr,
373  "ERROR: fragment_pool destruction failed\n");
374  return 1;
375  }
376  if (odp_term_local()) {
377  fprintf(stderr, "ERROR: odp_term_local\n");
378  return 1;
379  }
380  if (odp_term_global(instance)) {
381  fprintf(stderr, "ERROR: odp_term_global\n");
382  return 1;
383  }
384 
385  return 0;
386 }
void odp_atomic_add_u32(odp_atomic_u32_t *atom, uint32_t val)
Add to atomic uint32 variable.
uint32_t odp_atomic_load_u32(odp_atomic_u32_t *atom)
Load value of atomic uint32 variable.
void odp_atomic_init_u128(odp_atomic_u128_t *atom, odp_u128_t val)
Initialize atomic odp_u128_t variable.
void odp_barrier_init(odp_barrier_t *barr, int count)
Initialize barrier with thread count.
void odp_barrier_wait(odp_barrier_t *barr)
Synchronize thread execution on barrier.
#define ODP_ALIGNED_CACHE
Defines type/struct/variable to be cache line size aligned.
#define ODP_UNUSED
Intentionally unused variables of functions.
Definition: spec/hints.h:54
uint16_t odp_u16be_t
unsigned 16bit big endian
int odp_cpumask_default_worker(odp_cpumask_t *mask, int num)
Default CPU mask for worker threads.
int odp_cpumask_first(const odp_cpumask_t *mask)
Find first set CPU in mask.
int32_t odp_cpumask_to_str(const odp_cpumask_t *mask, char *str, int32_t size)
Format a string from CPU mask.
#define ODP_CPUMASK_STR_SIZE
The maximum number of characters needed to record any CPU mask as a string (output of odp_cpumask_to_...
odp_event_type_t odp_event_type(odp_event_t event)
Event type of an event.
#define ODP_EVENT_INVALID
Invalid event.
int odp_init_local(odp_instance_t instance, odp_thread_type_t thr_type)
Thread local ODP initialization.
int odp_init_global(odp_instance_t *instance, const odp_init_t *params, const odp_platform_init_t *platform_params)
Global ODP initialization.
int odp_term_local(void)
Thread local ODP termination.
int odp_term_global(odp_instance_t instance)
Global ODP termination.
uint64_t odp_instance_t
ODP instance ID.
odp_event_t odp_packet_to_event(odp_packet_t pkt)
Convert packet handle to event.
odp_packet_t odp_packet_copy(odp_packet_t pkt, odp_pool_t pool)
Full copy of a packet.
void * odp_packet_data(odp_packet_t pkt)
Packet data pointer.
void * odp_packet_user_area(odp_packet_t pkt)
User area address.
uint32_t odp_packet_len(odp_packet_t pkt)
Packet data length.
odp_packet_t odp_packet_from_event(odp_event_t ev)
Get packet handle from event.
void odp_packet_free(odp_packet_t pkt)
Free packet.
#define ODP_PACKET_INVALID
Invalid packet.
int odp_packet_is_valid(odp_packet_t pkt)
Check that packet is valid.
odp_pool_t odp_pool_create(const char *name, const odp_pool_param_t *param)
Create a pool.
void odp_pool_param_init(odp_pool_param_t *param)
Initialize pool params.
int odp_pool_destroy(odp_pool_t pool)
Destroy a pool previously created by odp_pool_create()
#define ODP_POOL_INVALID
Invalid pool.
@ ODP_POOL_PACKET
Packet pool.
void odp_queue_param_init(odp_queue_param_t *param)
Initialize queue params.
#define ODP_QUEUE_INVALID
Invalid queue.
odp_event_t odp_queue_deq(odp_queue_t queue)
Dequeue an event from a queue.
int odp_queue_enq(odp_queue_t queue, odp_event_t ev)
Enqueue an event to a queue.
odp_queue_t odp_queue_create(const char *name, const odp_queue_param_t *param)
Queue create.
int odp_queue_destroy(odp_queue_t queue)
Destroy ODP queue.
@ ODP_QUEUE_TYPE_PLAIN
Plain queue.
@ ODP_QUEUE_OP_MT_UNSAFE
Not multithread safe operation.
int odp_shm_free(odp_shm_t shm)
Free a contiguous block of shared memory.
#define ODP_SHM_INVALID
Invalid shared memory block.
void * odp_shm_addr(odp_shm_t shm)
Shared memory block address.
odp_shm_t odp_shm_reserve(const char *name, uint64_t size, uint64_t align, uint32_t flags)
Reserve a contiguous block of shared memory.
int odp_thread_id(void)
Get thread identifier.
@ ODP_THREAD_WORKER
Worker thread.
@ ODP_THREAD_CONTROL
Control thread.
uint64_t odp_time_to_ns(odp_time_t time)
Convert time to nanoseconds.
odp_time_t odp_time_global(void)
Current global time.
Pool parameters.
uint32_t uarea_size
Minimum user area size in bytes.
uint32_t num
Number of buffers in the pool.
odp_pool_type_t type
Pool type.
uint32_t len
Minimum length of 'num' packets.
uint32_t seg_len
Minimum number of packet data bytes that can be stored in the first segment of a newly allocated pack...
struct odp_pool_param_t::@126 pkt
Parameters for packet pools.
ODP Queue parameters.
odp_queue_op_mode_t enq_mode
Enqueue mode.
odp_queue_type_t type
Queue type.
odp_queue_op_mode_t deq_mode
Dequeue mode.