Added support for reporting packet (mis)order.
[samplevnf.git] / VNFs / DPPD-PROX / handle_impair.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <string.h>
18 #include <stdio.h>
19 #include <rte_cycles.h>
20 #include <rte_version.h>
21
22 #include "prox_malloc.h"
23 #include "lconf.h"
24 #include "log.h"
25 #include "random.h"
26 #include "handle_impair.h"
27 #include "prefetch.h"
28 #include "prox_port_cfg.h"
29
30 #if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0)
31 #define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE
32 #endif
33
34 #define DELAY_ACCURACY  11              // accuracy of 2048 cycles ~= 1 micro-second
35 #define DELAY_MAX_MASK  0x1FFFFF        // Maximum 2M * 2K cycles ~1 second
36
/* One buffered packet together with the tsc at which it may be transmitted. */
struct queue_elem {
	struct rte_mbuf *mbuf;	/* delayed packet */
	uint64_t        tsc;	/* absolute tsc of earliest transmission (now + delay) */
};
41
/* Circular FIFO of queue_elem; head/tail are wrapped with task->queue_mask. */
struct queue {
	struct queue_elem *queue_elem;	/* ring storage, queue_mask + 1 entries */
	unsigned queue_head;		/* next slot to write (enqueue) */
	unsigned queue_tail;		/* next slot to read (dequeue) */
};
47
/* Per-task state for the "impair" mode: random drop, fixed or random delay,
 * and (with recent DPDK) packet duplication. */
struct task_impair {
	struct task_base base;
	struct queue_elem *queue;	/* FIFO ring used in fixed-delay mode */
	uint32_t random_delay_us;	/* max random delay; 0 => not random-delay mode */
	uint32_t delay_us;		/* fixed delay; 0 => no fixed delay */
	uint64_t delay_time;		/* configured delay converted to tsc cycles */
	uint64_t delay_time_mask;	/* pow2-1 mask covering delay_time (random mode) */
	unsigned queue_head;		/* fixed-delay ring write index */
	unsigned queue_tail;		/* fixed-delay ring read index */
	unsigned queue_mask;		/* ring size - 1 (power of two) */
	int tresh_no_drop;		/* rand_r() <= tresh => packet is kept */
	int tresh_duplicate;		/* rand_r() <= tresh => packet is duplicated */
	int tresh_delay;		/* rand_r() <= tresh => packet is delayed */
	unsigned int seed;		/* rand_r() state */
	struct random state;		/* generator used for random delay values */
	uint64_t last_idx;		/* next timing-wheel slot to flush */
	struct queue *buffer;		/* timing wheel: DELAY_MAX_MASK + 1 queues (random mode) */
	uint32_t socket_id;		/* NUMA socket for (re)allocations */
	uint32_t flags;			/* IMPAIR_NEED_UPDATE | IMPAIR_SET_MAC */
	uint8_t src_mac[6];		/* source MAC written into packets when IMPAIR_SET_MAC */
};
69
70 #define IMPAIR_NEED_UPDATE     1
71 #define IMPAIR_SET_MAC         2
72
73 static int handle_bulk_impair(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);
74 static int handle_bulk_impair_random(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);
75 static int handle_bulk_random_drop(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);
76
77 void task_impair_set_proba_no_drop(struct task_base *tbase, float proba_no_drop)
78 {
79         struct task_impair *task = (struct task_impair *)tbase;
80         task->tresh_no_drop = ((uint64_t) RAND_MAX) * (uint32_t)(proba_no_drop * 10000) / 1000000;
81 }
82
83 void task_impair_set_proba_delay(struct task_base *tbase, float proba_delay)
84 {
85         struct task_impair *task = (struct task_impair *)tbase;
86         task->tresh_delay = ((uint64_t) RAND_MAX) * (uint32_t)(proba_delay * 10000) / 1000000;
87         task->flags |= IMPAIR_NEED_UPDATE;
88 }
89
90 void task_impair_set_proba_duplicate(struct task_base *tbase, float proba_dup)
91 {
92         struct task_impair *task = (struct task_impair *)tbase;
93         task->tresh_duplicate = ((uint64_t) RAND_MAX) * (uint32_t)(proba_dup * 10000) / 1000000;
94 }
95
96 void task_impair_set_delay_us(struct task_base *tbase, uint32_t delay_us, uint32_t random_delay_us)
97 {
98         struct task_impair *task = (struct task_impair *)tbase;
99         task->flags |= IMPAIR_NEED_UPDATE;
100         task->random_delay_us = random_delay_us;
101         task->delay_us = delay_us;
102 }
103
/*
 * Apply a pending runtime reconfiguration (IMPAIR_NEED_UPDATE).
 * Selects the bulk handler matching the new delay mode, drains and frees
 * the previous fixed-delay queue and/or random-delay timing wheel (still
 * applying the no-drop probability to flushed packets), then allocates
 * the storage required by the new mode and re-seeds the random state.
 */
