Merge "Rework handle_esp.c (proto, DPDK<17.08, cleanup)"
[samplevnf.git] / VNFs / DPPD-PROX / handle_impair.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <string.h>
18 #include <stdio.h>
19 #include <rte_cycles.h>
20 #include <rte_version.h>
21
22 #include "prox_malloc.h"
23 #include "lconf.h"
24 #include "log.h"
25 #include "random.h"
26 #include "handle_impair.h"
27 #include "prefetch.h"
28 #include "prox_port_cfg.h"
29
30 #if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0)
31 #define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE
32 #endif
33
/* Width of one time-wheel slot as a shift count: 2^11 = 2048 TSC cycles (~1 micro-second). */
#define DELAY_ACCURACY  11
/* Mask applied to a slot index: 2M slots * 2K cycles gives a horizon of ~1 second. */
#define DELAY_MAX_MASK  0x1FFFFF
36
/* One packet held back by the impair task. */
struct queue_elem {
        struct rte_mbuf *mbuf;  /* the delayed packet */
        uint64_t        tsc;    /* TSC value from which the packet may be sent (fixed-delay mode) */
};
41
/*
 * Small ring of delayed packets; the random-delay mode keeps one such
 * ring per time slot. Indexes wrap with task_impair.queue_mask.
 */
