Merge "[l2l3 stack] implements new nd state machine & nd buffering"
VNFs/DPPD-PROX/handle_impair.c
/*
// Copyright (c) 2010-2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

#include <string.h>
#include <stdio.h>
#include <rte_cycles.h>
#include <rte_version.h>

#include "prox_malloc.h"
#include "lconf.h"
#include "log.h"
#include "random.h"
#include "handle_impair.h"
#include "prefetch.h"

#if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0)
#define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE
#endif

#define DELAY_ACCURACY  11              // accuracy of 2048 cycles ~= 1 micro-second
#define DELAY_MAX_MASK  0x1FFFFF        // Maximum 2M * 2K cycles ~1 second
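/*
 * Delayed packets are binned into time-wheel buckets of 2^DELAY_ACCURACY = 2048
 * TSC cycles (about one microsecond). With DELAY_MAX_MASK + 1 = 2^21 buckets,
 * the wheel spans 2^32 cycles, i.e. roughly one second on a processor clocked
 * at up to 4 GHz.
 */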

struct queue_elem {
        struct rte_mbuf *mbuf;
        uint64_t        tsc;
};

struct queue {
        struct queue_elem *queue_elem;
        unsigned queue_head;
        unsigned queue_tail;
};

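/*
 * Per-task state. With a fixed delay, packets wait in a single circular queue
 * of queue_elem entries stamped with the tsc at which they may be transmitted.
 * With a random delay, packets are spread over a wheel of DELAY_MAX_MASK + 1
 * struct queue buckets indexed by their release time. tresh is the keep
 * probability scaled to RAND_MAX, used for random packet drops.
 */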
struct task_impair {
        struct task_base base;
        struct queue_elem *queue;
        uint32_t random_delay_us;
        uint32_t delay_us;
        uint64_t delay_time;
        uint64_t delay_time_mask;
        unsigned queue_head;
        unsigned queue_tail;
        unsigned queue_mask;
        int tresh;
        unsigned int seed;
        struct random state;
        uint64_t last_idx;
        struct queue *buffer;
        uint32_t socket_id;
        int need_update;
};

static int handle_bulk_impair(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);
static int handle_bulk_impair_random(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);
static int handle_bulk_random_drop(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);

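/*
 * Runtime reconfiguration hooks: a new drop probability takes effect
 * immediately through task->tresh, while a delay change only raises
 * need_update and is applied by task_impair_update(), which runs on the
 * datapath core at the end of a handle_bulk call.
 */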
void task_impair_set_proba(struct task_base *tbase, float proba)
{
        struct task_impair *task = (struct task_impair *)tbase;
        task->tresh = ((uint64_t) RAND_MAX) * (uint32_t)(proba * 10000) / 1000000;
}

void task_impair_set_delay_us(struct task_base *tbase, uint32_t delay_us, uint32_t random_delay_us)
{
        struct task_impair *task = (struct task_impair *)tbase;
        task->need_update = 1;
        task->random_delay_us = random_delay_us;
        task->delay_us = delay_us;
}

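/*
 * Apply a pending configuration change: pick the handler matching the new
 * delay mode, flush and free the queue/buffer built for the old settings
 * (transmitting or dropping whatever is still pending), then allocate storage
 * sized for the new delay.
 */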
static void task_impair_update(struct task_base *tbase)
{
        struct task_impair *task = (struct task_impair *)tbase;
        uint32_t queue_len = 0;
        size_t mem_size;
        if (!task->need_update)
                return;
        task->need_update = 0;
        uint64_t now = rte_rdtsc();
        uint8_t out[MAX_PKT_BURST] = {0};
        uint64_t now_idx = (now >> DELAY_ACCURACY) & DELAY_MAX_MASK;

        if (task->random_delay_us) {
                tbase->handle_bulk = handle_bulk_impair_random;
                task->delay_time = usec_to_tsc(task->random_delay_us);
                task->delay_time_mask = rte_align32pow2(task->delay_time) - 1;
                queue_len = rte_align32pow2((1250L * task->random_delay_us) / 84 / (DELAY_MAX_MASK + 1));
        } else if (task->delay_us == 0) {
                tbase->handle_bulk = handle_bulk_random_drop;
                task->delay_time = 0;
        } else {
                tbase->handle_bulk = handle_bulk_impair;
                task->delay_time = usec_to_tsc(task->delay_us);
                queue_len = rte_align32pow2(1250 * task->delay_us / 84);
        }
        if (task->queue) {
                struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
                while (task->queue_tail != task->queue_head) {
                        now = rte_rdtsc();
                        uint16_t idx = 0;
                        while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
                                if (task->queue[task->queue_tail].tsc <= now) {
                                        out[idx] = rand_r(&task->seed) <= task->tresh ? 0 : OUT_DISCARD;
                                        new_mbufs[idx++] = task->queue[task->queue_tail].mbuf;
                                        task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
                                }
                                else {
                                        break;
                                }
                        }
                        if (idx)
                                task->base.tx_pkt(&task->base, new_mbufs, idx, out);
                }
                prox_free(task->queue);
                task->queue = NULL;
        }
        if (task->buffer) {
                struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
                while (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK)) {
                        now = rte_rdtsc();
                        uint16_t pkt_idx = 0;
                        while ((pkt_idx < MAX_PKT_BURST) && (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK))) {
                                struct queue *queue = &task->buffer[task->last_idx];
                                while ((pkt_idx < MAX_PKT_BURST) && (queue->queue_tail != queue->queue_head)) {
                                        out[pkt_idx] = rand_r(&task->seed) <= task->tresh ? 0 : OUT_DISCARD;
                                        new_mbufs[pkt_idx++] = queue->queue_elem[queue->queue_tail].mbuf;
                                        queue->queue_tail = (queue->queue_tail + 1) & task->queue_mask;
                                }
                                task->last_idx = (task->last_idx + 1) & DELAY_MAX_MASK;
                        }

                        if (pkt_idx)
                                task->base.tx_pkt(&task->base, new_mbufs, pkt_idx, out);
                }
                for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
                        if (task->buffer[i].queue_elem)
                                prox_free(task->buffer[i].queue_elem);
                }
                prox_free(task->buffer);
                task->buffer = NULL;
        }

        if (queue_len < MAX_PKT_BURST)
                queue_len = MAX_PKT_BURST;
        task->queue_mask = queue_len - 1;
        if (task->queue_mask < MAX_PKT_BURST - 1)
                task->queue_mask = MAX_PKT_BURST - 1;
        mem_size = (task->queue_mask + 1) * sizeof(task->queue[0]);

        if (task->delay_us) {
                task->queue_head = 0;
                task->queue_tail = 0;
                task->queue = prox_zmalloc(mem_size, task->socket_id);
                if (task->queue == NULL) {
                        plog_err("Not enough memory to allocate queue\n");
                        task->queue_mask = 0;
                }
        } else if (task->random_delay_us) {
                size_t size = (DELAY_MAX_MASK + 1) * sizeof(struct queue);
                plog_info("Allocating %zd bytes\n", size);
                task->buffer = prox_zmalloc(size, task->socket_id);
                PROX_PANIC(task->buffer == NULL, "Not enough memory to allocate buffer\n");
                plog_info("Allocating %d x %zd bytes\n", DELAY_MAX_MASK + 1, mem_size);

                for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
                        task->buffer[i].queue_elem = prox_zmalloc(mem_size, task->socket_id);
                        PROX_PANIC(task->buffer[i].queue_elem == NULL, "Not enough memory to allocate buffer elems\n");
                }
        }
        random_init_seed(&task->state);
}

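/*
 * No delay configured: forward the burst immediately, dropping each packet
 * independently with the configured probability.
 */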
static int handle_bulk_random_drop(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
        struct task_impair *task = (struct task_impair *)tbase;
        uint8_t out[MAX_PKT_BURST];
        int ret;
        for (uint16_t i = 0; i < n_pkts; ++i) {
                out[i] = rand_r(&task->seed) <= task->tresh ? 0 : OUT_DISCARD;
        }
        ret = task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
        task_impair_update(tbase);
        return ret;
}

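/*
 * Fixed delay: each packet is enqueued with a release timestamp of
 * now + delay_time; packets whose timestamp has expired are dequeued and
 * transmitted (or randomly dropped). Packets that do not fit in the queue
 * are discarded.
 */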
static int handle_bulk_impair(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
        struct task_impair *task = (struct task_impair *)tbase;
        uint64_t now = rte_rdtsc();
        uint8_t out[MAX_PKT_BURST] = {0};
        uint16_t enqueue_failed;
        uint16_t i;
        int ret = 0;

        int nb_empty_slots = (task->queue_tail - task->queue_head + task->queue_mask) & task->queue_mask;
        if (likely(nb_empty_slots >= n_pkts)) {
                /* We know n_pkts fits, no need to check for every packet */
                for (i = 0; i < n_pkts; ++i) {
                        task->queue[task->queue_head].tsc = now + task->delay_time;
                        task->queue[task->queue_head].mbuf = mbufs[i];
                        task->queue_head = (task->queue_head + 1) & task->queue_mask;
                }
        } else {
                for (i = 0; i < n_pkts; ++i) {
                        if (((task->queue_head + 1) & task->queue_mask) != task->queue_tail) {
                                task->queue[task->queue_head].tsc = now + task->delay_time;
                                task->queue[task->queue_head].mbuf = mbufs[i];
                                task->queue_head = (task->queue_head + 1) & task->queue_mask;
                        }
                        else {
                                /* Rest does not fit, need to drop those packets. */
                                enqueue_failed = i;
                                for (; i < n_pkts; ++i) {
                                        out[i] = OUT_DISCARD;
                                }
                                ret += task->base.tx_pkt(&task->base, mbufs + enqueue_failed,
                                                n_pkts - enqueue_failed, out + enqueue_failed);
                                break;
                        }
                }
        }

        struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
        uint16_t idx = 0;

        if (task->tresh != RAND_MAX) {
                while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
                        if (task->queue[task->queue_tail].tsc <= now) {
                                out[idx] = rand_r(&task->seed) <= task->tresh ? 0 : OUT_DISCARD;
                                new_mbufs[idx] = task->queue[task->queue_tail].mbuf;
                                PREFETCH0(new_mbufs[idx]);
                                PREFETCH0(&new_mbufs[idx]->cacheline1);
                                idx++;
                                task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
                        }
                        else {
                                break;
                        }
                }
        } else {
                while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
                        if (task->queue[task->queue_tail].tsc <= now) {
                                out[idx] = 0;
                                new_mbufs[idx] = task->queue[task->queue_tail].mbuf;
                                PREFETCH0(new_mbufs[idx]);
                                PREFETCH0(&new_mbufs[idx]->cacheline1);
                                idx++;
                                task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
                        }
                        else {
                                break;
                        }
                }
        }

        if (idx)
                ret += task->base.tx_pkt(&task->base, new_mbufs, idx, out);
        task_impair_update(tbase);
        return ret;
}

/*
 * We want to avoid using division and modulo for performance reasons.
 * We also want to support up to one second of delay, expressed in tsc,
 * so the delay in tsc needs up to 32 bits (assuming the processor frequency is below 4 GHz).
 * If max_delay is smaller, we make sure we use fewer bits.
 * Note that we lose the MSB of the xorshift - 64 bits could hold
 * two or three delays in TSC - but that would make the implementation more complex
 * for little expected gain. Maybe room for optimization.
 * With this implementation we might have to run the random generator more than once for a delay,
 * but on average this should happen less than 50% of the time.
*/
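/*
 * Example: for delay_time = 3,000,000 cycles, rte_align32pow2() rounds up to
 * 4,194,304, so delay_time_mask = 4,194,303. Masked draws are uniform in
 * [0, 4194303] and accepted only when below 3,000,000, i.e. ~71% of the time,
 * so a redraw is needed for roughly 29% of the packets. Because the mask is
 * the next power of two minus one, the acceptance rate is always above 50%.
 */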

static inline uint64_t random_delay(struct random *state, uint64_t max_delay, uint64_t max_delay_mask)
{
        uint64_t val;
        while (1) {
                val = random_next(state);
                if ((val & max_delay_mask) < max_delay)
                        return (val & max_delay_mask);
        }
}

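/*
 * Random delay: each packet gets an individual delay and is stored in the
 * time-wheel bucket (task->buffer[idx]) matching its release time; if that
 * bucket is full the following buckets are tried, and the packet is dropped
 * only when the whole wheel is full. Buckets between last_idx and the current
 * time are then drained and their packets transmitted (or randomly dropped).
 */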
static int handle_bulk_impair_random(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
        struct task_impair *task = (struct task_impair *)tbase;
        uint64_t now = rte_rdtsc();
        uint8_t out[MAX_PKT_BURST];
        uint16_t enqueue_failed;
        uint16_t i;
        int ret = 0;
        uint64_t packet_time, idx;
        uint64_t now_idx = (now >> DELAY_ACCURACY) & DELAY_MAX_MASK;

        for (i = 0; i < n_pkts; ++i) {
                packet_time = now + random_delay(&task->state, task->delay_time, task->delay_time_mask);
                idx = (packet_time >> DELAY_ACCURACY) & DELAY_MAX_MASK;
                while (idx != ((now_idx - 1) & DELAY_MAX_MASK)) {
                        struct queue *queue = &task->buffer[idx];
                        if (((queue->queue_head + 1) & task->queue_mask) != queue->queue_tail) {
                                queue->queue_elem[queue->queue_head].mbuf = mbufs[i];
                                queue->queue_head = (queue->queue_head + 1) & task->queue_mask;
                                break;
                        } else {
                                idx = (idx + 1) & DELAY_MAX_MASK;
                        }
                }
                if (idx == ((now_idx - 1) & DELAY_MAX_MASK)) {
                        /* This packet does not fit anywhere; drop it. Note that later packets may still fit, as they might be scheduled earlier. */
                        out[0] = OUT_DISCARD;
                        ret += task->base.tx_pkt(&task->base, mbufs + i, 1, out);
                        plog_warn("Unexpectedly dropping packets\n");
                }
        }

        struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
        uint16_t pkt_idx = 0;

        while ((pkt_idx < MAX_PKT_BURST) && (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK))) {
                struct queue *queue = &task->buffer[task->last_idx];
                while ((pkt_idx < MAX_PKT_BURST) && (queue->queue_tail != queue->queue_head)) {
                        out[pkt_idx] = rand_r(&task->seed) <= task->tresh ? 0 : OUT_DISCARD;
                        new_mbufs[pkt_idx] = queue->queue_elem[queue->queue_tail].mbuf;
                        PREFETCH0(new_mbufs[pkt_idx]);
                        PREFETCH0(&new_mbufs[pkt_idx]->cacheline1);
                        pkt_idx++;
                        queue->queue_tail = (queue->queue_tail + 1) & task->queue_mask;
                }
                task->last_idx = (task->last_idx + 1) & DELAY_MAX_MASK;
        }

        if (pkt_idx)
                ret += task->base.tx_pkt(&task->base, new_mbufs, pkt_idx, out);
        task_impair_update(tbase);
        return ret;
}

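/*
 * One-time task setup: pick the handler for the configured delay mode and
 * size the queue for line rate during the delay. The 1250/84 constants
 * presumably stand for 1250 bytes per microsecond (10 Gbps) and 84 bytes per
 * minimal Ethernet frame on the wire, giving the worst-case packets per
 * microsecond.
 */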
static void init_task(struct task_base *tbase, struct task_args *targ)
{
        struct task_impair *task = (struct task_impair *)tbase;
        uint32_t queue_len = 0;
        size_t mem_size;
        unsigned socket_id;
        uint64_t delay_us = 0;

        task->seed = rte_rdtsc();
        if (targ->probability == 0)
                targ->probability = 1000000;

        task->tresh = ((uint64_t) RAND_MAX) * targ->probability / 1000000;

        if ((targ->delay_us == 0) && (targ->random_delay_us == 0)) {
                tbase->handle_bulk = handle_bulk_random_drop;
                task->delay_time = 0;
        } else if (targ->random_delay_us) {
                tbase->handle_bulk = handle_bulk_impair_random;
                task->delay_time = usec_to_tsc(targ->random_delay_us);
                task->delay_time_mask = rte_align32pow2(task->delay_time) - 1;
                delay_us = targ->random_delay_us;
                queue_len = rte_align32pow2((1250L * delay_us) / 84 / (DELAY_MAX_MASK + 1));
        } else {
                task->delay_time = usec_to_tsc(targ->delay_us);
                delay_us = targ->delay_us;
                queue_len = rte_align32pow2(1250 * delay_us / 84);
        }
        /* Assume Line-rate is maximum transmit speed.
           TODO: take link speed if tx is port.
        */
        if (queue_len < MAX_PKT_BURST)
                queue_len = MAX_PKT_BURST;
        task->queue_mask = queue_len - 1;
        if (task->queue_mask < MAX_PKT_BURST - 1)
                task->queue_mask = MAX_PKT_BURST - 1;

        mem_size = (task->queue_mask + 1) * sizeof(task->queue[0]);
        socket_id = rte_lcore_to_socket_id(targ->lconf->id);
        task->socket_id = socket_id;

        if (targ->delay_us) {
                task->queue = prox_zmalloc(mem_size, socket_id);
                PROX_PANIC(task->queue == NULL, "Not enough memory to allocate queue\n");
                task->queue_head = 0;
                task->queue_tail = 0;
        } else if (targ->random_delay_us) {
                size_t size = (DELAY_MAX_MASK + 1) * sizeof(struct queue);
                plog_info("Allocating %zd bytes\n", size);
                task->buffer = prox_zmalloc(size, socket_id);
                PROX_PANIC(task->buffer == NULL, "Not enough memory to allocate buffer\n");
                plog_info("Allocating %d x %zd bytes\n", DELAY_MAX_MASK + 1, mem_size);

                for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
                        task->buffer[i].queue_elem = prox_zmalloc(mem_size, socket_id);
                        PROX_PANIC(task->buffer[i].queue_elem == NULL, "Not enough memory to allocate buffer elems\n");
                }
        }
        random_init_seed(&task->state);
}

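/*
 * Registration of the "impair" task mode with PROX at program start-up via
 * the constructor below. handle_bulk_impair is the default handler;
 * init_task() and task_impair_update() re-point handle_bulk according to the
 * configured delay mode.
 */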
static struct task_init tinit = {
        .mode_str = "impair",
        .init = init_task,
        .handle = handle_bulk_impair,
        .flag_features = TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS | TASK_FEATURE_ZERO_RX,
        .size = sizeof(struct task_impair)
};

__attribute__((constructor)) static void ctor(void)
{
        reg_task(&tinit);
}