Fix potential crash in rx and tx distribution
[samplevnf.git] / VNFs / DPPD-PROX / tx_pkt.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <rte_ethdev.h>
18 #include <rte_version.h>
19
20 #include "rx_pkt.h"
21 #include "tx_pkt.h"
22 #include "task_base.h"
23 #include "stats.h"
24 #include "prefetch.h"
25 #include "prox_assert.h"
26 #include "log.h"
27 #include "mbuf_utils.h"
28 #include "handle_master.h"
29
30 static void buf_pkt_single(struct task_base *tbase, struct rte_mbuf *mbuf, const uint8_t out)
31 {
32         const uint16_t prod = tbase->ws_mbuf->idx[out].prod++;
33         tbase->ws_mbuf->mbuf[out][prod & WS_MBUF_MASK] = mbuf;
34 }
35
36 static inline void buf_pkt_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
37 {
38         for (uint16_t j = 0; j < n_pkts; ++j) {
39                 if (unlikely(out[j] >= OUT_HANDLED)) {
40                         rte_pktmbuf_free(mbufs[j]);
41                         if (out[j] == OUT_HANDLED)
42                                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
43                         else
44                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
45                 }
46                 else {
47                         buf_pkt_single(tbase, mbufs[j], out[j]);
48                 }
49         }
50 }
51 #define MAX_PMD_TX 32
52
53 int tx_pkt_l3(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
54 {
55         uint32_t ip_dst;
56         int first = 0, ret, ok = 0, rc;
57         const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
58
59         for (int j = 0; j < n_pkts; j++) {
60                 if ((out) && (out[j] >= OUT_HANDLED))
61                         continue;
62                 if (unlikely((rc = write_dst_mac(tbase, mbufs[j], &ip_dst)) < 0)) {
63                         if (j - first) {
64                                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
65                                 ok += ret;
66                         }
67                         first = j + 1;
68                         if (rc == -1) {
69                                 mbufs[j]->port = tbase->l3.reachable_port_id;
70                                 tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, REQ_MAC_TO_CTRL, mbufs[j], tbase->l3.core_id, tbase->l3.task_id, ip_dst);
71                         } else if (rc == -2) {
72                                 tx_drop(mbufs[j]);
73                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
74                         }
75                 }
76         }
77         if (n_pkts - first) {
78                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
79                 ok += ret;
80         }
81         return ok;
82 }
83
84 /* The following help functions also report stats. Therefore we need
85    to pass the task_base struct. */
86 static inline int txhw_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
87 {
88         uint16_t ntx;
89         int ret;
90
91         /* TX vector mode can't transmit more than 32 packets */
92         if (n_pkts > MAX_PMD_TX) {
93                 ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, MAX_PMD_TX);
94                 ntx += rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs + ntx, n_pkts - ntx);
95         } else {
96                 ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
97         }
98         TASK_STATS_ADD_TX(&tbase->aux->stats, ntx);
99
100         ret =  n_pkts - ntx;
101         if (ntx < n_pkts) {
102                 plog_dbg("Failed to send %d packets from %p\n", ret, mbufs[0]);
103                 TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts - ntx);
104                 if (tbase->tx_pkt == tx_pkt_bw) {
105                         uint32_t drop_bytes = 0;
106                         do {
107                                 drop_bytes += mbuf_wire_size(mbufs[ntx]);
108                                 rte_pktmbuf_free(mbufs[ntx++]);
109                         } while (ntx < n_pkts);
110                         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
111                 }
112                 else {
113                         do {
114                                 rte_pktmbuf_free(mbufs[ntx++]);
115                         } while (ntx < n_pkts);
116                 }
117         }
118         return ret;
119 }
120
121 static inline int txhw_no_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
122 {
123         uint16_t ret;
124         uint16_t n = n_pkts;
125
126         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
127         do {
128                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
129                 mbufs += ret;
130                 n_pkts -= ret;
131         }
132         while (n_pkts);
133         return (n != ret);
134 }
135
136 static inline int ring_enq_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
137 {
138         int ret = 0;
139         /* return 0 on succes, -ENOBUFS on failure */
140         // Rings can be single or multiproducer (ctrl rings are multi producer)
141 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
142         if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts))) {
143 #else
144         if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0)) {
145 #endif
146                 ret = n_pkts;
147                 if (tbase->tx_pkt == tx_pkt_bw) {
148                         uint32_t drop_bytes = 0;
149                         for (uint16_t i = 0; i < n_pkts; ++i) {
150                                 drop_bytes += mbuf_wire_size(mbufs[i]);
151                                 rte_pktmbuf_free(mbufs[i]);
152                         }
153                         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
154                         TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
155                 }
156                 else {
157                         for (uint16_t i = 0; i < n_pkts; ++i)
158                                 rte_pktmbuf_free(mbufs[i]);
159                         TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
160                 }
161         }
162         else {
163                 TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
164         }
165         return ret;
166 }
167
168 static inline int ring_enq_no_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
169 {
170         int i = 0;
171 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
172         while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts)) {
173 #else
174         while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0) {
175 #endif
176                 i++;
177         };
178         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
179         return (i != 0);
180 }
181
182 void flush_queues_hw(struct task_base *tbase)
183 {
184         uint16_t prod, cons;
185
186         for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
187                 prod = tbase->ws_mbuf->idx[i].prod;
188                 cons = tbase->ws_mbuf->idx[i].cons;
189
190                 if (prod != cons) {
191                         tbase->ws_mbuf->idx[i].prod = 0;
192                         tbase->ws_mbuf->idx[i].cons = 0;
193                         txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
194                 }
195         }
196
197         tbase->flags &= ~FLAG_TX_FLUSH;
198 }
199
200 void flush_queues_sw(struct task_base *tbase)
201 {
202         uint16_t prod, cons;
203
204         for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
205                 prod = tbase->ws_mbuf->idx[i].prod;
206                 cons = tbase->ws_mbuf->idx[i].cons;
207
208                 if (prod != cons) {
209                         tbase->ws_mbuf->idx[i].prod = 0;
210                         tbase->ws_mbuf->idx[i].cons = 0;
211                         ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
212                 }
213         }
214         tbase->flags &= ~FLAG_TX_FLUSH;
215 }
216
217 void flush_queues_no_drop_hw(struct task_base *tbase)
218 {
219         uint16_t prod, cons;
220
221         for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
222                 prod = tbase->ws_mbuf->idx[i].prod;
223                 cons = tbase->ws_mbuf->idx[i].cons;
224
225                 if (prod != cons) {
226                         tbase->ws_mbuf->idx[i].prod = 0;
227                         tbase->ws_mbuf->idx[i].cons = 0;
228                         txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
229                 }
230         }
231
232         tbase->flags &= ~FLAG_TX_FLUSH;
233 }
234
235 void flush_queues_no_drop_sw(struct task_base *tbase)
236 {
237         uint16_t prod, cons;
238
239         for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
240                 prod = tbase->ws_mbuf->idx[i].prod;
241                 cons = tbase->ws_mbuf->idx[i].cons;
242
243                 if (prod != cons) {
244                         tbase->ws_mbuf->idx[i].prod = 0;
245                         tbase->ws_mbuf->idx[i].cons = 0;
246                         ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
247                 }
248         }
249         tbase->flags &= ~FLAG_TX_FLUSH;
250 }
251
252 /* "try" functions try to send packets to sw/hw w/o failing or blocking;
253    They return if ring/queue is full and are used by aggregators.
254    "try" functions do not have drop/no drop flavors
255    They are only implemented in never_discard mode (as by default they
256    use only one outgoing ring. */
257 uint16_t tx_try_self(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
258 {
259         if (n_pkts < 64) {
260                 tx_pkt_never_discard_self(tbase, mbufs, n_pkts, NULL);
261                 return n_pkts;
262         } else {
263                 tx_pkt_never_discard_self(tbase, mbufs, 64, NULL);
264                 return 64;
265         }
266 }
267
268 uint16_t tx_try_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
269 {
270         const int bulk_size = 64;
271         uint16_t ret = bulk_size, sent = 0, n_bulks;
272         n_bulks = n_pkts >> __builtin_ctz(bulk_size);
273
274         for (int i = 0; i < n_bulks; i++) {
275 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
276                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size);
277 #else
278                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size, NULL);
279 #endif
280                 mbufs += ret;
281                 sent += ret;
282                 if (ret != bulk_size)
283                         break;
284         }
285         if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
286 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
287                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)));
288 #else
289                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)), NULL);
290 #endif
291                 mbufs += ret;
292                 sent += ret;
293         }
294         TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
295         return sent;
296 }
297
298 uint16_t tx_try_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
299 {
300         const int bulk_size = 64;
301         uint16_t ret = bulk_size, n_bulks, sent = 0;
302         n_bulks = n_pkts >>  __builtin_ctz(bulk_size);
303
304         const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
305         for (int i = 0; i < n_bulks; i++) {
306                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, bulk_size);
307                 mbufs += ret;
308                 sent += ret;
309                 if (ret != bulk_size)
310                         break;
311         }
312         if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
313                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, (n_pkts & (bulk_size - 1)));
314                 mbufs += ret;
315                 sent += ret;
316         }
317         TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
318         return sent;
319 }
320
321 int tx_pkt_no_drop_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
322 {
323         return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
324 }
325
326 int tx_pkt_no_drop_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
327 {
328         static uint8_t fake_out[MAX_PKT_BURST] = {0};
329         int ret = 0;
330         if (n_pkts == MAX_PKT_BURST) {
331                 // First xmit what was queued
332                 uint16_t prod, cons;
333
334                 prod = tbase->ws_mbuf->idx[0].prod;
335                 cons = tbase->ws_mbuf->idx[0].cons;
336
337                 if ((uint16_t)(prod - cons)){
338                         tbase->flags &= ~FLAG_TX_FLUSH;
339                         tbase->ws_mbuf->idx[0].prod = 0;
340                         tbase->ws_mbuf->idx[0].cons = 0;
341                         ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
342                 }
343                 ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
344         } else {
345                 ret+= tx_pkt_no_drop_hw(tbase, mbufs, n_pkts, fake_out);
346         }
347         return ret;
348 }
349
350 int tx_pkt_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
351 {
352         return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
353 }
354
355 int tx_pkt_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
356 {
357         static uint8_t fake_out[MAX_PKT_BURST] = {0};
358         int ret = 0;
359         if (n_pkts == MAX_PKT_BURST) {
360                 // First xmit what was queued
361                 uint16_t prod, cons;
362
363                 prod = tbase->ws_mbuf->idx[0].prod;
364                 cons = tbase->ws_mbuf->idx[0].cons;
365
366                 if ((uint16_t)(prod - cons)){
367                         tbase->flags &= ~FLAG_TX_FLUSH;
368                         tbase->ws_mbuf->idx[0].prod = 0;
369                         tbase->ws_mbuf->idx[0].cons = 0;
370                         ret+= txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
371                 }
372                 ret+= txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
373         } else {
374                 ret+= tx_pkt_hw(tbase, mbufs, n_pkts, fake_out);
375         }
376         return ret;
377 }
378
379 /* Transmit to hw using tx_params_hw_sw structure
380    This function is used  to transmit to hw when tx_params_hw_sw should be used
381    i.e. when the task needs to transmit both to hw and sw */
382 int tx_pkt_no_drop_never_discard_hw1_no_pointer(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
383 {
384         txhw_no_drop(&tbase->tx_params_hw_sw.tx_port_queue, mbufs, n_pkts, tbase);
385         return 0;
386 }
387
388 int tx_pkt_no_drop_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
389 {
390         return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
391 }
392
393 int tx_pkt_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
394 {
395         return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
396 }
397
398 static uint16_t tx_pkt_free_dropped(__attribute__((unused)) struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
399 {
400         uint64_t v = 0;
401         uint16_t i;
402         /* The most probable and most important optimize case is if
403            the no packets should be dropped. */
404         for (i = 0; i + 8 < n_pkts; i += 8) {
405                 v |= *((uint64_t*)(&out[i]));
406         }
407         for (; i < n_pkts; ++i) {
408                 v |= out[i];
409         }
410
411         if (unlikely(v)) {
412                 /* At least some packets need to be dropped, so the
413                    mbufs array needs to be updated. */
414                 uint16_t n_kept = 0;
415                 uint16_t n_discard = 0;
416                 for (uint16_t i = 0; i < n_pkts; ++i) {
417                         if (unlikely(out[i] >= OUT_HANDLED)) {
418                                 rte_pktmbuf_free(mbufs[i]);
419                                 n_discard += out[i] == OUT_DISCARD;
420                                 continue;
421                         }
422                         mbufs[n_kept++] = mbufs[i];
423                 }
424                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, n_discard);
425                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts - n_kept - n_discard);
426                 return n_kept;
427         }
428         return n_pkts;
429 }
430
431 int tx_pkt_no_drop_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
432 {
433         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
434         int ret = 0;
435
436         if (likely(n_kept))
437                 ret = txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
438         return ret;
439 }
440
441 int tx_pkt_no_drop_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
442 {
443         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
444         int ret = 0;
445
446         if (likely(n_kept))
447                 ret = ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
448         return ret;
449 }
450
451 int tx_pkt_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
452 {
453         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
454
455         if (likely(n_kept))
456                 return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
457         return n_pkts;
458 }
459
460 int tx_pkt_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
461 {
462         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
463
464         if (likely(n_kept))
465                 return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
466         return 0;
467 }
468
469 int tx_pkt_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
470 {
471         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
472
473         TASK_STATS_ADD_TX(&tbase->aux->stats, n_kept);
474         tbase->ws_mbuf->idx[0].nb_rx = n_kept;
475         struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
476         for (uint16_t i = 0; i < n_kept; ++i) {
477                 tx_mbuf[i] = mbufs[i];
478         }
479         return 0;
480 }
481
482 int tx_pkt_never_discard_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
483 {
484         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
485         tbase->ws_mbuf->idx[0].nb_rx = n_pkts;
486         struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
487         for (uint16_t i = 0; i < n_pkts; ++i) {
488                 tx_mbuf[i] = mbufs[i];
489         }
490         return 0;
491 }
492
493 int tx_pkt_no_drop_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
494 {
495         int ret = 0;
496         buf_pkt_all(tbase, mbufs, n_pkts, out);
497
498         const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
499         uint16_t prod, cons;
500
501         for (uint8_t i = 0; i < nb_bufs; ++i) {
502                 prod = tbase->ws_mbuf->idx[i].prod;
503                 cons = tbase->ws_mbuf->idx[i].cons;
504
505                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
506                         tbase->flags &= ~FLAG_TX_FLUSH;
507                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
508                         ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
509                 }
510         }
511         return ret;
512 }
513
514 int tx_pkt_no_drop_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
515 {
516         int ret = 0;
517         buf_pkt_all(tbase, mbufs, n_pkts, out);
518
519         const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
520         uint16_t prod, cons;
521
522         for (uint8_t i = 0; i < nb_bufs; ++i) {
523                 prod = tbase->ws_mbuf->idx[i].prod;
524                 cons = tbase->ws_mbuf->idx[i].cons;
525
526                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
527                         tbase->flags &= ~FLAG_TX_FLUSH;
528                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
529                         ret += ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
530                 }
531         }
532         return ret;
533 }
534
535 int tx_pkt_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
536 {
537         int ret = 0;
538         buf_pkt_all(tbase, mbufs, n_pkts, out);
539
540         const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
541         uint16_t prod, cons;
542
543         for (uint8_t i = 0; i < nb_bufs; ++i) {
544                 prod = tbase->ws_mbuf->idx[i].prod;
545                 cons = tbase->ws_mbuf->idx[i].cons;
546
547                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
548                         tbase->flags &= ~FLAG_TX_FLUSH;
549                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
550                         ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
551                 }
552         }
553         return ret;
554 }
555
556 int tx_pkt_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
557 {
558         int ret = 0;
559         buf_pkt_all(tbase, mbufs, n_pkts, out);
560
561         const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
562         uint16_t prod, cons;
563         for (uint8_t i = 0; i < nb_bufs; ++i) {
564                 prod = tbase->ws_mbuf->idx[i].prod;
565                 cons = tbase->ws_mbuf->idx[i].cons;
566
567                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
568                         tbase->flags &= ~FLAG_TX_FLUSH;
569                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
570                         ret+= ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
571                 }
572         }
573         return ret;
574 }
575
576 static inline void trace_one_rx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf)
577 {
578         struct rte_mbuf tmp;
579         /* For each packet being transmitted, find which
580            buffer represent the packet as it was before
581            processing. */
582         uint32_t j = 0;
583         uint32_t len = sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr)/sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr[0]);
584         for (;j < len; ++j) {
585                 if (tbase->aux->task_rt_dump.pkt_mbuf_addr[j] == mbuf)
586                         break;
587         }
588         if (j != len) {
589 #if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
590                 tmp.data_off = 0;
591 #endif
592                 rte_pktmbuf_data_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
593                 rte_pktmbuf_pkt_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
594                 tmp.buf_addr = tbase->aux->task_rt_dump.pkt_cpy[j];
595                 plogdx_info(&tmp, "Trace RX: ");
596         }
597 }
598
599 static inline void trace_one_tx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf, uint8_t *out, uint32_t i)
600 {
601         if (out) {
602                 switch(out[i]) {
603                 case 0xFE:
604                         plogdx_info(mbuf, "Handled: ");
605                         break;
606                 case 0xFF:
607                         plogdx_info(mbuf, "Dropped: ");
608                         break;
609                 default:
610                         plogdx_info(mbuf, "TX[%d]: ", out[i]);
611                         break;
612                 }
613         } else if (tbase->aux->tx_pkt_orig == tx_pkt_drop_all) {
614                 plogdx_info(mbuf, "Dropped: ");
615         } else
616                 plogdx_info(mbuf, "TX[0]: ");
617 }
618
619 static void unset_trace(struct task_base *tbase)
620 {
621         if (0 == tbase->aux->task_rt_dump.n_trace) {
622                 if (tbase->tx_pkt == tx_pkt_l3) {
623                         tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
624                         tbase->aux->tx_pkt_orig = NULL;
625                 } else {
626                         tbase->tx_pkt = tbase->aux->tx_pkt_orig;
627                         tbase->aux->tx_pkt_orig = NULL;
628                 }
629                 tbase->aux->task_rt_dump.cur_trace = 0;
630                 task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
631         }
632 }
633
634 int tx_pkt_trace(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
635 {
636         int ret = 0;
637         if (tbase->aux->task_rt_dump.cur_trace == 0) {
638                 // No packet received since dumping...
639                 tbase->aux->task_rt_dump.n_print_tx = tbase->aux->task_rt_dump.n_trace;
640                 if (tbase->aux->task_rt_dump.n_trace < n_pkts) {
641                         tbase->aux->task_rt_dump.n_trace = 0;
642                         tbase->aux->task_rt_dump.cur_trace = 0;
643                         task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
644                 } else {
645                         tbase->aux->task_rt_dump.n_trace -= n_pkts;
646                 }
647                 ret = tx_pkt_dump(tbase, mbufs, n_pkts, out);
648                 tbase->aux->task_rt_dump.n_print_tx = 0;
649                 return ret;
650         }
651         plog_info("Tracing %d pkts\n", tbase->aux->task_rt_dump.cur_trace);
652         uint32_t cur_trace = (n_pkts < tbase->aux->task_rt_dump.cur_trace) ? n_pkts: tbase->aux->task_rt_dump.cur_trace;
653         for (uint32_t i = 0; i < cur_trace; ++i) {
654                 trace_one_rx_pkt(tbase, mbufs[i]);
655                 trace_one_tx_pkt(tbase, mbufs[i], out, i);
656
657         }
658         ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
659
660         unset_trace(tbase);
661         return ret;
662 }
663
664 int tx_pkt_dump(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
665 {
666         uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
667         int ret = 0;
668
669         n_dump = n_pkts < n_dump? n_pkts : n_dump;
670         for (uint32_t i = 0; i < n_dump; ++i) {
671                 if (out) {
672                         switch (out[i]) {
673                         case 0xFE:
674                                 plogdx_info(mbufs[i], "Handled: ");
675                                 break;
676                         case 0xFF:
677                                 plogdx_info(mbufs[i], "Dropped: ");
678                                 break;
679                         default:
680                                 plogdx_info(mbufs[i], "TX[%d]: ", out[i]);
681                                 break;
682                         }
683                 } else
684                         plogdx_info(mbufs[i], "TX: ");
685         }
686         tbase->aux->task_rt_dump.n_print_tx -= n_dump;
687
688         ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
689
690         if (0 == tbase->aux->task_rt_dump.n_print_tx) {
691                 if (tbase->tx_pkt == tx_pkt_l3) {
692                         tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
693                         tbase->aux->tx_pkt_orig = NULL;
694                 } else {
695                         tbase->tx_pkt = tbase->aux->tx_pkt_orig;
696                         tbase->aux->tx_pkt_orig = NULL;
697                 }
698         }
699         return ret;
700 }
701
702 /* Gather the distribution of the number of packets that have been
703    xmitted from one TX call. Since the value is only modified by the
704    task that xmits the packet, no atomic operation is needed. */
705 int tx_pkt_distr(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
706 {
707         if (likely(n_pkts < TX_BUCKET_SIZE))
708                 tbase->aux->tx_bucket[n_pkts]++;
709         else
710                 tbase->aux->tx_bucket[TX_BUCKET_SIZE - 1]++;
711         return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
712 }
713
714 int tx_pkt_bw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
715 {
716         uint32_t tx_bytes = 0;
717         uint32_t drop_bytes = 0;
718
719         for (uint16_t i = 0; i < n_pkts; ++i) {
720                 if (!out || out[i] < OUT_HANDLED)
721                         tx_bytes += mbuf_wire_size(mbufs[i]);
722                 else
723                         drop_bytes += mbuf_wire_size(mbufs[i]);
724         }
725
726         TASK_STATS_ADD_TX_BYTES(&tbase->aux->stats, tx_bytes);
727         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
728         return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
729 }
730
731 int tx_pkt_drop_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
732 {
733         for (uint16_t j = 0; j < n_pkts; ++j) {
734                 rte_pktmbuf_free(mbufs[j]);
735         }
736         if (out == NULL)
737                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts);
738         else {
739                 for (uint16_t j = 0; j < n_pkts; ++j) {
740                         if (out[j] == OUT_HANDLED)
741                                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
742                         else
743                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
744                 }
745         }
746         return n_pkts;
747 }
748
749 static inline int tx_ring_all(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
750 {
751         if (tbase->aux->task_rt_dump.cur_trace) {
752                 trace_one_rx_pkt(tbase, mbuf);
753         }
754         mbuf->udata64 = ((uint64_t)ip << 32) | (core_id << 16) | (task_id << 8) | command;
755         return rte_ring_enqueue(ring, mbuf);
756 }
757
758 void tx_ring_cti(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
759 {
760         plogx_dbg("\tSending command %s with ip %x to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], ip, ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
761         int ret = tx_ring_all(tbase, ring, command,  mbuf, core_id, task_id, ip);
762         if (unlikely(ret != 0)) {
763                 plogx_dbg("\tFail to send command %s with ip %x to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], ip, ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
764                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
765                 rte_pktmbuf_free(mbuf);
766         }
767 }
768
769 void tx_ring_ip(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf, uint32_t ip)
770 {
771         plogx_dbg("\tSending command %s with ip %x to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ip, ring, mbuf, rte_ring_free_count(ring));
772         int ret = tx_ring_all(tbase, ring, command,  mbuf, 0, 0, ip);
773         if (unlikely(ret != 0)) {
774                 plogx_dbg("\tFail to send command %s with ip %x to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ip, ring, mbuf, rte_ring_free_count(ring));
775                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
776                 rte_pktmbuf_free(mbuf);
777         }
778 }
779
780 void tx_ring(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf)
781 {
782         plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
783         int ret = tx_ring_all(tbase, ring, command,  mbuf, 0, 0, 0);
784         if (unlikely(ret != 0)) {
785                 plogx_dbg("\tFail to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
786                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
787                 rte_pktmbuf_free(mbuf);
788         }
789 }