static void task_impair_update(struct task_base *tbase)
{
	struct task_impair *task = (struct task_impair *)tbase;
	uint32_t queue_len = 0;
	size_t mem_size;
	if ((task->flags & IMPAIR_NEED_UPDATE) == 0)
		return;
	task->flags &= ~IMPAIR_NEED_UPDATE;
	uint64_t now = rte_rdtsc();
	uint8_t out[MAX_PKT_BURST] = {0};
	uint64_t now_idx = (now >> DELAY_ACCURACY) & DELAY_MAX_MASK;

	/* Pick the handler and queue sizing for the new configuration. */
	if (task->random_delay_us) {
		tbase->handle_bulk = handle_bulk_impair_random;
		task->delay_time = usec_to_tsc(task->random_delay_us);
		task->delay_time_mask = rte_align32pow2(task->delay_time) - 1;
		/* sizing presumably assumes line rate: 1250 bytes/us at 10Gbps,
		 * 84 bytes per minimal frame — TODO confirm */
		queue_len = rte_align32pow2((1250L * task->random_delay_us) / 84 / (DELAY_MAX_MASK + 1));
	} else if (task->delay_us == 0) {
		tbase->handle_bulk = handle_bulk_random_drop;
		task->delay_time = 0;
	} else {
		tbase->handle_bulk = handle_bulk_impair;
		task->delay_time = usec_to_tsc(task->delay_us);
		queue_len = rte_align32pow2(1250 * task->delay_us / 84);
	}
	/* Drain the old fixed-delay queue: send expired packets in bursts
	 * (with random drop applied) until the ring is empty, then free it. */
	if (task->queue) {
		struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
		while (task->queue_tail != task->queue_head) {
			now = rte_rdtsc();
			uint16_t idx = 0;
			while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
				if (task->queue[task->queue_tail].tsc <= now) {
					out[idx] = rand_r(&task->seed) <= task->tresh_no_drop? 0 : OUT_DISCARD;
					new_mbufs[idx++] = task->queue[task->queue_tail].mbuf;
					task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
				}
				else {
					/* Head of queue not yet due; retry with a fresh tsc. */
					break;
				}
			}
			if (idx)
				task->base.tx_pkt(&task->base, new_mbufs, idx, out);
		}
		prox_free(task->queue);
		task->queue = NULL;
	}
	/* Drain the old random-delay timing wheel, slot by slot, up to the
	 * slot just before now_idx, then free all slots and the wheel. */
	if (task->buffer) {
		struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
		while (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK)) {
			now = rte_rdtsc();
			uint16_t pkt_idx = 0;
			while ((pkt_idx < MAX_PKT_BURST) && (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK))) {
				struct queue *queue = &task->buffer[task->last_idx];
				while ((pkt_idx < MAX_PKT_BURST) && (queue->queue_tail != queue->queue_head)) {
					out[pkt_idx] = rand_r(&task->seed) <= task->tresh_no_drop? 0 : OUT_DISCARD;
					new_mbufs[pkt_idx++] = queue->queue_elem[queue->queue_tail].mbuf;
					queue->queue_tail = (queue->queue_tail + 1) & task->queue_mask;
				}
				task->last_idx = (task->last_idx + 1) & DELAY_MAX_MASK;
			}

			if (pkt_idx)
				task->base.tx_pkt(&task->base, new_mbufs, pkt_idx, out);
		}
		for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
			if (task->buffer[i].queue_elem)
				prox_free(task->buffer[i].queue_elem);
		}
		prox_free(task->buffer);
		task->buffer = NULL;
	}

	/* Queues must hold at least one full burst. */
	if (queue_len < MAX_PKT_BURST)
		queue_len= MAX_PKT_BURST;
	task->queue_mask = queue_len - 1;
	if (task->queue_mask < MAX_PKT_BURST - 1)
		task->queue_mask = MAX_PKT_BURST - 1;
	mem_size = (task->queue_mask + 1) * sizeof(task->queue[0]);

	/* Allocate storage for the new mode (fixed delay takes precedence,
	 * matching the handler selection above). */
	if (task->delay_us) {
		task->queue_head = 0;
		task->queue_tail = 0;
		task->queue = prox_zmalloc(mem_size, task->socket_id);
		if (task->queue == NULL) {
			plog_err("Not enough memory to allocate queue\n");
			task->queue_mask = 0;
		}
	} else if (task->random_delay_us) {
		size_t size = (DELAY_MAX_MASK + 1) * sizeof(struct queue);
		plog_info("\t\tAllocating %zd bytes\n", size);
		task->buffer = prox_zmalloc(size, task->socket_id);
		PROX_PANIC(task->buffer == NULL, "Not enough memory to allocate buffer\n");
		plog_info("\t\tAllocating %d x %zd bytes\n", DELAY_MAX_MASK + 1, mem_size);

		for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
			task->buffer[i].queue_elem = prox_zmalloc(mem_size, task->socket_id);
			PROX_PANIC(task->buffer[i].queue_elem == NULL, "Not enough memory to allocate buffer elems\n");
		}
	}
	random_init_seed(&task->state);
}
205
/*
 * Handler used when no delay is configured: optionally rewrite the source
 * MAC, then keep or drop each packet according to tresh_no_drop.
 * Returns the number of packets handled by tx_pkt.
 */
