// Copyright (c) 2010-2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <rte_cycles.h>
#include <rte_version.h>

#include "prox_malloc.h"
#include "lconf.h"          /* targ->lconf used in init_task() */
#include "log.h"            /* plog_info()/plog_err()/plog_warn() */
#include "random.h"         /* struct random, random_init_seed(), random_next() */
#include "handle_impair.h"
#include "prefetch.h"       /* PREFETCH0() */
#include "prox_port_cfg.h"
#if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0)
#define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE
#endif
#define DELAY_ACCURACY 11       // accuracy of 2048 cycles ~= 1 micro-second
#define DELAY_MAX_MASK 0x1FFFFF // Maximum 2M * 2K cycles ~= 1 second
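
// Worked example of the bucket arithmetic above (assuming a TSC in the
// 2-4 GHz range, which is what the "~1 micro-second" comment implies):
//   bucket width = 2^DELAY_ACCURACY = 2048 cycles   ~= 0.5-1 us
//   bucket count = DELAY_MAX_MASK + 1 = 2^21        ~= 2M buckets
//   total span   = 2^21 * 2^11 = 2^32 cycles        ~= 1-2 seconds
// So (tsc >> DELAY_ACCURACY) & DELAY_MAX_MASK maps a timestamp to a bucket
// index that wraps roughly once per second.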
/* One buffered packet and the TSC at which it becomes due. */
struct queue_elem {
    struct rte_mbuf *mbuf;
    uint64_t tsc;
};

/* Per-bucket FIFO used in random-delay mode. */
struct queue {
    struct queue_elem *queue_elem;
    unsigned queue_head;
    unsigned queue_tail;
};

/* Per-task state; exact field types are inferred from how they are used below. */
struct task_impair {
    struct task_base base;
    struct queue_elem *queue;
    uint32_t random_delay_us;
    uint32_t delay_us;
    uint64_t delay_time;
    uint64_t delay_time_mask;
    unsigned queue_head;
    unsigned queue_tail;
    unsigned queue_mask;
    unsigned socket_id;
    int tresh_no_drop;
    int tresh_delay;
    int tresh_duplicate;
    unsigned int seed;
    struct random state;
    unsigned last_idx;
    struct queue *buffer;
    uint8_t src_mac[6];
    uint8_t flags;
};
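
/*
 * Operating modes (selected in init_task() and task_impair_update() below):
 *  - handle_bulk_random_drop:   no delay; only probabilistic drop and an
 *    optional source-MAC rewrite.
 *  - handle_bulk_impair:        fixed delay; packets go through a single
 *    circular queue of queue_elem sized for the configured delay at line rate.
 *  - handle_bulk_impair_random: random delay; packets are spread over a wheel
 *    of DELAY_MAX_MASK + 1 buckets indexed by due time, each bucket holding
 *    its own small ring. The no-drop probability is applied on dequeue in the
 *    delayed modes.
 */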
#define IMPAIR_NEED_UPDATE 1
#define IMPAIR_SET_MAC     2

static int handle_bulk_impair(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);
static int handle_bulk_impair_random(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);
static int handle_bulk_random_drop(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);
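
/*
 * The task_impair_set_proba_*() setters below take a probability expressed as
 * a percentage (0.0 .. 100.0) and turn it into a threshold compared against
 * rand_r(), which returns a value in [0, RAND_MAX]:
 *   tresh = RAND_MAX * (proba * 10000) / 1000000 = RAND_MAX * proba / 100
 * For example (illustrative numbers): proba_no_drop = 99.5 gives
 * tresh_no_drop ~= 0.995 * RAND_MAX, so rand_r() <= tresh_no_drop holds for
 * roughly 99.5% of packets and the remaining ~0.5% are marked OUT_DISCARD.
 */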
void task_impair_set_proba_no_drop(struct task_base *tbase, float proba_no_drop)
{
    struct task_impair *task = (struct task_impair *)tbase;
    task->tresh_no_drop = ((uint64_t) RAND_MAX) * (uint32_t)(proba_no_drop * 10000) / 1000000;
}
void task_impair_set_proba_delay(struct task_base *tbase, float proba_delay)
{
    struct task_impair *task = (struct task_impair *)tbase;
    task->tresh_delay = ((uint64_t) RAND_MAX) * (uint32_t)(proba_delay * 10000) / 1000000;
    task->flags |= IMPAIR_NEED_UPDATE;
}
void task_impair_set_proba_duplicate(struct task_base *tbase, float proba_dup)
{
    struct task_impair *task = (struct task_impair *)tbase;
    task->tresh_duplicate = ((uint64_t) RAND_MAX) * (uint32_t)(proba_dup * 10000) / 1000000;
}
void task_impair_set_delay_us(struct task_base *tbase, uint32_t delay_us, uint32_t random_delay_us)
{
    struct task_impair *task = (struct task_impair *)tbase;
    task->flags |= IMPAIR_NEED_UPDATE;
    task->random_delay_us = random_delay_us;
    task->delay_us = delay_us;
}
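
/*
 * Delay changes do not take effect immediately: setting IMPAIR_NEED_UPDATE
 * makes task_impair_update() run at the end of the next handled burst, where
 * the buffers are flushed, freed and re-allocated for the new delay and the
 * matching handle_bulk_*() function is installed.
 */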
static void task_impair_update(struct task_base *tbase)
{
    struct task_impair *task = (struct task_impair *)tbase;
    uint32_t queue_len = 0;
    size_t mem_size;

    if ((task->flags & IMPAIR_NEED_UPDATE) == 0)
        return;
    task->flags &= ~IMPAIR_NEED_UPDATE;

    uint64_t now = rte_rdtsc();
    uint8_t out[MAX_PKT_BURST] = {0};
    uint64_t now_idx = (now >> DELAY_ACCURACY) & DELAY_MAX_MASK;

    /* Pick the handler and queue size matching the new configuration. */
    if (task->random_delay_us) {
        tbase->handle_bulk = handle_bulk_impair_random;
        task->delay_time = usec_to_tsc(task->random_delay_us);
        task->delay_time_mask = rte_align32pow2(task->delay_time) - 1;
        queue_len = rte_align32pow2((1250L * task->random_delay_us) / 84 / (DELAY_MAX_MASK + 1));
    } else if (task->delay_us == 0) {
        tbase->handle_bulk = handle_bulk_random_drop;
        task->delay_time = 0;
    } else {
        tbase->handle_bulk = handle_bulk_impair;
        task->delay_time = usec_to_tsc(task->delay_us);
        queue_len = rte_align32pow2(1250 * task->delay_us / 84);
    }
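
    /*
     * Sizing sketch for the queue_len formulas above (matching the "assume
     * line rate" comment in init_task()): 1250 bytes/us is 10 Gbit/s, and 84
     * bytes is a minimum-size Ethernet frame including preamble and
     * inter-frame gap, i.e. ~14.88 Mpps worst case. So 1250 * delay_us / 84
     * is roughly the number of packets that can be in flight during delay_us
     * at line rate; in random-delay mode that amount is spread over
     * DELAY_MAX_MASK + 1 buckets. The result is rounded up to a power of two
     * so head/tail indices can wrap with a simple mask.
     */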
    /* Flush packets buffered with the previous settings before the queues
     * are freed and re-allocated. */
    if (task->queue) {
        struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
        while (task->queue_tail != task->queue_head) {
            uint16_t idx = 0;
            while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
                if (task->queue[task->queue_tail].tsc <= now) {
                    out[idx] = rand_r(&task->seed) <= task->tresh_no_drop ? 0 : OUT_DISCARD;
                    new_mbufs[idx++] = task->queue[task->queue_tail].mbuf;
                    task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
                }
            }
            if (idx)
                task->base.tx_pkt(&task->base, new_mbufs, idx, out);
        }
        prox_free(task->queue);
        task->queue = NULL;
    }
    if (task->buffer) {
        struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
        while (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK)) {
            uint16_t pkt_idx = 0;
            while ((pkt_idx < MAX_PKT_BURST) && (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK))) {
                struct queue *queue = &task->buffer[task->last_idx];
                while ((pkt_idx < MAX_PKT_BURST) && (queue->queue_tail != queue->queue_head)) {
                    out[pkt_idx] = rand_r(&task->seed) <= task->tresh_no_drop ? 0 : OUT_DISCARD;
                    new_mbufs[pkt_idx++] = queue->queue_elem[queue->queue_tail].mbuf;
                    queue->queue_tail = (queue->queue_tail + 1) & task->queue_mask;
                }
                task->last_idx = (task->last_idx + 1) & DELAY_MAX_MASK;
            }
            if (pkt_idx)
                task->base.tx_pkt(&task->base, new_mbufs, pkt_idx, out);
        }
        for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
            if (task->buffer[i].queue_elem)
                prox_free(task->buffer[i].queue_elem);
        }
        prox_free(task->buffer);
        task->buffer = NULL;
    }
    if (queue_len < MAX_PKT_BURST)
        queue_len = MAX_PKT_BURST;
    task->queue_mask = queue_len - 1;
    if (task->queue_mask < MAX_PKT_BURST - 1)
        task->queue_mask = MAX_PKT_BURST - 1;
    mem_size = (task->queue_mask + 1) * sizeof(task->queue[0]);

    if (task->delay_us) {
        task->queue_head = 0;
        task->queue_tail = 0;
        task->queue = prox_zmalloc(mem_size, task->socket_id);
        if (task->queue == NULL) {
            plog_err("Not enough memory to allocate queue\n");
            task->queue_mask = 0;
        }
    } else if (task->random_delay_us) {
        size_t size = (DELAY_MAX_MASK + 1) * sizeof(struct queue);
        plog_info("\t\tAllocating %zd bytes\n", size);
        task->buffer = prox_zmalloc(size, task->socket_id);
        PROX_PANIC(task->buffer == NULL, "Not enough memory to allocate buffer\n");
        plog_info("\t\tAllocating %d x %zd bytes\n", DELAY_MAX_MASK + 1, mem_size);

        for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
            task->buffer[i].queue_elem = prox_zmalloc(mem_size, task->socket_id);
            PROX_PANIC(task->buffer[i].queue_elem == NULL, "Not enough memory to allocate buffer elems\n");
        }
    }
    random_init_seed(&task->state);
}
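
/*
 * No-delay mode: optionally rewrite the source MAC, then forward each packet
 * with probability tresh_no_drop/RAND_MAX and mark the rest OUT_DISCARD
 * before handing the whole burst to tx_pkt().
 */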
static int handle_bulk_random_drop(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
    struct task_impair *task = (struct task_impair *)tbase;
    uint8_t out[MAX_PKT_BURST];
    prox_rte_ether_hdr *hdr[MAX_PKT_BURST];
    int ret = 0;

    for (uint16_t i = 0; i < n_pkts; ++i) {
        PREFETCH0(mbufs[i]);
    }
    for (uint16_t i = 0; i < n_pkts; ++i) {
        hdr[i] = rte_pktmbuf_mtod(mbufs[i], prox_rte_ether_hdr *);
        PREFETCH0(hdr[i]);
    }
    if (task->flags & IMPAIR_SET_MAC) {
        for (uint16_t i = 0; i < n_pkts; ++i) {
            prox_rte_ether_addr_copy((prox_rte_ether_addr *)&task->src_mac[0], &hdr[i]->s_addr);
            out[i] = rand_r(&task->seed) <= task->tresh_no_drop ? 0 : OUT_DISCARD;
        }
    } else {
        for (uint16_t i = 0; i < n_pkts; ++i) {
            out[i] = rand_r(&task->seed) <= task->tresh_no_drop ? 0 : OUT_DISCARD;
        }
    }
    ret = task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
    task_impair_update(tbase);
    return ret;
}
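
/*
 * Fixed-delay mode: incoming packets are timestamped now + delay_time and
 * pushed into a single circular queue; packets whose timestamp has expired
 * are popped from the tail, subjected to the no-drop probability and sent.
 * If the ring fills up, the remainder of the burst is discarded.
 */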
static int handle_bulk_impair(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
    struct task_impair *task = (struct task_impair *)tbase;
    uint64_t now = rte_rdtsc();
    uint8_t out[MAX_PKT_BURST] = {0};
    uint16_t enqueue_failed;
    uint16_t i;
    int ret = 0;

    prox_rte_ether_hdr *hdr[MAX_PKT_BURST];
    for (uint16_t i = 0; i < n_pkts; ++i) {
        PREFETCH0(mbufs[i]);
    }
    for (uint16_t i = 0; i < n_pkts; ++i) {
        hdr[i] = rte_pktmbuf_mtod(mbufs[i], prox_rte_ether_hdr *);
        PREFETCH0(hdr[i]);
    }

    int nb_empty_slots = (task->queue_tail - task->queue_head + task->queue_mask) & task->queue_mask;
    if (likely(nb_empty_slots >= n_pkts)) {
        /* We know n_pkts fits, no need to check for every packet */
        for (i = 0; i < n_pkts; ++i) {
            if (task->flags & IMPAIR_SET_MAC)
                prox_rte_ether_addr_copy((prox_rte_ether_addr *)&task->src_mac[0], &hdr[i]->s_addr);
            task->queue[task->queue_head].tsc = now + task->delay_time;
            task->queue[task->queue_head].mbuf = mbufs[i];
            task->queue_head = (task->queue_head + 1) & task->queue_mask;
        }
    } else {
        for (i = 0; i < n_pkts; ++i) {
            if (((task->queue_head + 1) & task->queue_mask) != task->queue_tail) {
                if (task->flags & IMPAIR_SET_MAC)
                    prox_rte_ether_addr_copy((prox_rte_ether_addr *)&task->src_mac[0], &hdr[i]->s_addr);
                task->queue[task->queue_head].tsc = now + task->delay_time;
                task->queue[task->queue_head].mbuf = mbufs[i];
                task->queue_head = (task->queue_head + 1) & task->queue_mask;
            } else {
                /* Rest does not fit, need to drop those packets. */
                enqueue_failed = i;
                for (; i < n_pkts; ++i) {
                    out[i] = OUT_DISCARD;
                }
                ret += task->base.tx_pkt(&task->base, mbufs + enqueue_failed,
                                         n_pkts - enqueue_failed, out + enqueue_failed);
                break;
            }
        }
    }
    /* Dequeue packets whose timestamp has expired; the drop probability is
     * only evaluated when it can actually drop something. */
    struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
    uint16_t idx = 0;

    if (task->tresh_no_drop != RAND_MAX) {
        while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
            if (task->queue[task->queue_tail].tsc <= now) {
                out[idx] = rand_r(&task->seed) <= task->tresh_no_drop ? 0 : OUT_DISCARD;
                new_mbufs[idx] = task->queue[task->queue_tail].mbuf;
                PREFETCH0(new_mbufs[idx]);
                PREFETCH0(&new_mbufs[idx]->cacheline1);
                idx++;
                task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
            } else
                break;
        }
    } else {
        while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
            if (task->queue[task->queue_tail].tsc <= now) {
                out[idx] = 0;
                new_mbufs[idx] = task->queue[task->queue_tail].mbuf;
                PREFETCH0(new_mbufs[idx]);
                PREFETCH0(&new_mbufs[idx]->cacheline1);
                idx++;
                task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
            } else
                break;
        }
    }

    ret += task->base.tx_pkt(&task->base, new_mbufs, idx, out);
    task_impair_update(tbase);
    return ret;
}

/*
 * We want to avoid using division and modulo for performance reasons.
 * We also want to support up to one second of delay, expressed in TSC cycles,
 * so the delay in TSC needs up to 32 bits (assuming a processor frequency
 * below 4 GHz). If max_delay is smaller, we make sure we use fewer bits.
 * Note that we lose the MSBs of the xorshift output - 64 bits could hold two
 * or three delays in TSC - but that would make the implementation more
 * complex for little expected gain. Maybe room for optimization.
 * With this implementation we might have to run the random generator more
 * than once for a delay, but on average this should happen less than 50% of
 * the time.
 */
static inline uint64_t random_delay(struct random *state, uint64_t max_delay, uint64_t max_delay_mask)
{
    uint64_t val;
    while (1) {
        val = random_next(state);
        if ((val & max_delay_mask) < max_delay)
            return (val & max_delay_mask);
    }
}
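
/*
 * Illustrative numbers for the rejection loop above: with a 3 GHz TSC and a
 * 1000 us maximum delay, max_delay = 3,000,000 cycles and max_delay_mask =
 * rte_align32pow2(3,000,000) - 1 = 0x3FFFFF (4,194,303). A draw is accepted
 * with probability ~3.0M / 4.2M ~= 72%, i.e. ~1.4 draws per delayed packet on
 * average; by construction the acceptance rate is always above 50%.
 */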
/* Random-delay mode: each packet is assigned a due time and stored in the
 * bucket wheel; due buckets are drained at the end of every burst. */
static int handle_bulk_impair_random(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
    struct task_impair *task = (struct task_impair *)tbase;
    uint64_t now = rte_rdtsc();
    uint8_t out[MAX_PKT_BURST];
    uint16_t enqueue_failed;
    uint16_t i;
    int ret = 0;
    uint64_t packet_time, idx;
    uint64_t now_idx = (now >> DELAY_ACCURACY) & DELAY_MAX_MASK;

    prox_rte_ether_hdr *hdr[MAX_PKT_BURST];
    for (uint16_t i = 0; i < n_pkts; ++i) {
        PREFETCH0(mbufs[i]);
    }
    for (uint16_t i = 0; i < n_pkts; ++i) {
        hdr[i] = rte_pktmbuf_mtod(mbufs[i], prox_rte_ether_hdr *);
        PREFETCH0(hdr[i]);
    }
    for (i = 0; i < n_pkts; ++i) {
        if (rand_r(&task->seed) <= task->tresh_delay)
            packet_time = now + random_delay(&task->state, task->delay_time, task->delay_time_mask);
        else
            packet_time = now;
        idx = (packet_time >> DELAY_ACCURACY) & DELAY_MAX_MASK;
        while (idx != ((now_idx - 1) & DELAY_MAX_MASK)) {
            struct queue *queue = &task->buffer[idx];
            if (((queue->queue_head + 1) & task->queue_mask) != queue->queue_tail) {
                if (task->flags & IMPAIR_SET_MAC)
                    prox_rte_ether_addr_copy((prox_rte_ether_addr *)&task->src_mac[0], &hdr[i]->s_addr);
                queue->queue_elem[queue->queue_head].mbuf = mbufs[i];
                queue->queue_head = (queue->queue_head + 1) & task->queue_mask;
                break;
            } else {
                idx = (idx + 1) & DELAY_MAX_MASK;
            }
        }
        if (idx == ((now_idx - 1) & DELAY_MAX_MASK)) {
            /* No bucket had room, so drop this packet. Later packets in the
             * burst might still fit, as they may be due earlier. */
            out[0] = OUT_DISCARD;
            ret += task->base.tx_pkt(&task->base, mbufs + i, 1, out);
            plog_warn("Unexpectedly dropping packets\n");
        }
#if RTE_VERSION >= RTE_VERSION_NUM(19,11,0,0)
        /* Optionally duplicate: replace mbufs[i] with a copy and re-run this
         * iteration so the copy is enqueued as well, with its own random delay. */
        if (rand_r(&task->seed) <= task->tresh_duplicate) {
            mbufs[i] = rte_pktmbuf_copy(mbufs[i], mbufs[i]->pool, 0, UINT32_MAX);
            if (mbufs[i] == NULL) {
                plog_err("Failed to duplicate mbuf\n");
            } else
                i = i - 1;
        }
#endif
    }
    /* Drain every bucket that has become due since the last burst. */
    struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
    uint16_t pkt_idx = 0;

    while ((pkt_idx < MAX_PKT_BURST) && (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK))) {
        struct queue *queue = &task->buffer[task->last_idx];
        while ((pkt_idx < MAX_PKT_BURST) && (queue->queue_tail != queue->queue_head)) {
            out[pkt_idx] = rand_r(&task->seed) <= task->tresh_no_drop ? 0 : OUT_DISCARD;
            new_mbufs[pkt_idx] = queue->queue_elem[queue->queue_tail].mbuf;
            PREFETCH0(new_mbufs[pkt_idx]);
            PREFETCH0(&new_mbufs[pkt_idx]->cacheline1);
            pkt_idx++;
            queue->queue_tail = (queue->queue_tail + 1) & task->queue_mask;
        }
        task->last_idx = (task->last_idx + 1) & DELAY_MAX_MASK;
    }

    ret += task->base.tx_pkt(&task->base, new_mbufs, pkt_idx, out);
    task_impair_update(tbase);
    return ret;
}
static void init_task(struct task_base *tbase, struct task_args *targ)
{
    struct task_impair *task = (struct task_impair *)tbase;
    uint32_t queue_len = 0;
    size_t mem_size;
    uint32_t socket_id;
    uint64_t delay_us = 0;

    task->seed = rte_rdtsc();

    task->tresh_no_drop = ((uint64_t) RAND_MAX) * targ->probability_no_drop / 1000000;
    task->tresh_delay = ((uint64_t) RAND_MAX) * targ->probability_delay / 1000000;
    task->tresh_duplicate = ((uint64_t) RAND_MAX) * targ->probability_duplicate / 1000000;

    if ((targ->delay_us == 0) && (targ->random_delay_us == 0)) {
        tbase->handle_bulk = handle_bulk_random_drop;
        task->delay_time = 0;
    } else if (targ->random_delay_us) {
        tbase->handle_bulk = handle_bulk_impair_random;
        task->delay_time = usec_to_tsc(targ->random_delay_us);
        task->delay_time_mask = rte_align32pow2(task->delay_time) - 1;
        delay_us = targ->random_delay_us;
        queue_len = rte_align32pow2((1250L * delay_us) / 84 / (DELAY_MAX_MASK + 1));
    } else {
        task->delay_time = usec_to_tsc(targ->delay_us);
        delay_us = targ->delay_us;
        queue_len = rte_align32pow2(1250 * delay_us / 84);
    }
    /* Assume line rate is the maximum transmit speed.
       TODO: take the link speed into account if tx is a port.
    */
    if (queue_len < MAX_PKT_BURST)
        queue_len = MAX_PKT_BURST;
    task->queue_mask = queue_len - 1;
    if (task->queue_mask < MAX_PKT_BURST - 1)
        task->queue_mask = MAX_PKT_BURST - 1;

    mem_size = (task->queue_mask + 1) * sizeof(task->queue[0]);
    socket_id = rte_lcore_to_socket_id(targ->lconf->id);
    task->socket_id = socket_id;
    if (targ->delay_us) {
        task->queue = prox_zmalloc(mem_size, socket_id);
        PROX_PANIC(task->queue == NULL, "Not enough memory to allocate queue\n");
        task->queue_head = 0;
        task->queue_tail = 0;
    } else if (targ->random_delay_us) {
        size_t size = (DELAY_MAX_MASK + 1) * sizeof(struct queue);
        plog_info("\t\tAllocating %zd bytes\n", size);
        task->buffer = prox_zmalloc(size, socket_id);
        PROX_PANIC(task->buffer == NULL, "Not enough memory to allocate buffer\n");
        plog_info("\t\tAllocating %d x %zd bytes\n", DELAY_MAX_MASK + 1, mem_size);

        for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
            task->buffer[i].queue_elem = prox_zmalloc(mem_size, socket_id);
            PROX_PANIC(task->buffer[i].queue_elem == NULL, "Not enough memory to allocate buffer elems\n");
        }
    }
    random_init_seed(&task->state);
    if (targ->nb_txports) {
        memcpy(&task->src_mac[0], &prox_port_cfg[tbase->tx_params_hw.tx_port_queue[0].port].eth_addr, sizeof(prox_rte_ether_addr));
        task->flags = IMPAIR_SET_MAC;
    } else {
        task->flags = 0;
    }
}
static struct task_init tinit = {
    .mode_str = "impair",
    .init = init_task,
    .handle = handle_bulk_impair,
    .flag_features = TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS | TASK_FEATURE_ZERO_RX,
    .size = sizeof(struct task_impair)
};
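
/*
 * Illustrative PROX configuration sketch for this mode. The impair-specific
 * key names below are an assumption based on the task_args fields consumed
 * in init_task() (delay_us, random_delay_us, probability_*) and may not
 * match the actual configuration parser:
 *
 *   [core 1]
 *   name=impair
 *   task=0
 *   mode=impair
 *   rx port=if0
 *   tx port=if0
 *   delay us=1000
 *   probability no drop=99.5
 */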
__attribute__((constructor)) static void ctor(void)
{
    reg_task(&tinit);
}