struct queue {
        struct queue_elem *queue_elem;  /* ring storage */
        unsigned int queue_head;        /* next position to write */
        unsigned int queue_tail;        /* next position to read */
};
47
48 struct task_impair {
49         struct task_base base;
50         struct queue_elem *queue;
51         uint32_t random_delay_us;
52         uint32_t delay_us;
53         uint64_t delay_time;
54         uint64_t delay_time_mask;
55         unsigned queue_head;
56         unsigned queue_tail;
57         unsigned queue_mask;
58         int tresh;
59         unsigned int seed;
60         struct random state;
61         uint64_t last_idx;
62         struct queue *buffer;
63         uint32_t socket_id;
64         uint32_t flags;
65         uint8_t src_mac[6];
66 };
67
/* Bits stored in task_impair.flags. */
#define IMPAIR_NEED_UPDATE     0x1      /* delay settings changed; task_impair_update() has to apply them */
#define IMPAIR_SET_MAC         0x2      /* overwrite the source MAC of handled packets with src_mac */
70
71 static int handle_bulk_impair(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);
72 static int handle_bulk_impair_random(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);
73 static int handle_bulk_random_drop(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);
74
75 void task_impair_set_proba(struct task_base *tbase, float proba)
76 {
77         struct task_impair *task = (struct task_impair *)tbase;
78         task->tresh = ((uint64_t) RAND_MAX) * (uint32_t)(proba * 10000) / 1000000;
79 }
80
81 void task_impair_set_delay_us(struct task_base *tbase, uint32_t delay_us, uint32_t random_delay_us)
82 {
83         struct task_impair *task = (struct task_impair *)tbase;
84         task->flags |= IMPAIR_NEED_UPDATE;
85         task->random_delay_us = random_delay_us;
86         task->delay_us = delay_us;
87 }
88
89 static void task_impair_update(struct task_base *tbase)
90 {
91         struct task_impair *task = (struct task_impair *)tbase;
92         uint32_t queue_len = 0;
93         size_t mem_size;
94         if ((task->flags & IMPAIR_NEED_UPDATE) == 0)
95                 return;
96         task->flags &= ~IMPAIR_NEED_UPDATE;
97         uint64_t now = rte_rdtsc();
98         uint8_t out[MAX_PKT_BURST] = {0};
99         uint64_t now_idx = (now >> DELAY_ACCURACY) & DELAY_MAX_MASK;
100
101         if (task->random_delay_us) {
102                 tbase->handle_bulk = handle_bulk_impair_random;
103                 task->delay_time = usec_to_tsc(task->random_delay_us);
104                 task->delay_time_mask = rte_align32pow2(task->delay_time) - 1;
105                 queue_len = rte_align32pow2((1250L * task->random_delay_us) / 84 / (DELAY_MAX_MASK + 1));
106         } else if (task->delay_us == 0) {
107                 tbase->handle_bulk = handle_bulk_random_drop;
108                 task->delay_time = 0;
109         } else {
110                 tbase->handle_bulk = handle_bulk_impair;
111                 task->delay_time = usec_to_tsc(task->delay_us);
112                 queue_len = rte_align32pow2(1250 * task->delay_us / 84);
113         }
114         if (task->queue) {
115                 struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
116                 while (task->queue_tail != task->queue_head) {
117                         now = rte_rdtsc();
118                         uint16_t idx = 0;
119                         while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
120                                 if (task->queue[task->queue_tail].tsc <= now) {
121                                         out[idx] = rand_r(&task->seed) <= task->tresh? 0 : OUT_DISCARD;
122                                         new_mbufs[idx++] = task->queue[task->queue_tail].mbuf;
123                                         task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
124                                 }
125                                 else {
126                                         break;
127                                 }
128                         }
129                         if (idx)
130                                 task->base.tx_pkt(&task->base, new_mbufs, idx, out);
131                 }
132                 prox_free(task->queue);
133                 task->queue = NULL;
134         }
135         if (task->buffer) {
136                 struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
137                 while (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK)) {
138                         now = rte_rdtsc();
139                         uint16_t pkt_idx = 0;
140                         while ((pkt_idx < MAX_PKT_BURST) && (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK))) {
141                                 struct queue *queue = &task->buffer[task->last_idx];
142                                 while ((pkt_idx < MAX_PKT_BURST) && (queue->queue_tail != queue->queue_head)) {
143                                         out[pkt_idx] = rand_r(&task->seed) <= task->tresh? 0 : OUT_DISCARD;
144                                         new_mbufs[pkt_idx++] = queue->queue_elem[queue->queue_tail].mbuf;
145                                         queue->queue_tail = (queue->queue_tail + 1) & task->queue_mask;
146                                 }
147                                 task->last_idx = (task->last_idx + 1) & DELAY_MAX_MASK;
148                         }
149
150                         if (pkt_idx)
151                                 task->base.tx_pkt(&task->base, new_mbufs, pkt_idx, out);
152                 }
153                 for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
154                         if (task->buffer[i].queue_elem)
155                                 prox_free(task->buffer[i].queue_elem);
156                 }
157                 prox_free(task->buffer);
158                 task->buffer = NULL;
159         }
160
161         if (queue_len < MAX_PKT_BURST)
162                 queue_len= MAX_PKT_BURST;
163         task->queue_mask = queue_len - 1;
164         if (task->queue_mask < MAX_PKT_BURST - 1)
165                 task->queue_mask = MAX_PKT_BURST - 1;
166         mem_size = (task->queue_mask + 1) * sizeof(task->queue[0]);
167
168         if (task->delay_us) {
169                 task->queue_head = 0;
170                 task->queue_tail = 0;
171                 task->queue = prox_zmalloc(mem_size, task->socket_id);
172                 if (task->queue == NULL) {
173                         plog_err("Not enough memory to allocate queue\n");
174                         task->queue_mask = 0;
175                 }
176         } else if (task->random_delay_us) {
177                 size_t size = (DELAY_MAX_MASK + 1) * sizeof(struct queue);
178                 plog_info("Allocating %zd bytes\n", size);
179                 task->buffer = prox_zmalloc(size, task->socket_id);
180                 PROX_PANIC(task->buffer == NULL, "Not enough memory to allocate buffer\n");
181                 plog_info("Allocating %d x %zd bytes\n", DELAY_MAX_MASK + 1, mem_size);
182
183                 for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
184                         task->buffer[i].queue_elem = prox_zmalloc(mem_size, task->socket_id);
185                         PROX_PANIC(task->buffer[i].queue_elem == NULL, "Not enough memory to allocate buffer elems\n");
186                 }
187         }
188         random_init_seed(&task->state);
189 }
190
191 static int handle_bulk_random_drop(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
192 {
193         struct task_impair *task = (struct task_impair *)tbase;
194         uint8_t out[MAX_PKT_BURST];
195         struct ether_hdr * hdr[MAX_PKT_BURST];
196         for (uint16_t i = 0; i < n_pkts; ++i) {
197                 PREFETCH0(mbufs[i]);
198         }
199         for (uint16_t i = 0; i < n_pkts; ++i) {
200                 hdr[i] = rte_pktmbuf_mtod(mbufs[i], struct ether_hdr *);
201                 PREFETCH0(hdr[i]);
202         }
203         if (task->flags & IMPAIR_SET_MAC) {
204                 for (uint16_t i = 0; i < n_pkts; ++i) {
205                         ether_addr_copy((struct ether_addr *)&task->src_mac[0], &hdr[i]->s_addr);
206                         out[i] = rand_r(&task->seed) <= task->tresh? 0 : OUT_DISCARD;
207                 }
208         } else {
209                 for (uint16_t i = 0; i < n_pkts; ++i) {
210                         out[i] = rand_r(&task->seed) <= task->tresh? 0 : OUT_DISCARD;
211                 }
212         }
213         return task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
214         task_impair_update(tbase);
215 }
216
217 static int handle_bulk_impair(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
218 {
219         struct task_impair *task = (struct task_impair *)tbase;
220         uint64_t now = rte_rdtsc();
221         uint8_t out[MAX_PKT_BURST] = {0};
222         uint16_t enqueue_failed;
223         uint16_t i;
224         int ret = 0;
225         struct ether_hdr * hdr[MAX_PKT_BURST];
226         for (uint16_t i = 0; i < n_pkts; ++i) {
227                 PREFETCH0(mbufs[i]);
228         }
229         for (uint16_t i = 0; i < n_pkts; ++i) {
230                 hdr[i] = rte_pktmbuf_mtod(mbufs[i], struct ether_hdr *);
231                 PREFETCH0(hdr[i]);
232         }
233
234         int nb_empty_slots = (task->queue_tail - task->queue_head + task->queue_mask) & task->queue_mask;
235         if (likely(nb_empty_slots >= n_pkts)) {
236                 /* We know n_pkts fits, no need to check for every packet */
237                 for (i = 0; i < n_pkts; ++i) {
238                         if (task->flags & IMPAIR_SET_MAC)
239                                 ether_addr_copy((struct ether_addr *)&task->src_mac[0], &hdr[i]->s_addr);
240                         task->queue[task->queue_head].tsc = now + task->delay_time;
241                         task->queue[task->queue_head].mbuf = mbufs[i];
242                         task->queue_head = (task->queue_head + 1) & task->queue_mask;
243                 }
244         } else {
245                 for (i = 0; i < n_pkts; ++i) {
246                         if (((task->queue_head + 1) & task->queue_mask) != task->queue_tail) {
247                                 if (task->flags & IMPAIR_SET_MAC)
248                                         ether_addr_copy((struct ether_addr *)&task->src_mac[0], &hdr[i]->s_addr);
249                                 task->queue[task->queue_head].tsc = now + task->delay_time;
250                                 task->queue[task->queue_head].mbuf = mbufs[i];
251                                 task->queue_head = (task->queue_head + 1) & task->queue_mask;
252                         }
253                         else {
254                                 /* Rest does not fit, need to drop those packets. */
255                                 enqueue_failed = i;
256                                 for (;i < n_pkts; ++i) {
257                                         out[i] = OUT_DISCARD;
258                                 }
259                                 ret+= task->base.tx_pkt(&task->base, mbufs + enqueue_failed,
260                                                 n_pkts - enqueue_failed, out + enqueue_failed);
261                                 break;
262                         }
263                 }
264         }
265
266         struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
267         uint16_t idx = 0;
268
269         if (task->tresh != RAND_MAX) {
270                 while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
271                         if (task->queue[task->queue_tail].tsc <= now) {
272                                 out[idx] = rand_r(&task->seed) <= task->tresh? 0 : OUT_DISCARD;
273                                 new_mbufs[idx] = task->queue[task->queue_tail].mbuf;
274                                 PREFETCH0(new_mbufs[idx]);
275                                 PREFETCH0(&new_mbufs[idx]->cacheline1);
276                                 idx++;
277                                 task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
278                         }
279                         else {
280                                 break;
281                         }
282                 }
283         } else {
284                 while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
285                         if (task->queue[task->queue_tail].tsc <= now) {
286                                 out[idx] = 0;
287                                 new_mbufs[idx] = task->queue[task->queue_tail].mbuf;
288                                 PREFETCH0(new_mbufs[idx]);
289                                 PREFETCH0(&new_mbufs[idx]->cacheline1);
290                                 idx++;
291                                 task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
292                         }
293                         else {
294                                 break;
295                         }
296                 }
297         }
298
299         if (idx)
300                 ret+= task->base.tx_pkt(&task->base, new_mbufs, idx, out);
301         task_impair_update(tbase);
302         return ret;
303 }
304
/*
 * We want to avoid using division and mod for performance reasons.
 * We also want to support up to one second delay, and express it in tsc.
 * So the delay in tsc needs up to 32 bits (supposing processor freq is less than 4GHz).
 * If the max_delay is smaller, we make sure we use fewer bits.
 * Note that we lose the MSB of the xorshift - 64 bits could hold
 * two or three delays in TSC - but that would probably make the implementation more complex
 * with no huge gain expected. Maybe room for optimization.
 * Using this implementation, we might have to run random more than once for a delay,
 * but on average this should occur less than 50% of the time.
 */