static int handle_bulk_random_drop(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_impair *task = (struct task_impair *)tbase;
	uint8_t out[MAX_PKT_BURST];
	prox_rte_ether_hdr * hdr[MAX_PKT_BURST];
	int ret = 0;
	/* Prefetch mbufs and their ethernet headers before touching them. */
	for (uint16_t i = 0; i < n_pkts; ++i) {
		PREFETCH0(mbufs[i]);
	}
	for (uint16_t i = 0; i < n_pkts; ++i) {
		hdr[i] = rte_pktmbuf_mtod(mbufs[i], prox_rte_ether_hdr *);
		PREFETCH0(hdr[i]);
	}
	/* Flag check hoisted out of the loop: two specialized loops. */
	if (task->flags & IMPAIR_SET_MAC) {
		for (uint16_t i = 0; i < n_pkts; ++i) {
			prox_rte_ether_addr_copy((prox_rte_ether_addr *)&task->src_mac[0], &hdr[i]->s_addr);
			out[i] = rand_r(&task->seed) <= task->tresh_no_drop? 0 : OUT_DISCARD;
		}
	} else {
		for (uint16_t i = 0; i < n_pkts; ++i) {
			out[i] = rand_r(&task->seed) <= task->tresh_no_drop? 0 : OUT_DISCARD;
		}
	}
	ret = task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
	/* Apply any pending reconfiguration requested via the set_* API. */
	task_impair_update(tbase);
	return ret;
}
233
/*
 * Fixed-delay handler: enqueue each incoming packet with release time
 * now + delay_time into a circular FIFO; packets that do not fit are
 * dropped. Then transmit up to one burst of queued packets whose release
 * time has expired, applying the no-drop probability.
 * Returns the number of packets handled by tx_pkt.
 */
