Code rewrite and Python3 support
[samplevnf.git] / VNFs / DPPD-PROX / tx_pkt.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <rte_ethdev.h>
18 #include <rte_version.h>
19
20 #include "rx_pkt.h"
21 #include "tx_pkt.h"
22 #include "task_base.h"
23 #include "stats.h"
24 #include "prefetch.h"
25 #include "prox_assert.h"
26 #include "log.h"
27 #include "mbuf_utils.h"
28 #include "handle_master.h"
29
30 static void buf_pkt_single(struct task_base *tbase, struct rte_mbuf *mbuf, const uint8_t out)
31 {
32         const uint16_t prod = tbase->ws_mbuf->idx[out].prod++;
33         tbase->ws_mbuf->mbuf[out][prod & WS_MBUF_MASK] = mbuf;
34 }
35
36 static inline void buf_pkt_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
37 {
38         for (uint16_t j = 0; j < n_pkts; ++j) {
39                 if (unlikely(out[j] >= OUT_HANDLED)) {
40                         rte_pktmbuf_free(mbufs[j]);
41                         if (out[j] == OUT_HANDLED)
42                                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
43                         else
44                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
45                 }
46                 else {
47                         buf_pkt_single(tbase, mbufs[j], out[j]);
48                 }
49         }
50 }
51 #define MAX_PMD_TX 32
52
53 int tx_pkt_l3(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
54 {
55         uint32_t ip_dst;
56         int first = 0, ret, ok = 0, rc;
57         const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
58         struct rte_mbuf *arp_mbuf = NULL;       // used when one need to send both an ARP and a mbuf
59
60         for (int j = 0; j < n_pkts; j++) {
61                 if ((out) && (out[j] >= OUT_HANDLED))
62                         continue;
63                 if (unlikely((rc = write_dst_mac(tbase, mbufs[j], &ip_dst)) != SEND_MBUF)) {
64                         if (j - first) {
65                                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
66                                 ok += ret;
67                         }
68                         first = j + 1;
69                         switch(rc) {
70                         case SEND_ARP:
71                                 // We re-use the mbuf - no need to create a arp_mbuf and delete the existing mbuf
72                                 mbufs[j]->port = tbase->l3.reachable_port_id;
73                                 tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, REQ_MAC_TO_CTRL, mbufs[j], tbase->l3.core_id, tbase->l3.task_id, ip_dst);
74                                 break;
75                         case SEND_MBUF_AND_ARP:
76                                 // We send the mbuf and an ARP - we need to allocate another mbuf for ARP
77                                 ret = rte_mempool_get(tbase->l3.arp_pool, (void **)&arp_mbuf);
78                                 if (likely(ret == 0))   {
79                                         arp_mbuf->port = tbase->l3.reachable_port_id;
80                                         tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, REQ_MAC_TO_CTRL, arp_mbuf, tbase->l3.core_id, tbase->l3.task_id, ip_dst);
81                                 } else {
82                                         plog_err("Failed to get a mbuf from arp mempool\n");
83                                         // We still send the initial mbuf
84                                 }
85                                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + j, 1, out);
86                                 break;
87                         case DROP_MBUF:
88                                 tx_drop(mbufs[j]);
89                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
90                                 break;
91                         }
92                 }
93         }
94         if (n_pkts - first) {
95                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
96                 ok += ret;
97         }
98         return ok;
99 }
100
101 /* The following help functions also report stats. Therefore we need
102    to pass the task_base struct. */
103 static inline int txhw_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
104 {
105         uint16_t ntx;
106         int ret;
107
108         /* TX vector mode can't transmit more than 32 packets */
109         if (n_pkts > MAX_PMD_TX) {
110                 ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, MAX_PMD_TX);
111                 ntx += rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs + ntx, n_pkts - ntx);
112         } else {
113                 ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
114         }
115         TASK_STATS_ADD_TX(&tbase->aux->stats, ntx);
116
117         ret =  n_pkts - ntx;
118         if (ntx < n_pkts) {
119                 plog_dbg("Failed to send %d packets from %p\n", ret, mbufs[0]);
120                 TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts - ntx);
121                 if (tbase->tx_pkt == tx_pkt_bw) {
122                         uint32_t drop_bytes = 0;
123                         do {
124                                 drop_bytes += mbuf_wire_size(mbufs[ntx]);
125                                 rte_pktmbuf_free(mbufs[ntx++]);
126                         } while (ntx < n_pkts);
127                         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
128                 }
129                 else {
130                         do {
131                                 rte_pktmbuf_free(mbufs[ntx++]);
132                         } while (ntx < n_pkts);
133                 }
134         }
135         return ret;
136 }
137
138 static inline int txhw_no_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
139 {
140         uint16_t ret;
141         uint16_t n = n_pkts;
142
143         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
144         do {
145                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
146                 mbufs += ret;
147                 n_pkts -= ret;
148         }
149         while (n_pkts);
150         return (n != ret);
151 }
152
153 static inline int ring_enq_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
154 {
155         int ret = 0;
156         /* return 0 on succes, -ENOBUFS on failure */
157         // Rings can be single or multiproducer (ctrl rings are multi producer)
158 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
159         if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts))) {
160 #else
161         if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0)) {
162 #endif
163                 ret = n_pkts;
164                 if (tbase->tx_pkt == tx_pkt_bw) {
165                         uint32_t drop_bytes = 0;
166                         for (uint16_t i = 0; i < n_pkts; ++i) {
167                                 drop_bytes += mbuf_wire_size(mbufs[i]);
168                                 rte_pktmbuf_free(mbufs[i]);
169                         }
170                         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
171                         TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
172                 }
173                 else {
174                         for (uint16_t i = 0; i < n_pkts; ++i)
175                                 rte_pktmbuf_free(mbufs[i]);
176                         TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
177                 }
178         }
179         else {
180                 TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
181         }
182         return ret;
183 }
184
185 static inline int ring_enq_no_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
186 {
187         int i = 0;
188 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
189         while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts)) {
190 #else
191         while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0) {
192 #endif
193                 i++;
194         };
195         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
196         return (i != 0);
197 }
198
199 void flush_queues_hw(struct task_base *tbase)
200 {
201         uint16_t prod, cons;
202
203         for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
204                 prod = tbase->ws_mbuf->idx[i].prod;
205                 cons = tbase->ws_mbuf->idx[i].cons;
206
207                 if (prod != cons) {
208                         tbase->ws_mbuf->idx[i].prod = 0;
209                         tbase->ws_mbuf->idx[i].cons = 0;
210                         txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
211                 }
212         }
213
214         tbase->flags &= ~FLAG_TX_FLUSH;
215 }
216
217 void flush_queues_sw(struct task_base *tbase)
218 {
219         uint16_t prod, cons;
220
221         for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
222                 prod = tbase->ws_mbuf->idx[i].prod;
223                 cons = tbase->ws_mbuf->idx[i].cons;
224
225                 if (prod != cons) {
226                         tbase->ws_mbuf->idx[i].prod = 0;
227                         tbase->ws_mbuf->idx[i].cons = 0;
228                         ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
229                 }
230         }
231         tbase->flags &= ~FLAG_TX_FLUSH;
232 }
233
234 void flush_queues_no_drop_hw(struct task_base *tbase)
235 {
236         uint16_t prod, cons;
237
238         for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
239                 prod = tbase->ws_mbuf->idx[i].prod;
240                 cons = tbase->ws_mbuf->idx[i].cons;
241
242                 if (prod != cons) {
243                         tbase->ws_mbuf->idx[i].prod = 0;
244                         tbase->ws_mbuf->idx[i].cons = 0;
245                         txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
246                 }
247         }
248
249         tbase->flags &= ~FLAG_TX_FLUSH;
250 }
251
252 void flush_queues_no_drop_sw(struct task_base *tbase)
253 {
254         uint16_t prod, cons;
255
256         for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
257                 prod = tbase->ws_mbuf->idx[i].prod;
258                 cons = tbase->ws_mbuf->idx[i].cons;
259
260                 if (prod != cons) {
261                         tbase->ws_mbuf->idx[i].prod = 0;
262                         tbase->ws_mbuf->idx[i].cons = 0;
263                         ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
264                 }
265         }
266         tbase->flags &= ~FLAG_TX_FLUSH;
267 }
268
269 /* "try" functions try to send packets to sw/hw w/o failing or blocking;
270    They return if ring/queue is full and are used by aggregators.
271    "try" functions do not have drop/no drop flavors
272    They are only implemented in never_discard mode (as by default they
273    use only one outgoing ring. */
274 uint16_t tx_try_self(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
275 {
276         if (n_pkts < 64) {
277                 tx_pkt_never_discard_self(tbase, mbufs, n_pkts, NULL);
278                 return n_pkts;
279         } else {
280                 tx_pkt_never_discard_self(tbase, mbufs, 64, NULL);
281                 return 64;
282         }
283 }
284
285 uint16_t tx_try_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
286 {
287         const int bulk_size = 64;
288         uint16_t ret = bulk_size, sent = 0, n_bulks;
289         n_bulks = n_pkts >> __builtin_ctz(bulk_size);
290
291         for (int i = 0; i < n_bulks; i++) {
292 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
293                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size);
294 #else
295                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size, NULL);
296 #endif
297                 mbufs += ret;
298                 sent += ret;
299                 if (ret != bulk_size)
300                         break;
301         }
302         if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
303 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
304                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)));
305 #else
306                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)), NULL);
307 #endif
308                 mbufs += ret;
309                 sent += ret;
310         }
311         TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
312         return sent;
313 }
314
315 uint16_t tx_try_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
316 {
317         const int bulk_size = 64;
318         uint16_t ret = bulk_size, n_bulks, sent = 0;
319         n_bulks = n_pkts >>  __builtin_ctz(bulk_size);
320
321         const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
322         for (int i = 0; i < n_bulks; i++) {
323                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, bulk_size);
324                 mbufs += ret;
325                 sent += ret;
326                 if (ret != bulk_size)
327                         break;
328         }
329         if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
330                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, (n_pkts & (bulk_size - 1)));
331                 mbufs += ret;
332                 sent += ret;
333         }
334         TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
335         return sent;
336 }
337
338 int tx_pkt_no_drop_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
339 {
340         return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
341 }
342
343 int tx_pkt_no_drop_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
344 {
345         static uint8_t fake_out[MAX_PKT_BURST] = {0};
346         int ret = 0;
347         if (n_pkts == MAX_PKT_BURST) {
348                 // First xmit what was queued
349                 uint16_t prod, cons;
350
351                 prod = tbase->ws_mbuf->idx[0].prod;
352                 cons = tbase->ws_mbuf->idx[0].cons;
353
354                 if ((uint16_t)(prod - cons)){
355                         tbase->flags &= ~FLAG_TX_FLUSH;
356                         tbase->ws_mbuf->idx[0].prod = 0;
357                         tbase->ws_mbuf->idx[0].cons = 0;
358                         ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
359                 }
360                 ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
361         } else {
362                 ret+= tx_pkt_no_drop_hw(tbase, mbufs, n_pkts, fake_out);
363         }
364         return ret;
365 }
366
367 int tx_pkt_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
368 {
369         return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
370 }
371
372 int tx_pkt_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
373 {
374         static uint8_t fake_out[MAX_PKT_BURST] = {0};
375         int ret = 0;
376         if (n_pkts == MAX_PKT_BURST) {
377                 // First xmit what was queued
378                 uint16_t prod, cons;
379
380                 prod = tbase->ws_mbuf->idx[0].prod;
381                 cons = tbase->ws_mbuf->idx[0].cons;
382
383                 if ((uint16_t)(prod - cons)){
384                         tbase->flags &= ~FLAG_TX_FLUSH;
385                         tbase->ws_mbuf->idx[0].prod = 0;
386                         tbase->ws_mbuf->idx[0].cons = 0;
387                         ret+= txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
388                 }
389                 ret+= txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
390         } else {
391                 ret+= tx_pkt_hw(tbase, mbufs, n_pkts, fake_out);
392         }
393         return ret;
394 }
395
396 /* Transmit to hw using tx_params_hw_sw structure
397    This function is used  to transmit to hw when tx_params_hw_sw should be used
398    i.e. when the task needs to transmit both to hw and sw */
399 int tx_pkt_no_drop_never_discard_hw1_no_pointer(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
400 {
401         txhw_no_drop(&tbase->tx_params_hw_sw.tx_port_queue, mbufs, n_pkts, tbase);
402         return 0;
403 }
404
405 int tx_pkt_no_drop_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
406 {
407         return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
408 }
409
410 int tx_pkt_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
411 {
412         return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
413 }
414
415 static uint16_t tx_pkt_free_dropped(__attribute__((unused)) struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
416 {
417         uint64_t v = 0;
418         uint16_t i;
419         /* The most probable and most important optimize case is if
420            the no packets should be dropped. */
421         for (i = 0; i + 8 < n_pkts; i += 8) {
422                 v |= *((uint64_t*)(&out[i]));
423         }
424         for (; i < n_pkts; ++i) {
425                 v |= out[i];
426         }
427
428         if (unlikely(v)) {
429                 /* At least some packets need to be dropped, so the
430                    mbufs array needs to be updated. */
431                 uint16_t n_kept = 0;
432                 uint16_t n_discard = 0;
433                 for (uint16_t i = 0; i < n_pkts; ++i) {
434                         if (unlikely(out[i] >= OUT_HANDLED)) {
435                                 rte_pktmbuf_free(mbufs[i]);
436                                 n_discard += out[i] == OUT_DISCARD;
437                                 continue;
438                         }
439                         mbufs[n_kept++] = mbufs[i];
440                 }
441                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, n_discard);
442                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts - n_kept - n_discard);
443                 return n_kept;
444         }
445         return n_pkts;
446 }
447
448 int tx_pkt_no_drop_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
449 {
450         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
451         int ret = 0;
452
453         if (likely(n_kept))
454                 ret = txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
455         return ret;
456 }
457
458 int tx_pkt_no_drop_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
459 {
460         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
461         int ret = 0;
462
463         if (likely(n_kept))
464                 ret = ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
465         return ret;
466 }
467
468 int tx_pkt_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
469 {
470         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
471
472         if (likely(n_kept))
473                 return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
474         return n_pkts;
475 }
476
477 int tx_pkt_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
478 {
479         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
480
481         if (likely(n_kept))
482                 return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
483         return 0;
484 }
485
486 int tx_pkt_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
487 {
488         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
489
490         TASK_STATS_ADD_TX(&tbase->aux->stats, n_kept);
491         tbase->ws_mbuf->idx[0].nb_rx = n_kept;
492         struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
493         for (uint16_t i = 0; i < n_kept; ++i) {
494                 tx_mbuf[i] = mbufs[i];
495         }
496         return 0;
497 }
498
499 int tx_pkt_never_discard_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
500 {
501         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
502         tbase->ws_mbuf->idx[0].nb_rx = n_pkts;
503         struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
504         for (uint16_t i = 0; i < n_pkts; ++i) {
505                 tx_mbuf[i] = mbufs[i];
506         }
507         return 0;
508 }
509
510 int tx_pkt_no_drop_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
511 {
512         int ret = 0;
513         buf_pkt_all(tbase, mbufs, n_pkts, out);
514
515         const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
516         uint16_t prod, cons;
517
518         for (uint8_t i = 0; i < nb_bufs; ++i) {
519                 prod = tbase->ws_mbuf->idx[i].prod;
520                 cons = tbase->ws_mbuf->idx[i].cons;
521
522                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
523                         tbase->flags &= ~FLAG_TX_FLUSH;
524                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
525                         ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
526                 }
527         }
528         return ret;
529 }
530
531 int tx_pkt_no_drop_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
532 {
533         int ret = 0;
534         buf_pkt_all(tbase, mbufs, n_pkts, out);
535
536         const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
537         uint16_t prod, cons;
538
539         for (uint8_t i = 0; i < nb_bufs; ++i) {
540                 prod = tbase->ws_mbuf->idx[i].prod;
541                 cons = tbase->ws_mbuf->idx[i].cons;
542
543                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
544                         tbase->flags &= ~FLAG_TX_FLUSH;
545                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
546                         ret += ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
547                 }
548         }
549         return ret;
550 }
551
552 int tx_pkt_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
553 {
554         int ret = 0;
555         buf_pkt_all(tbase, mbufs, n_pkts, out);
556
557         const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
558         uint16_t prod, cons;
559
560         for (uint8_t i = 0; i < nb_bufs; ++i) {
561                 prod = tbase->ws_mbuf->idx[i].prod;
562                 cons = tbase->ws_mbuf->idx[i].cons;
563
564                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
565                         tbase->flags &= ~FLAG_TX_FLUSH;
566                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
567                         ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
568                 }
569         }
570         return ret;
571 }
572
573 int tx_pkt_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
574 {
575         int ret = 0;
576         buf_pkt_all(tbase, mbufs, n_pkts, out);
577
578         const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
579         uint16_t prod, cons;
580         for (uint8_t i = 0; i < nb_bufs; ++i) {
581                 prod = tbase->ws_mbuf->idx[i].prod;
582                 cons = tbase->ws_mbuf->idx[i].cons;
583
584                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
585                         tbase->flags &= ~FLAG_TX_FLUSH;
586                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
587                         ret+= ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
588                 }
589         }
590         return ret;
591 }
592
593 static inline void trace_one_rx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf)
594 {
595         struct rte_mbuf tmp;
596         /* For each packet being transmitted, find which
597            buffer represent the packet as it was before
598            processing. */
599         uint32_t j = 0;
600         uint32_t len = sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr)/sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr[0]);
601         for (;j < len; ++j) {
602                 if (tbase->aux->task_rt_dump.pkt_mbuf_addr[j] == mbuf)
603                         break;
604         }
605         if (j != len) {
606 #if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
607                 tmp.data_off = 0;
608 #endif
609                 rte_pktmbuf_data_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
610                 rte_pktmbuf_pkt_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
611                 tmp.buf_addr = tbase->aux->task_rt_dump.pkt_cpy[j];
612                 plogdx_info(&tmp, "Trace RX: ");
613         }
614 }
615
616 static inline void trace_one_tx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf, uint8_t *out, uint32_t i)
617 {
618         if (out) {
619                 switch(out[i]) {
620                 case 0xFE:
621                         plogdx_info(mbuf, "Handled: ");
622                         break;
623                 case 0xFF:
624                         plogdx_info(mbuf, "Dropped: ");
625                         break;
626                 default:
627                         plogdx_info(mbuf, "TX[%d]: ", out[i]);
628                         break;
629                 }
630         } else if (tbase->aux->tx_pkt_orig == tx_pkt_drop_all) {
631                 plogdx_info(mbuf, "Dropped: ");
632         } else
633                 plogdx_info(mbuf, "TX[0]: ");
634 }
635
636 static void unset_trace(struct task_base *tbase)
637 {
638         if (0 == tbase->aux->task_rt_dump.n_trace) {
639                 if (tbase->tx_pkt == tx_pkt_l3) {
640                         tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
641                         tbase->aux->tx_pkt_orig = NULL;
642                 } else {
643                         tbase->tx_pkt = tbase->aux->tx_pkt_orig;
644                         tbase->aux->tx_pkt_orig = NULL;
645                 }
646                 tbase->aux->task_rt_dump.cur_trace = 0;
647                 task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
648         }
649 }
650
651 int tx_pkt_trace(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
652 {
653         int ret = 0;
654         if (tbase->aux->task_rt_dump.cur_trace == 0) {
655                 // No packet received since dumping...
656                 tbase->aux->task_rt_dump.n_print_tx = tbase->aux->task_rt_dump.n_trace;
657                 if (tbase->aux->task_rt_dump.n_trace < n_pkts) {
658                         tbase->aux->task_rt_dump.n_trace = 0;
659                         tbase->aux->task_rt_dump.cur_trace = 0;
660                         task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
661                 } else {
662                         tbase->aux->task_rt_dump.n_trace -= n_pkts;
663                 }
664                 ret = tx_pkt_dump(tbase, mbufs, n_pkts, out);
665                 tbase->aux->task_rt_dump.n_print_tx = 0;
666                 return ret;
667         }
668         plog_info("Tracing %d pkts\n", tbase->aux->task_rt_dump.cur_trace);
669         uint32_t cur_trace = (n_pkts < tbase->aux->task_rt_dump.cur_trace) ? n_pkts: tbase->aux->task_rt_dump.cur_trace;
670         for (uint32_t i = 0; i < cur_trace; ++i) {
671                 trace_one_rx_pkt(tbase, mbufs[i]);
672                 trace_one_tx_pkt(tbase, mbufs[i], out, i);
673
674         }
675         ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
676
677         unset_trace(tbase);
678         return ret;
679 }
680
681 int tx_pkt_dump(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
682 {
683         uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
684         int ret = 0;
685
686         n_dump = n_pkts < n_dump? n_pkts : n_dump;
687         for (uint32_t i = 0; i < n_dump; ++i) {
688                 if (out) {
689                         switch (out[i]) {
690                         case 0xFE:
691                                 plogdx_info(mbufs[i], "Handled: ");
692                                 break;
693                         case 0xFF:
694                                 plogdx_info(mbufs[i], "Dropped: ");
695                                 break;
696                         default:
697                                 plogdx_info(mbufs[i], "TX[%d]: ", out[i]);
698                                 break;
699                         }
700                 } else
701                         plogdx_info(mbufs[i], "TX: ");
702         }
703         tbase->aux->task_rt_dump.n_print_tx -= n_dump;
704
705         ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
706
707         if (0 == tbase->aux->task_rt_dump.n_print_tx) {
708                 if (tbase->tx_pkt == tx_pkt_l3) {
709                         tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
710                         tbase->aux->tx_pkt_orig = NULL;
711                 } else {
712                         tbase->tx_pkt = tbase->aux->tx_pkt_orig;
713                         tbase->aux->tx_pkt_orig = NULL;
714                 }
715         }
716         return ret;
717 }
718
719 /* Gather the distribution of the number of packets that have been
720    xmitted from one TX call. Since the value is only modified by the
721    task that xmits the packet, no atomic operation is needed. */
722 int tx_pkt_distr(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
723 {
724         if (likely(n_pkts < TX_BUCKET_SIZE))
725                 tbase->aux->tx_bucket[n_pkts]++;
726         else
727                 tbase->aux->tx_bucket[TX_BUCKET_SIZE - 1]++;
728         return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
729 }
730
731 int tx_pkt_bw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
732 {
733         uint32_t tx_bytes = 0;
734         uint32_t drop_bytes = 0;
735
736         for (uint16_t i = 0; i < n_pkts; ++i) {
737                 if (!out || out[i] < OUT_HANDLED)
738                         tx_bytes += mbuf_wire_size(mbufs[i]);
739                 else
740                         drop_bytes += mbuf_wire_size(mbufs[i]);
741         }
742
743         TASK_STATS_ADD_TX_BYTES(&tbase->aux->stats, tx_bytes);
744         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
745         return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
746 }
747
748 int tx_pkt_drop_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
749 {
750         for (uint16_t j = 0; j < n_pkts; ++j) {
751                 rte_pktmbuf_free(mbufs[j]);
752         }
753         if (out == NULL)
754                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts);
755         else {
756                 for (uint16_t j = 0; j < n_pkts; ++j) {
757                         if (out[j] == OUT_HANDLED)
758                                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
759                         else
760                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
761                 }
762         }
763         return n_pkts;
764 }
765 static inline void dump_pkts(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
766 {
767         uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
768         uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;
769
770         if (unlikely(n_dump)) {
771                 n_dump = n_pkts < n_dump? n_pkts : n_dump;
772                 for (uint32_t i = 0; i < n_dump; ++i) {
773                         plogdx_info(mbufs[i], "TX: ");
774                 }
775                 tbase->aux->task_rt_dump.n_print_tx -= n_dump;
776         } else if (unlikely(n_trace)) {
777                 n_trace = n_pkts < n_trace? n_pkts : n_trace;
778                 for (uint32_t i = 0; i < n_trace; ++i) {
779                         plogdx_info(mbufs[i], "TX: ");
780                 }
781                 tbase->aux->task_rt_dump.n_trace -= n_trace;
782         }
783 }
784
785 // ctrlplane packets are slow path, hence cost of checking if dump ortrace is needed in not too important
786 // easier to have this implementation than an implementation similar to dataplane tx
787 int tx_ctrlplane_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
788 {
789         dump_pkts(tbase, mbufs, n_pkts);
790         return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
791 }
792
793 int tx_ctrlplane_sw(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
794 {
795         dump_pkts(tbase, mbufs, n_pkts);
796         return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
797 }
798
799 static inline int tx_ring_all(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
800 {
801         if (tbase->aux->task_rt_dump.cur_trace) {
802                 trace_one_rx_pkt(tbase, mbuf);
803         }
804         mbuf->udata64 = ((uint64_t)ip << 32) | (core_id << 16) | (task_id << 8) | command;
805         return rte_ring_enqueue(ring, mbuf);
806 }
807
808 void tx_ring_cti(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
809 {
810         plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
811         int ret = tx_ring_all(tbase, ring, command,  mbuf, core_id, task_id, ip);
812         if (unlikely(ret != 0)) {
813                 plogx_dbg("\tFail to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
814                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
815                 rte_pktmbuf_free(mbuf);
816         }
817 }
818
819 void tx_ring_ip(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf, uint32_t ip)
820 {
821         plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
822         int ret = tx_ring_all(tbase, ring, command,  mbuf, 0, 0, ip);
823         if (unlikely(ret != 0)) {
824                 plogx_dbg("\tFail to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
825                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
826                 rte_pktmbuf_free(mbuf);
827         }
828 }
829
830 void tx_ring(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf)
831 {
832         plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
833         int ret = tx_ring_all(tbase, ring, command,  mbuf, 0, 0, 0);
834         if (unlikely(ret != 0)) {
835                 plogx_dbg("\tFail to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
836                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
837                 rte_pktmbuf_free(mbuf);
838         }
839 }