316
/*
 * Draw a delay in [0, max_delay) by rejection sampling: random values are
 * masked with max_delay_mask and redrawn until the result is below max_delay.
 * max_delay must be non-zero, otherwise no draw is ever accepted.
 */
static inline uint64_t random_delay(struct random *state, uint64_t max_delay, uint64_t max_delay_mask)
{
        uint64_t candidate;

        do {
                candidate = random_next(state) & max_delay_mask;
        } while (candidate >= max_delay);

        return candidate;
}
326
327 static int handle_bulk_impair_random(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
328 {
329         struct task_impair *task = (struct task_impair *)tbase;
330         uint64_t now = rte_rdtsc();
331         uint8_t out[MAX_PKT_BURST];
332         uint16_t enqueue_failed;
333         uint16_t i;
334         int ret = 0;
335         uint64_t packet_time, idx;
336         uint64_t now_idx = (now >> DELAY_ACCURACY) & DELAY_MAX_MASK;
337         struct ether_hdr * hdr[MAX_PKT_BURST];
338         for (uint16_t i = 0; i < n_pkts; ++i) {
339                 PREFETCH0(mbufs[i]);
340         }
341         for (uint16_t i = 0; i < n_pkts; ++i) {
342                 hdr[i] = rte_pktmbuf_mtod(mbufs[i], struct ether_hdr *);
343                 PREFETCH0(hdr[i]);
344         }
345
346         for (i = 0; i < n_pkts; ++i) {
347                 packet_time = now + random_delay(&task->state, task->delay_time, task->delay_time_mask);
348                 idx = (packet_time >> DELAY_ACCURACY) & DELAY_MAX_MASK;
349                 while (idx != ((now_idx - 1) & DELAY_MAX_MASK)) {
350                         struct queue *queue = &task->buffer[idx];
351                         if (((queue->queue_head + 1) & task->queue_mask) != queue->queue_tail) {
352                                 if (task->flags & IMPAIR_SET_MAC)
353                                         ether_addr_copy((struct ether_addr *)&task->src_mac[0], &hdr[i]->s_addr);
354                                 queue->queue_elem[queue->queue_head].mbuf = mbufs[i];
355                                 queue->queue_head = (queue->queue_head + 1) & task->queue_mask;
356                                 break;
357                         } else {
358                                 idx = (idx + 1) & DELAY_MAX_MASK;
359                         }
360                 }
361                 if (idx == ((now_idx - 1) & DELAY_MAX_MASK)) {
362                         /* Rest does not fit, need to drop packet. Note that further packets might fit as might want to be sent earlier */
363                         out[0] = OUT_DISCARD;
364                         ret+= task->base.tx_pkt(&task->base, mbufs + i, 1, out);
365                         plog_warn("Unexpectdly dropping packets\n");
366                 }
367         }
368
369         struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
370         uint16_t pkt_idx = 0;
371
372         while ((pkt_idx < MAX_PKT_BURST) && (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK))) {
373                 struct queue *queue = &task->buffer[task->last_idx];
374                 while ((pkt_idx < MAX_PKT_BURST) && (queue->queue_tail != queue->queue_head)) {
375                         out[pkt_idx] = rand_r(&task->seed) <= task->tresh? 0 : OUT_DISCARD;
376                         new_mbufs[pkt_idx] = queue->queue_elem[queue->queue_tail].mbuf;
377                         PREFETCH0(new_mbufs[pkt_idx]);
378                         PREFETCH0(&new_mbufs[pkt_idx]->cacheline1);
379                         pkt_idx++;
380                         queue->queue_tail = (queue->queue_tail + 1) & task->queue_mask;
381                 }
382                 task->last_idx = (task->last_idx + 1) & DELAY_MAX_MASK;
383         }
384
385         if (pkt_idx)
386                 ret+= task->base.tx_pkt(&task->base, new_mbufs, pkt_idx, out);
387         task_impair_update(tbase);
388         return ret;
389 }
390
391 static void init_task(struct task_base *tbase, struct task_args *targ)
392 {
393         struct task_impair *task = (struct task_impair *)tbase;
394         uint32_t queue_len = 0;
395         size_t mem_size;
396         unsigned socket_id;
397         uint64_t delay_us = 0;
398
399         task->seed = rte_rdtsc();
400         if (targ->probability == 0)
401                 targ->probability = 1000000;
402
403         task->tresh = ((uint64_t) RAND_MAX) * targ->probability / 1000000;
404
405         if ((targ->delay_us == 0) && (targ->random_delay_us == 0)) {
406                 tbase->handle_bulk = handle_bulk_random_drop;
407                 task->delay_time = 0;
408         } else if (targ->random_delay_us) {
409                 tbase->handle_bulk = handle_bulk_impair_random;
410                 task->delay_time = usec_to_tsc(targ->random_delay_us);
411                 task->delay_time_mask = rte_align32pow2(task->delay_time) - 1;
412                 delay_us = targ->random_delay_us;
413                 queue_len = rte_align32pow2((1250L * delay_us) / 84 / (DELAY_MAX_MASK + 1));
414         } else {
415                 task->delay_time = usec_to_tsc(targ->delay_us);
416                 delay_us = targ->delay_us;
417                 queue_len = rte_align32pow2(1250 * delay_us / 84);
418         }
419         /* Assume Line-rate is maximum transmit speed.
420            TODO: take link speed if tx is port.
421         */
422         if (queue_len < MAX_PKT_BURST)
423                 queue_len= MAX_PKT_BURST;
424         task->queue_mask = queue_len - 1;
425         if (task->queue_mask < MAX_PKT_BURST - 1)
426                 task->queue_mask = MAX_PKT_BURST - 1;
427
428         mem_size = (task->queue_mask + 1) * sizeof(task->queue[0]);
429         socket_id = rte_lcore_to_socket_id(targ->lconf->id);
430         task->socket_id = rte_lcore_to_socket_id(targ->lconf->id);
431
432         if (targ->delay_us) {
433                 task->queue = prox_zmalloc(mem_size, socket_id);
434                 PROX_PANIC(task->queue == NULL, "Not enough memory to allocate queue\n");
435                 task->queue_head = 0;
436                 task->queue_tail = 0;
437         } else if (targ->random_delay_us) {
438                 size_t size = (DELAY_MAX_MASK + 1) * sizeof(struct queue);
439                 plog_info("Allocating %zd bytes\n", size);
440                 task->buffer = prox_zmalloc(size, socket_id);
441                 PROX_PANIC(task->buffer == NULL, "Not enough memory to allocate buffer\n");
442                 plog_info("Allocating %d x %zd bytes\n", DELAY_MAX_MASK + 1, mem_size);
443
444                 for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
445                         task->buffer[i].queue_elem = prox_zmalloc(mem_size, socket_id);
446                         PROX_PANIC(task->buffer[i].queue_elem == NULL, "Not enough memory to allocate buffer elems\n");
447                 }
448         }
449         random_init_seed(&task->state);
450         if (targ->nb_txports) {
451                 memcpy(&task->src_mac[0], &prox_port_cfg[tbase->tx_params_hw.tx_port_queue[0].port].eth_addr, sizeof(struct ether_addr));
452                 task->flags = IMPAIR_SET_MAC;
453         } else {
454                 task->flags = 0;
455         }
456 }
457
458 static struct task_init tinit = {
459         .mode_str = "impair",
460         .init = init_task,
461         .handle = handle_bulk_impair,
462         .flag_features = TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS | TASK_FEATURE_ZERO_RX,
463         .size = sizeof(struct task_impair)
464 };
465
466 __attribute__((constructor)) static void ctor(void)
467 {
468         reg_task(&tinit);
469 }