static int handle_bulk_impair(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_impair *task = (struct task_impair *)tbase;
	uint64_t now = rte_rdtsc();
	uint8_t out[MAX_PKT_BURST] = {0};
	uint16_t enqueue_failed;
	uint16_t i;
	int ret = 0;
	prox_rte_ether_hdr * hdr[MAX_PKT_BURST];
	/* Prefetch mbufs and their ethernet headers before touching them. */
	for (uint16_t i = 0; i < n_pkts; ++i) {
		PREFETCH0(mbufs[i]);
	}
	for (uint16_t i = 0; i < n_pkts; ++i) {
		hdr[i] = rte_pktmbuf_mtod(mbufs[i], prox_rte_ether_hdr *);
		PREFETCH0(hdr[i]);
	}

	int nb_empty_slots = (task->queue_tail - task->queue_head + task->queue_mask) & task->queue_mask;
	if (likely(nb_empty_slots >= n_pkts)) {
		/* We know n_pkts fits, no need to check for every packet */
		for (i = 0; i < n_pkts; ++i) {
			if (task->flags & IMPAIR_SET_MAC)
				prox_rte_ether_addr_copy((prox_rte_ether_addr *)&task->src_mac[0], &hdr[i]->s_addr);
			task->queue[task->queue_head].tsc = now + task->delay_time;
			task->queue[task->queue_head].mbuf = mbufs[i];
			task->queue_head = (task->queue_head + 1) & task->queue_mask;
		}
	} else {
		/* Slow path: check for a free slot before every enqueue. */
		for (i = 0; i < n_pkts; ++i) {
			if (((task->queue_head + 1) & task->queue_mask) != task->queue_tail) {
				if (task->flags & IMPAIR_SET_MAC)
					prox_rte_ether_addr_copy((prox_rte_ether_addr *)&task->src_mac[0], &hdr[i]->s_addr);
				task->queue[task->queue_head].tsc = now + task->delay_time;
				task->queue[task->queue_head].mbuf = mbufs[i];
				task->queue_head = (task->queue_head + 1) & task->queue_mask;
			}
			else {
				/* Rest does not fit, need to drop those packets. */
				enqueue_failed = i;
				for (;i < n_pkts; ++i) {
					out[i] = OUT_DISCARD;
				}
				ret+= task->base.tx_pkt(&task->base, mbufs + enqueue_failed,
						n_pkts - enqueue_failed, out + enqueue_failed);
				break;
			}
		}
	}

	struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
	uint16_t idx = 0;

	/* Dequeue expired packets. Two loops with the tresh_no_drop test
	 * hoisted: when tresh_no_drop == RAND_MAX no packet can be dropped,
	 * so the rand_r() call is skipped entirely. */
	if (task->tresh_no_drop != RAND_MAX) {
		while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
			if (task->queue[task->queue_tail].tsc <= now) {
				out[idx] = rand_r(&task->seed) <= task->tresh_no_drop? 0 : OUT_DISCARD;
				new_mbufs[idx] = task->queue[task->queue_tail].mbuf;
				PREFETCH0(new_mbufs[idx]);
				PREFETCH0(&new_mbufs[idx]->cacheline1);
				idx++;
				task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
			}
			else {
				/* FIFO order: once one packet is not due, none after it is. */
				break;
			}
		}
	} else {
		while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
			if (task->queue[task->queue_tail].tsc <= now) {
				out[idx] = 0;
				new_mbufs[idx] = task->queue[task->queue_tail].mbuf;
				PREFETCH0(new_mbufs[idx]);
				PREFETCH0(&new_mbufs[idx]->cacheline1);
				idx++;
				task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
			}
			else {
				break;
			}
		}
	}

	if (idx)
		ret+= task->base.tx_pkt(&task->base, new_mbufs, idx, out);
	/* Apply any pending reconfiguration requested via the set_* API. */
	task_impair_update(tbase);
	return ret;
}
321
/*
 * We want to avoid using division and mod for performance reasons.
 * We also want to support up to one second delay, and express it in tsc
 * So the delay in tsc needs up to 32 bits (supposing processor freq is less than 4GHz).
 * If the max_delay is smaller, we make sure we use less bits.
 * Note that we lose the MSB of the xorshift - 64 bits could hold
 * two or three delays in TSC - but would probably make implementation more complex
 * and not huge gain expected. Maybe room for optimization.
 * Using this implementation, we might have to run random more than once for a delay
 * but on average this should occur less than 50% of the time.
*/
333
/* Draw a uniformly distributed delay in [0, max_delay) without division:
 * rejection sampling on the masked output of the xorshift generator. */
