// Copyright (c) 2010-2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <stdlib.h>

#include <rte_cycles.h>
#include <rte_version.h>

#include "prox_malloc.h"
#include "lconf.h"
#include "log.h"
#include "random.h"
#include "prefetch.h"
#include "handle_impair.h"

#if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0)
#define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE
#endif

#define DELAY_ACCURACY	11		// accuracy of 2048 cycles ~= 1 micro-second
#define DELAY_MAX_MASK	0x1FFFFF	// Maximum 2M * 2K cycles ~1 second
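
// A timestamp maps to a delay-line bucket by dropping the DELAY_ACCURACY
// low-order tsc bits and wrapping on DELAY_MAX_MASK:
//   idx = (tsc >> DELAY_ACCURACY) & DELAY_MAX_MASK
// i.e. 2^21 buckets of 2^11 cycles each, covering 2^32 cycles in total
// (roughly one to two seconds at typical CPU frequencies).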

struct queue_elem {
	struct rte_mbuf *mbuf;
	uint64_t        tsc;
};

struct queue {
	struct queue_elem *queue_elem;
	unsigned queue_head;
	unsigned queue_tail;
};

struct task_impair {
	struct task_base base;
	struct queue_elem *queue;
	uint32_t random_delay_us;
	uint32_t delay_us;
	uint64_t delay_time;
	uint64_t delay_time_mask;
	unsigned queue_head;
	unsigned queue_tail;
	unsigned queue_mask;
	int tresh;
	unsigned int seed;
	struct random state;
	uint64_t last_idx;
	struct queue *buffer;
	uint32_t socket_id;
	int need_update;
};

static int handle_bulk_impair(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);
static int handle_bulk_impair_random(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);
static int handle_bulk_random_drop(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);

void task_impair_set_proba(struct task_base *tbase, float proba)
{
	struct task_impair *task = (struct task_impair *)tbase;
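	// proba is a percentage; multiplying by 10000 yields parts-per-million,
	// the same scale used by init_task (1000000 == forward every packet).
	// tresh is the resulting threshold compared against rand_r() draws.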
	task->tresh = ((uint64_t) RAND_MAX) * (uint32_t)(proba * 10000) / 1000000;
}

void task_impair_set_delay_us(struct task_base *tbase, uint32_t delay_us, uint32_t random_delay_us)
{
	struct task_impair *task = (struct task_impair *)tbase;
	task->need_update = 1;
	task->random_delay_us = random_delay_us;
	task->delay_us = delay_us;
}
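
// Reconfiguration is deferred: the setters above only record the new values
// and raise need_update. task_impair_update() applies them from the datapath,
// swapping handle_bulk, draining packets still queued under the old settings
// and re-allocating the queues.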
static void task_impair_update(struct task_base *tbase)
{
	struct task_impair *task = (struct task_impair *)tbase;
	uint32_t queue_len = 0;
	size_t mem_size;
	if (!task->need_update)
		return;
	task->need_update = 0;
	uint64_t now = rte_rdtsc();
	uint8_t out[MAX_PKT_BURST] = {0};
	uint64_t now_idx = (now >> DELAY_ACCURACY) & DELAY_MAX_MASK;

	if (task->random_delay_us) {
		tbase->handle_bulk = handle_bulk_impair_random;
		task->delay_time = usec_to_tsc(task->random_delay_us);
		task->delay_time_mask = rte_align32pow2(task->delay_time) - 1;
		queue_len = rte_align32pow2((1250L * task->random_delay_us) / 84 / (DELAY_MAX_MASK + 1));
	} else if (task->delay_us == 0) {
		tbase->handle_bulk = handle_bulk_random_drop;
		task->delay_time = 0;
	} else {
		tbase->handle_bulk = handle_bulk_impair;
		task->delay_time = usec_to_tsc(task->delay_us);
		queue_len = rte_align32pow2(1250 * task->delay_us / 84);
	}
	if (task->queue) {
		struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
		while (task->queue_tail != task->queue_head) {
			uint16_t idx = 0;
			while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
				if (task->queue[task->queue_tail].tsc <= now) {
					out[idx] = rand_r(&task->seed) <= task->tresh ? 0 : OUT_DISCARD;
					new_mbufs[idx++] = task->queue[task->queue_tail].mbuf;
					task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
				} else {
					/* Not yet due, but the delay is being reconfigured:
					   flush the packet immediately instead of spinning. */
					out[idx] = 0;
					new_mbufs[idx++] = task->queue[task->queue_tail].mbuf;
					task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
				}
			}
			task->base.tx_pkt(&task->base, new_mbufs, idx, out);
		}
		prox_free(task->queue);
		task->queue = NULL;
	}
	if (task->buffer) {
		struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
		while (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK)) {
			uint16_t pkt_idx = 0;
			while ((pkt_idx < MAX_PKT_BURST) && (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK))) {
				struct queue *queue = &task->buffer[task->last_idx];
				while ((pkt_idx < MAX_PKT_BURST) && (queue->queue_tail != queue->queue_head)) {
					out[pkt_idx] = rand_r(&task->seed) <= task->tresh ? 0 : OUT_DISCARD;
					new_mbufs[pkt_idx++] = queue->queue_elem[queue->queue_tail].mbuf;
					queue->queue_tail = (queue->queue_tail + 1) & task->queue_mask;
				}
				task->last_idx = (task->last_idx + 1) & DELAY_MAX_MASK;
			}
			task->base.tx_pkt(&task->base, new_mbufs, pkt_idx, out);
		}
		for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
			if (task->buffer[i].queue_elem)
				prox_free(task->buffer[i].queue_elem);
		}
		prox_free(task->buffer);
		task->buffer = NULL;
	}

	if (queue_len < MAX_PKT_BURST)
		queue_len = MAX_PKT_BURST;
	task->queue_mask = queue_len - 1;
	if (task->queue_mask < MAX_PKT_BURST - 1)
		task->queue_mask = MAX_PKT_BURST - 1;
	mem_size = (task->queue_mask + 1) * sizeof(task->queue[0]);

	if (task->delay_us) {
		task->queue_head = 0;
		task->queue_tail = 0;
		task->queue = prox_zmalloc(mem_size, task->socket_id);
		if (task->queue == NULL) {
			plog_err("Not enough memory to allocate queue\n");
			task->queue_mask = 0;
		}
	} else if (task->random_delay_us) {
		size_t size = (DELAY_MAX_MASK + 1) * sizeof(struct queue);
		plog_info("Allocating %zd bytes\n", size);
		task->buffer = prox_zmalloc(size, task->socket_id);
		PROX_PANIC(task->buffer == NULL, "Not enough memory to allocate buffer\n");
		plog_info("Allocating %d x %zd bytes\n", DELAY_MAX_MASK + 1, mem_size);

		for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
			task->buffer[i].queue_elem = prox_zmalloc(mem_size, task->socket_id);
			PROX_PANIC(task->buffer[i].queue_elem == NULL, "Not enough memory to allocate buffer elems\n");
		}
	}
	random_init_seed(&task->state);
}

static int handle_bulk_random_drop(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_impair *task = (struct task_impair *)tbase;
	uint8_t out[MAX_PKT_BURST];
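	// PROX convention: out[i] selects the destination of mbufs[i];
	// 0 is the first tx output, OUT_DISCARD drops the packet.
	// A packet survives when rand_r() falls at or below the tresh threshold.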
	for (uint16_t i = 0; i < n_pkts; ++i) {
		out[i] = rand_r(&task->seed) <= task->tresh ? 0 : OUT_DISCARD;
	}
	int ret = task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
	task_impair_update(tbase);
	return ret;
}

static int handle_bulk_impair(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_impair *task = (struct task_impair *)tbase;
	uint64_t now = rte_rdtsc();
	uint8_t out[MAX_PKT_BURST] = {0};
	uint16_t enqueue_failed;
	uint16_t i;
	int ret = 0;
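
	// The delay line is a power-of-two ring with one slot kept unused, so
	// (tail - head + mask) & mask yields the number of free slots, letting a
	// whole burst be enqueued without a per-packet check on the fast path.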
	int nb_empty_slots = (task->queue_tail - task->queue_head + task->queue_mask) & task->queue_mask;
	if (likely(nb_empty_slots >= n_pkts)) {
		/* We know n_pkts fits, no need to check for every packet */
		for (i = 0; i < n_pkts; ++i) {
			task->queue[task->queue_head].tsc = now + task->delay_time;
			task->queue[task->queue_head].mbuf = mbufs[i];
			task->queue_head = (task->queue_head + 1) & task->queue_mask;
		}
	} else {
		for (i = 0; i < n_pkts; ++i) {
			if (((task->queue_head + 1) & task->queue_mask) != task->queue_tail) {
				task->queue[task->queue_head].tsc = now + task->delay_time;
				task->queue[task->queue_head].mbuf = mbufs[i];
				task->queue_head = (task->queue_head + 1) & task->queue_mask;
			} else {
				/* The rest does not fit; drop the remaining packets. */
				enqueue_failed = i;
				for (; i < n_pkts; ++i) {
					out[i] = OUT_DISCARD;
				}
				ret += task->base.tx_pkt(&task->base, mbufs + enqueue_failed,
						n_pkts - enqueue_failed, out + enqueue_failed);
				break;
			}
		}
	}

	struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
	uint16_t idx = 0;

	if (task->tresh != RAND_MAX) {
		while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
			if (task->queue[task->queue_tail].tsc <= now) {
				out[idx] = rand_r(&task->seed) <= task->tresh ? 0 : OUT_DISCARD;
				new_mbufs[idx] = task->queue[task->queue_tail].mbuf;
				PREFETCH0(new_mbufs[idx]);
				PREFETCH0(&new_mbufs[idx]->cacheline1);
				idx++;
				task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
			} else {
				break;
			}
		}
	} else {
		/* tresh == RAND_MAX: nothing can be dropped, out[] is already 0 */
		while (idx < MAX_PKT_BURST && task->queue_tail != task->queue_head) {
			if (task->queue[task->queue_tail].tsc <= now) {
				new_mbufs[idx] = task->queue[task->queue_tail].mbuf;
				PREFETCH0(new_mbufs[idx]);
				PREFETCH0(&new_mbufs[idx]->cacheline1);
				idx++;
				task->queue_tail = (task->queue_tail + 1) & task->queue_mask;
			} else {
				break;
			}
		}
	}

	ret += task->base.tx_pkt(&task->base, new_mbufs, idx, out);
	task_impair_update(tbase);
	return ret;
}

/*
 * We want to avoid using division and modulo for performance reasons.
 * We also want to support up to one second of delay, expressed in tsc,
 * so the delay in tsc needs up to 32 bits (assuming a processor frequency below 4 GHz).
 * If max_delay is smaller, we make sure we use fewer bits.
 * Note that we lose the MSBs of the xorshift - 64 bits could hold
 * two or three delays in tsc - but that would probably make the implementation
 * more complex for little expected gain. Maybe room for optimization.
 * With this implementation we might have to draw the random number more than
 * once for a delay, but on average this should happen less than 50% of the time.
 */
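// Illustrative numbers (not from the source): if max_delay is 3000 cycles,
// rte_align32pow2(3000) - 1 = 4095, so a masked draw is uniform over 0..4095
// and is rejected with probability 1096/4096, i.e. about 27%.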
static inline uint64_t random_delay(struct random *state, uint64_t max_delay, uint64_t max_delay_mask)
{
	uint64_t val;
	while (1) {
		val = random_next(state);
		if ((val & max_delay_mask) < max_delay)
			return (val & max_delay_mask);
	}
}

static int handle_bulk_impair_random(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_impair *task = (struct task_impair *)tbase;
	uint64_t now = rte_rdtsc();
	uint8_t out[MAX_PKT_BURST];
	uint16_t i;
	int ret = 0;
	uint64_t packet_time, idx;
	uint64_t now_idx = (now >> DELAY_ACCURACY) & DELAY_MAX_MASK;
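
	// task->buffer is a timing wheel: bucket i holds the packets due in
	// 2048-cycle window i. A packet is placed in the bucket of its randomized
	// due time, or failing that in the next non-full bucket;
	// ((now_idx - 1) & DELAY_MAX_MASK) is the last bucket before the wheel
	// wraps onto itself, used below as the "wheel is full" boundary.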
	for (i = 0; i < n_pkts; ++i) {
		packet_time = now + random_delay(&task->state, task->delay_time, task->delay_time_mask);
		idx = (packet_time >> DELAY_ACCURACY) & DELAY_MAX_MASK;
		while (idx != ((now_idx - 1) & DELAY_MAX_MASK)) {
			struct queue *queue = &task->buffer[idx];
			if (((queue->queue_head + 1) & task->queue_mask) != queue->queue_tail) {
				queue->queue_elem[queue->queue_head].mbuf = mbufs[i];
				queue->queue_head = (queue->queue_head + 1) & task->queue_mask;
				break;
			} else {
				idx = (idx + 1) & DELAY_MAX_MASK;
			}
		}
		if (idx == ((now_idx - 1) & DELAY_MAX_MASK)) {
			/* This packet does not fit; drop it. Later packets might still
			   fit, as they may be scheduled for earlier buckets. */
			out[0] = OUT_DISCARD;
			ret += task->base.tx_pkt(&task->base, mbufs + i, 1, out);
			plog_warn("Unexpectedly dropping packets\n");
		}
	}

	struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
	uint16_t pkt_idx = 0;

	while ((pkt_idx < MAX_PKT_BURST) && (task->last_idx != ((now_idx - 1) & DELAY_MAX_MASK))) {
		struct queue *queue = &task->buffer[task->last_idx];
		while ((pkt_idx < MAX_PKT_BURST) && (queue->queue_tail != queue->queue_head)) {
			out[pkt_idx] = rand_r(&task->seed) <= task->tresh ? 0 : OUT_DISCARD;
			new_mbufs[pkt_idx] = queue->queue_elem[queue->queue_tail].mbuf;
			PREFETCH0(new_mbufs[pkt_idx]);
			PREFETCH0(&new_mbufs[pkt_idx]->cacheline1);
			pkt_idx++;
			queue->queue_tail = (queue->queue_tail + 1) & task->queue_mask;
		}
		task->last_idx = (task->last_idx + 1) & DELAY_MAX_MASK;
	}

	ret += task->base.tx_pkt(&task->base, new_mbufs, pkt_idx, out);
	task_impair_update(tbase);
	return ret;
}

static void init_task(struct task_base *tbase, struct task_args *targ)
{
	struct task_impair *task = (struct task_impair *)tbase;
	uint32_t queue_len = 0;
	size_t mem_size;
	unsigned socket_id;
	uint64_t delay_us = 0;

	task->seed = rte_rdtsc();
	if (targ->probability == 0)
		targ->probability = 1000000;
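
	// probability is in parts-per-million; the default of 1000000 gives
	// tresh == RAND_MAX, i.e. every packet passes the drop test.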
	task->tresh = ((uint64_t) RAND_MAX) * targ->probability / 1000000;

	if ((targ->delay_us == 0) && (targ->random_delay_us == 0)) {
		tbase->handle_bulk = handle_bulk_random_drop;
		task->delay_time = 0;
	} else if (targ->random_delay_us) {
		tbase->handle_bulk = handle_bulk_impair_random;
		task->delay_time = usec_to_tsc(targ->random_delay_us);
		task->delay_time_mask = rte_align32pow2(task->delay_time) - 1;
		delay_us = targ->random_delay_us;
		queue_len = rte_align32pow2((1250L * delay_us) / 84 / (DELAY_MAX_MASK + 1));
	} else {
		task->delay_time = usec_to_tsc(targ->delay_us);
		delay_us = targ->delay_us;
		queue_len = rte_align32pow2(1250 * delay_us / 84);
	}
	/* Assume line rate is the maximum transmit speed: 1250 bytes/us at
	   10 Gbps, with a minimal frame occupying 84 bytes on the wire, hence
	   the 1250 * delay_us / 84 queue sizing above.
	   TODO: take the link speed if tx is a port.
	*/
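	// Worked example (illustrative): delay_us = 1000 gives
	// 1250 * 1000 / 84 = 14880 packets in flight at worst, rounded up by
	// rte_align32pow2 to a 16384-entry ring.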
	if (queue_len < MAX_PKT_BURST)
		queue_len = MAX_PKT_BURST;
	task->queue_mask = queue_len - 1;
	if (task->queue_mask < MAX_PKT_BURST - 1)
		task->queue_mask = MAX_PKT_BURST - 1;

	mem_size = (task->queue_mask + 1) * sizeof(task->queue[0]);
	socket_id = rte_lcore_to_socket_id(targ->lconf->id);
	task->socket_id = socket_id;

	if (targ->delay_us) {
		task->queue = prox_zmalloc(mem_size, socket_id);
		PROX_PANIC(task->queue == NULL, "Not enough memory to allocate queue\n");
		task->queue_head = 0;
		task->queue_tail = 0;
	} else if (targ->random_delay_us) {
		size_t size = (DELAY_MAX_MASK + 1) * sizeof(struct queue);
		plog_info("Allocating %zd bytes\n", size);
		task->buffer = prox_zmalloc(size, socket_id);
		PROX_PANIC(task->buffer == NULL, "Not enough memory to allocate buffer\n");
		plog_info("Allocating %d x %zd bytes\n", DELAY_MAX_MASK + 1, mem_size);

		for (int i = 0; i < DELAY_MAX_MASK + 1; i++) {
			task->buffer[i].queue_elem = prox_zmalloc(mem_size, socket_id);
			PROX_PANIC(task->buffer[i].queue_elem == NULL, "Not enough memory to allocate buffer elems\n");
		}
	}
	random_init_seed(&task->state);
}
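
// For reference, an impair task is selected from the PROX configuration file.
// A minimal sketch (key names assumed from this handler's parameters, check
// the PROX config documentation for the exact spelling):
//   [core 1]
//   task=0
//   mode=impair
//   delay us=100
//   probability=50
//   tx port=p0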
static struct task_init tinit = {
	.mode_str = "impair",
	.init = init_task,
	.handle = handle_bulk_impair,
	.flag_features = TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS | TASK_FEATURE_ZERO_RX,
	.size = sizeof(struct task_impair)
};

__attribute__((constructor)) static void ctor(void)
{
	reg_task(&tinit);
}