static inline uint64_t random_delay(struct random *state, uint64_t max_delay, uint64_t max_delay_mask)
{
	uint64_t masked;

	do {
		masked = random_next(state) & max_delay_mask;
	} while (masked >= max_delay);
	return masked;
}
343
344 static int handle_bulk_impair_random(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
345 {
346         struct task_impair *task = (struct task_impair *)tbase;
347         uint64_t now = rte_rdtsc();
348         uint8_t out[MAX_PKT_BURST];
349         uint16_t enqueue_failed;
350         uint16_t i;
351         int ret = 0;
352         uint64_t packet_time, idx;
353         uint64_t now_idx = (now >> DELAY_ACCURACY) & DELAY_MAX_MASK;
354         prox_rte_ether_hdr * hdr[MAX_PKT_BURST];
355         for (uint16_t i = 0; i < n_pkts; ++i) {
356                 PREFETCH0(mbufs[i]);
357         }
358         for (uint16_t i = 0; i < n_pkts; ++i) {
359                 hdr[i] = rte_pktmbuf_mtod(mbufs[i], prox_rte_ether_hdr *);
360                 PREFETCH0(hdr[i]);
361         }
362
363         for (i = 0; i < n_pkts; ++i) {
364                 if (rand_r(&task->seed) <= task->tresh_delay)
365                         packet_time = now + random_delay(&task->state, task->delay_time, task->delay_time_mask);
366                 else
367                         packet_time = now;
368                 idx = (packet_time >> DELAY_ACCURACY) & DELAY_MAX_MASK;
369                 while (idx != ((now_idx - 1) & DELAY_MAX_MASK)) {
370                         struct queue *queue = &task->buffer[idx];
371                         if (((queue->queue_head + 1) & task->queue_mask) != queue->queue_tail) {
372                                 if (task->flags & IMPAIR_SET_MAC)
373                                         prox_rte_ether_addr_copy((prox_rte_ether_addr *)&task->src_mac[0], &hdr[i]->s_addr);
374                                 queue->queue_elem[queue->queue_head].mbuf = mbufs[i];
375                                 queue->queue_head = (queue->queue_head + 1) & task->queue_mask;
376                                 break;
377                         } else {
378                                 idx = (idx + 1) & DELAY_MAX_MASK;
379                         }
380                 }
381                 if (idx == ((now_idx - 1) & DELAY_MAX_MASK)) {
382                         /* Rest does not fit, need to drop packet. Note that further packets might fit as might want to be sent earlier */
383                         out[0] = OUT_DISCARD;
384                         ret+= task->base.tx_pkt(&task->base, mbufs + i, 1, out);
385                         plog_warn("Unexpectdly dropping packets\n");
386                 }
387 #if RTE_VERSION >= RTE_VERSION_NUM(19,11,0,0)
388                 if (rand_r(&task->seed) <= task->tresh_duplicate) {
389                         mbufs[i] = rte_pktmbuf_copy(mbufs[i], mbufs[i]->pool, 0, UINT32_MAX);
390                         if (mbufs[i] == NULL) {
391                                 plog_err("Failed to duplicate mbuf\n");
392                         } else
393                                 i = i - 1;
394                 }
395 #endif
396         }
397
398         struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
399         uint16_t pkt_idx = 0;
400
401         while ((pkt_idx < MAX_PKT_BURST) && (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK))) {
402                 struct queue *queue = &task->buffer[task->last_idx];
403                 while ((pkt_idx < MAX_PKT_BURST) && (queue->queue_tail != queue->queue_head)) {
404                         out[pkt_idx] = rand_r(&task->seed) <= task->tresh_no_drop? 0 : OUT_DISCARD;
405                         new_mbufs[pkt_idx] = queue->queue_elem[queue->queue_tail].mbuf;
406                         PREFETCH0(new_mbufs[pkt_idx]);
407                         PREFETCH0(&new_mbufs[pkt_idx]->cacheline1);
408                         pkt_idx++;
409                         queue->queue_tail = (queue->queue_tail + 1) & task->queue_mask;
410                 }
411                 task->last_idx = (task->last_idx + 1) & DELAY_MAX_MASK;
412         }
413
414         if (pkt_idx)
415                 ret+= task->base.tx_pkt(&task->base, new_mbufs, pkt_idx, out);
416         task_impair_update(tbase);
417         return ret;
418 }
419
420 static void init_task(struct task_base *tbase, struct task_args *targ)
421 {
422         struct task_impair *task = (struct task_impair *)tbase;
423         uint32_t queue_len = 0;
424         size_t mem_size;
425         unsigned socket_id;
426         uint64_t delay_us = 0;
427
428         task->seed = rte_rdtsc();
429
430         task->tresh_no_drop = ((uint64_t) RAND_MAX) * targ->probability_no_drop / 1000000;
431         task->tresh_delay = ((uint64_t) RAND_MAX) * targ->probability_delay / 1000000;
432         task->tresh_duplicate = ((uint64_t) RAND_MAX) * targ->probability_duplicate / 1000000;
433
434         if ((targ->delay_us == 0) && (targ->random_delay_us == 0)) {
435                 tbase->handle_bulk = handle_bulk_random_drop;
436                 task->delay_time = 0;
437         } else if (targ->random_delay_us) {
438                 tbase->handle_bulk = handle_bulk_impair_random;
439                 task->delay_time = usec_to_tsc(targ->random_delay_us);
440                 task->delay_time_mask = rte_align32pow2(task->delay_time) - 1;
441                 delay_us = targ->random_delay_us;
442                 queue_len = rte_align32pow2((1250L * delay_us) / 84 / (DELAY_MAX_MASK + 1));
443         } else {
444                 task->delay_time = usec_to_tsc(targ->delay_us);
445                 delay_us = targ->delay_us;
446                 queue_len = rte_align32pow2(1250 * delay_us / 84);
447         }
448         /* Assume Line-rate is maximum transmit speed.
449            TODO: take link speed if tx is port.
450         */
451         if (queue_len < MAX_PKT_BURST)
452                 queue_len= MAX_PKT_BURST;
453         task->queue_mask = queue_len - 1;
454         if (task->queue_mask < MAX_PKT_BURST - 1)
455                 task->queue_mask = MAX_PKT_BURST - 1;
456
457         mem_size = (task->queue_mask + 1) * sizeof(task->queue[0]);
458         socket_id = rte_lcore_to_socket_id(targ->lconf->id);
459         task->socket_id = rte_lcore_to_socket_id(targ->lconf->id);
460
461         if (targ->delay_us) {
462                 task->queue = prox_zmalloc(mem_size, socket_id);
463                 PROX_PANIC(task->queue == NULL, "Not enough memory to allocate queue\n");
464                 task->queue_head = 0;
465                 task->queue_tail = 0;
466         } else if (targ->random_delay_us) {
467                 size_t size = (DELAY_MAX_MASK + 1) * sizeof(struct queue);
468                 plog_info("\t\tAllocating %zd bytes\n", size);
469                 task->buffer = prox_zmalloc(size, socket_id);
470                 PROX_PANIC(task->buffer == NULL, "Not enough memory to allocate buffer\n");
471                 plog_info("\t\tAllocating %d x %zd bytes\n", DELAY_MAX_MASK + 1, mem_size);
472
473                 for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
474                         task->buffer[i].queue_elem = prox_zmalloc(mem_size, socket_id);
475                         PROX_PANIC(task->buffer[i].queue_elem == NULL, "Not enough memory to allocate buffer elems\n");
476                 }
477         }
478         random_init_seed(&task->state);
479         if (targ->nb_txports) {
480                 memcpy(&task->src_mac[0], &prox_port_cfg[tbase->tx_params_hw.tx_port_queue[0].port].eth_addr, sizeof(prox_rte_ether_addr));
481                 task->flags = IMPAIR_SET_MAC;
482         } else {
483                 task->flags = 0;
484         }
485 }
486
/* Registration record for the "impair" task mode. handle_bulk_impair is the
 * default handler; init_task/task_impair_update may substitute another. */
static struct task_init tinit = {
	.mode_str = "impair",
	.init = init_task,
	.handle = handle_bulk_impair,
	.flag_features = TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS | TASK_FEATURE_ZERO_RX,
	.size = sizeof(struct task_impair)
};
494
/* Register the impair task mode with PROX at program start-up. */
__attribute__((constructor)) static void ctor(void)
{
	reg_task(&tinit);
}