51c1afa721807c031bc35b10fbadd3958e0d9b2d
[samplevnf.git] / VNFs / DPPD-PROX / tx_pkt.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <rte_ethdev.h>
18 #include <rte_version.h>
19
20 #include "rx_pkt.h"
21 #include "tx_pkt.h"
22 #include "task_base.h"
23 #include "stats.h"
24 #include "prefetch.h"
25 #include "prox_assert.h"
26 #include "log.h"
27 #include "mbuf_utils.h"
28 #include "handle_master.h"
29
30 static void buf_pkt_single(struct task_base *tbase, struct rte_mbuf *mbuf, const uint8_t out)
31 {
32         const uint16_t prod = tbase->ws_mbuf->idx[out].prod++;
33         tbase->ws_mbuf->mbuf[out][prod & WS_MBUF_MASK] = mbuf;
34 }
35
36 static inline void buf_pkt_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
37 {
38         for (uint16_t j = 0; j < n_pkts; ++j) {
39                 if (unlikely(out[j] >= OUT_HANDLED)) {
40                         rte_pktmbuf_free(mbufs[j]);
41                         if (out[j] == OUT_HANDLED)
42                                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
43                         else
44                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
45                 }
46                 else {
47                         buf_pkt_single(tbase, mbufs[j], out[j]);
48                 }
49         }
50 }
51 #define MAX_PMD_TX 32
52
53 int tx_pkt_l3(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
54 {
55         uint32_t ip_dst;
56         int first = 0, ret, ok = 0, rc;
57         const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
58         struct rte_mbuf *arp_mbuf = NULL;       // used when one need to send both an ARP and a mbuf
59         uint64_t *time;
60         uint64_t tsc = rte_rdtsc();
61
62         for (int j = 0; j < n_pkts; j++) {
63                 if ((out) && (out[j] >= OUT_HANDLED))
64                         continue;
65                 if (unlikely((rc = write_dst_mac(tbase, mbufs[j], &ip_dst, &time, tsc)) != SEND_MBUF)) {
66                         if (j - first) {
67                                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
68                                 ok += ret;
69                         }
70                         first = j + 1;
71                         switch(rc) {
72                         case SEND_ARP:
73                                 // We re-use the mbuf - no need to create a arp_mbuf and delete the existing mbuf
74                                 mbufs[j]->port = tbase->l3.reachable_port_id;
75                                 if (tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, REQ_MAC_TO_CTRL, mbufs[j], tbase->l3.core_id, tbase->l3.task_id, ip_dst) == 0)
76                                         update_arp_update_time(&tbase->l3, time, 1000);
77                                 else
78                                         update_arp_update_time(&tbase->l3, time, 100);
79                                 break;
80                         case SEND_MBUF_AND_ARP:
81                                 // We send the mbuf and an ARP - we need to allocate another mbuf for ARP
82                                 ret = rte_mempool_get(tbase->l3.arp_pool, (void **)&arp_mbuf);
83                                 if (likely(ret == 0))   {
84                                         arp_mbuf->port = tbase->l3.reachable_port_id;
85                                         if (tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, REQ_MAC_TO_CTRL, arp_mbuf, tbase->l3.core_id, tbase->l3.task_id, ip_dst) == 0)
86                                                 update_arp_update_time(&tbase->l3, time, 1000);
87                                         else
88                                                 update_arp_update_time(&tbase->l3, time, 100);
89                                 } else {
90                                         plog_err("Failed to get a mbuf from arp mempool\n");
91                                         // We still send the initial mbuf
92                                 }
93                                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + j, 1, out);
94                                 break;
95                         case DROP_MBUF:
96                                 tx_drop(mbufs[j]);
97                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
98                                 break;
99                         }
100                 }
101         }
102         if (n_pkts - first) {
103                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
104                 ok += ret;
105         }
106         return ok;
107 }
108
109 /* The following help functions also report stats. Therefore we need
110    to pass the task_base struct. */
111 static inline int txhw_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
112 {
113         uint16_t ntx;
114         int ret;
115
116         /* TX vector mode can't transmit more than 32 packets */
117         if (n_pkts > MAX_PMD_TX) {
118                 ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, MAX_PMD_TX);
119                 ntx += rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs + ntx, n_pkts - ntx);
120         } else {
121                 ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
122         }
123         TASK_STATS_ADD_TX(&tbase->aux->stats, ntx);
124
125         ret =  n_pkts - ntx;
126         if (ntx < n_pkts) {
127                 plog_dbg("Failed to send %d packets from %p\n", ret, mbufs[0]);
128                 TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts - ntx);
129                 if (tbase->tx_pkt == tx_pkt_bw) {
130                         uint32_t drop_bytes = 0;
131                         do {
132                                 drop_bytes += mbuf_wire_size(mbufs[ntx]);
133                                 rte_pktmbuf_free(mbufs[ntx++]);
134                         } while (ntx < n_pkts);
135                         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
136                 }
137                 else {
138                         do {
139                                 rte_pktmbuf_free(mbufs[ntx++]);
140                         } while (ntx < n_pkts);
141                 }
142         }
143         return ret;
144 }
145
146 static inline int txhw_no_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
147 {
148         uint16_t ret;
149         uint16_t n = n_pkts;
150
151         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
152         do {
153                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
154                 mbufs += ret;
155                 n_pkts -= ret;
156         }
157         while (n_pkts);
158         return (n != ret);
159 }
160
161 static inline int ring_enq_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
162 {
163         int ret = 0;
164         /* return 0 on succes, -ENOBUFS on failure */
165         // Rings can be single or multiproducer (ctrl rings are multi producer)
166 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
167         if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts))) {
168 #else
169         if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0)) {
170 #endif
171                 ret = n_pkts;
172                 if (tbase->tx_pkt == tx_pkt_bw) {
173                         uint32_t drop_bytes = 0;
174                         for (uint16_t i = 0; i < n_pkts; ++i) {
175                                 drop_bytes += mbuf_wire_size(mbufs[i]);
176                                 rte_pktmbuf_free(mbufs[i]);
177                         }
178                         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
179                         TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
180                 }
181                 else {
182                         for (uint16_t i = 0; i < n_pkts; ++i)
183                                 rte_pktmbuf_free(mbufs[i]);
184                         TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
185                 }
186         }
187         else {
188                 TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
189         }
190         return ret;
191 }
192
193 static inline int ring_enq_no_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
194 {
195         int i = 0;
196 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
197         while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts)) {
198 #else
199         while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0) {
200 #endif
201                 i++;
202         };
203         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
204         return (i != 0);
205 }
206
207 void flush_queues_hw(struct task_base *tbase)
208 {
209         uint16_t prod, cons;
210
211         for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
212                 prod = tbase->ws_mbuf->idx[i].prod;
213                 cons = tbase->ws_mbuf->idx[i].cons;
214
215                 if (prod != cons) {
216                         tbase->ws_mbuf->idx[i].prod = 0;
217                         tbase->ws_mbuf->idx[i].cons = 0;
218                         txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
219                 }
220         }
221
222         tbase->flags &= ~FLAG_TX_FLUSH;
223 }
224
225 void flush_queues_sw(struct task_base *tbase)
226 {
227         uint16_t prod, cons;
228
229         for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
230                 prod = tbase->ws_mbuf->idx[i].prod;
231                 cons = tbase->ws_mbuf->idx[i].cons;
232
233                 if (prod != cons) {
234                         tbase->ws_mbuf->idx[i].prod = 0;
235                         tbase->ws_mbuf->idx[i].cons = 0;
236                         ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
237                 }
238         }
239         tbase->flags &= ~FLAG_TX_FLUSH;
240 }
241
242 void flush_queues_no_drop_hw(struct task_base *tbase)
243 {
244         uint16_t prod, cons;
245
246         for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
247                 prod = tbase->ws_mbuf->idx[i].prod;
248                 cons = tbase->ws_mbuf->idx[i].cons;
249
250                 if (prod != cons) {
251                         tbase->ws_mbuf->idx[i].prod = 0;
252                         tbase->ws_mbuf->idx[i].cons = 0;
253                         txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
254                 }
255         }
256
257         tbase->flags &= ~FLAG_TX_FLUSH;
258 }
259
260 void flush_queues_no_drop_sw(struct task_base *tbase)
261 {
262         uint16_t prod, cons;
263
264         for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
265                 prod = tbase->ws_mbuf->idx[i].prod;
266                 cons = tbase->ws_mbuf->idx[i].cons;
267
268                 if (prod != cons) {
269                         tbase->ws_mbuf->idx[i].prod = 0;
270                         tbase->ws_mbuf->idx[i].cons = 0;
271                         ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
272                 }
273         }
274         tbase->flags &= ~FLAG_TX_FLUSH;
275 }
276
277 /* "try" functions try to send packets to sw/hw w/o failing or blocking;
278    They return if ring/queue is full and are used by aggregators.
279    "try" functions do not have drop/no drop flavors
280    They are only implemented in never_discard mode (as by default they
281    use only one outgoing ring. */
282 uint16_t tx_try_self(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
283 {
284         if (n_pkts < 64) {
285                 tx_pkt_never_discard_self(tbase, mbufs, n_pkts, NULL);
286                 return n_pkts;
287         } else {
288                 tx_pkt_never_discard_self(tbase, mbufs, 64, NULL);
289                 return 64;
290         }
291 }
292
293 uint16_t tx_try_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
294 {
295         const int bulk_size = 64;
296         uint16_t ret = bulk_size, sent = 0, n_bulks;
297         n_bulks = n_pkts >> __builtin_ctz(bulk_size);
298
299         for (int i = 0; i < n_bulks; i++) {
300 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
301                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size);
302 #else
303                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size, NULL);
304 #endif
305                 mbufs += ret;
306                 sent += ret;
307                 if (ret != bulk_size)
308                         break;
309         }
310         if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
311 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
312                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)));
313 #else
314                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)), NULL);
315 #endif
316                 mbufs += ret;
317                 sent += ret;
318         }
319         TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
320         return sent;
321 }
322
323 uint16_t tx_try_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
324 {
325         const int bulk_size = 64;
326         uint16_t ret = bulk_size, n_bulks, sent = 0;
327         n_bulks = n_pkts >>  __builtin_ctz(bulk_size);
328
329         const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
330         for (int i = 0; i < n_bulks; i++) {
331                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, bulk_size);
332                 mbufs += ret;
333                 sent += ret;
334                 if (ret != bulk_size)
335                         break;
336         }
337         if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
338                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, (n_pkts & (bulk_size - 1)));
339                 mbufs += ret;
340                 sent += ret;
341         }
342         TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
343         return sent;
344 }
345
346 int tx_pkt_no_drop_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
347 {
348         return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
349 }
350
351 int tx_pkt_no_drop_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
352 {
353         static uint8_t fake_out[MAX_PKT_BURST] = {0};
354         int ret = 0;
355         if (n_pkts == MAX_PKT_BURST) {
356                 // First xmit what was queued
357                 uint16_t prod, cons;
358
359                 prod = tbase->ws_mbuf->idx[0].prod;
360                 cons = tbase->ws_mbuf->idx[0].cons;
361
362                 if ((uint16_t)(prod - cons)){
363                         tbase->flags &= ~FLAG_TX_FLUSH;
364                         tbase->ws_mbuf->idx[0].prod = 0;
365                         tbase->ws_mbuf->idx[0].cons = 0;
366                         ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
367                 }
368                 ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
369         } else {
370                 ret+= tx_pkt_no_drop_hw(tbase, mbufs, n_pkts, fake_out);
371         }
372         return ret;
373 }
374
375 int tx_pkt_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
376 {
377         return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
378 }
379
380 int tx_pkt_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
381 {
382         static uint8_t fake_out[MAX_PKT_BURST] = {0};
383         int ret = 0;
384         if (n_pkts == MAX_PKT_BURST) {
385                 // First xmit what was queued
386                 uint16_t prod, cons;
387
388                 prod = tbase->ws_mbuf->idx[0].prod;
389                 cons = tbase->ws_mbuf->idx[0].cons;
390
391                 if ((uint16_t)(prod - cons)){
392                         tbase->flags &= ~FLAG_TX_FLUSH;
393                         tbase->ws_mbuf->idx[0].prod = 0;
394                         tbase->ws_mbuf->idx[0].cons = 0;
395                         ret+= txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
396                 }
397                 ret+= txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
398         } else {
399                 ret+= tx_pkt_hw(tbase, mbufs, n_pkts, fake_out);
400         }
401         return ret;
402 }
403
404 /* Transmit to hw using tx_params_hw_sw structure
405    This function is used  to transmit to hw when tx_params_hw_sw should be used
406    i.e. when the task needs to transmit both to hw and sw */
407 int tx_pkt_no_drop_never_discard_hw1_no_pointer(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
408 {
409         txhw_no_drop(&tbase->tx_params_hw_sw.tx_port_queue, mbufs, n_pkts, tbase);
410         return 0;
411 }
412
413 int tx_pkt_no_drop_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
414 {
415         return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
416 }
417
418 int tx_pkt_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
419 {
420         return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
421 }
422
423 static uint16_t tx_pkt_free_dropped(__attribute__((unused)) struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
424 {
425         uint64_t v = 0;
426         uint16_t i;
427         /* The most probable and most important optimize case is if
428            the no packets should be dropped. */
429         for (i = 0; i + 8 < n_pkts; i += 8) {
430                 v |= *((uint64_t*)(&out[i]));
431         }
432         for (; i < n_pkts; ++i) {
433                 v |= out[i];
434         }
435
436         if (unlikely(v)) {
437                 /* At least some packets need to be dropped, so the
438                    mbufs array needs to be updated. */
439                 uint16_t n_kept = 0;
440                 uint16_t n_discard = 0;
441                 for (uint16_t i = 0; i < n_pkts; ++i) {
442                         if (unlikely(out[i] >= OUT_HANDLED)) {
443                                 rte_pktmbuf_free(mbufs[i]);
444                                 n_discard += out[i] == OUT_DISCARD;
445                                 continue;
446                         }
447                         mbufs[n_kept++] = mbufs[i];
448                 }
449                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, n_discard);
450                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts - n_kept - n_discard);
451                 return n_kept;
452         }
453         return n_pkts;
454 }
455
456 int tx_pkt_no_drop_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
457 {
458         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
459         int ret = 0;
460
461         if (likely(n_kept))
462                 ret = txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
463         return ret;
464 }
465
466 int tx_pkt_no_drop_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
467 {
468         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
469         int ret = 0;
470
471         if (likely(n_kept))
472                 ret = ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
473         return ret;
474 }
475
476 int tx_pkt_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
477 {
478         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
479
480         if (likely(n_kept))
481                 return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
482         return n_pkts;
483 }
484
485 int tx_pkt_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
486 {
487         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
488
489         if (likely(n_kept))
490                 return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
491         return 0;
492 }
493
494 int tx_pkt_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
495 {
496         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
497
498         TASK_STATS_ADD_TX(&tbase->aux->stats, n_kept);
499         tbase->ws_mbuf->idx[0].nb_rx = n_kept;
500         struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
501         for (uint16_t i = 0; i < n_kept; ++i) {
502                 tx_mbuf[i] = mbufs[i];
503         }
504         return 0;
505 }
506
507 int tx_pkt_never_discard_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
508 {
509         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
510         tbase->ws_mbuf->idx[0].nb_rx = n_pkts;
511         struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
512         for (uint16_t i = 0; i < n_pkts; ++i) {
513                 tx_mbuf[i] = mbufs[i];
514         }
515         return 0;
516 }
517
518 int tx_pkt_no_drop_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
519 {
520         int ret = 0;
521         buf_pkt_all(tbase, mbufs, n_pkts, out);
522
523         const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
524         uint16_t prod, cons;
525
526         for (uint8_t i = 0; i < nb_bufs; ++i) {
527                 prod = tbase->ws_mbuf->idx[i].prod;
528                 cons = tbase->ws_mbuf->idx[i].cons;
529
530                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
531                         tbase->flags &= ~FLAG_TX_FLUSH;
532                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
533                         ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
534                 }
535         }
536         return ret;
537 }
538
539 int tx_pkt_no_drop_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
540 {
541         int ret = 0;
542         buf_pkt_all(tbase, mbufs, n_pkts, out);
543
544         const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
545         uint16_t prod, cons;
546
547         for (uint8_t i = 0; i < nb_bufs; ++i) {
548                 prod = tbase->ws_mbuf->idx[i].prod;
549                 cons = tbase->ws_mbuf->idx[i].cons;
550
551                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
552                         tbase->flags &= ~FLAG_TX_FLUSH;
553                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
554                         ret += ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
555                 }
556         }
557         return ret;
558 }
559
560 int tx_pkt_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
561 {
562         int ret = 0;
563         buf_pkt_all(tbase, mbufs, n_pkts, out);
564
565         const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
566         uint16_t prod, cons;
567
568         for (uint8_t i = 0; i < nb_bufs; ++i) {
569                 prod = tbase->ws_mbuf->idx[i].prod;
570                 cons = tbase->ws_mbuf->idx[i].cons;
571
572                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
573                         tbase->flags &= ~FLAG_TX_FLUSH;
574                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
575                         ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
576                 }
577         }
578         return ret;
579 }
580
581 int tx_pkt_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
582 {
583         int ret = 0;
584         buf_pkt_all(tbase, mbufs, n_pkts, out);
585
586         const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
587         uint16_t prod, cons;
588         for (uint8_t i = 0; i < nb_bufs; ++i) {
589                 prod = tbase->ws_mbuf->idx[i].prod;
590                 cons = tbase->ws_mbuf->idx[i].cons;
591
592                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
593                         tbase->flags &= ~FLAG_TX_FLUSH;
594                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
595                         ret+= ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
596                 }
597         }
598         return ret;
599 }
600
601 static inline void trace_one_rx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf)
602 {
603         struct rte_mbuf tmp;
604         /* For each packet being transmitted, find which
605            buffer represent the packet as it was before
606            processing. */
607         uint32_t j = 0;
608         uint32_t len = sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr)/sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr[0]);
609         for (;j < len; ++j) {
610                 if (tbase->aux->task_rt_dump.pkt_mbuf_addr[j] == mbuf)
611                         break;
612         }
613         if (j != len) {
614 #if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
615                 tmp.data_off = 0;
616 #endif
617                 rte_pktmbuf_data_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
618                 rte_pktmbuf_pkt_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
619                 tmp.buf_addr = tbase->aux->task_rt_dump.pkt_cpy[j];
620                 plogdx_info(&tmp, "Trace RX: ");
621         }
622 }
623
624 static inline void trace_one_tx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf, uint8_t *out, uint32_t i)
625 {
626         if (out) {
627                 switch(out[i]) {
628                 case 0xFE:
629                         plogdx_info(mbuf, "Handled: ");
630                         break;
631                 case 0xFF:
632                         plogdx_info(mbuf, "Dropped: ");
633                         break;
634                 default:
635                         plogdx_info(mbuf, "TX[%d]: ", out[i]);
636                         break;
637                 }
638         } else if (tbase->aux->tx_pkt_orig == tx_pkt_drop_all) {
639                 plogdx_info(mbuf, "Dropped: ");
640         } else
641                 plogdx_info(mbuf, "TX[0]: ");
642 }
643
644 static void unset_trace(struct task_base *tbase)
645 {
646         if (0 == tbase->aux->task_rt_dump.n_trace) {
647                 if (tbase->tx_pkt == tx_pkt_l3) {
648                         tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
649                         tbase->aux->tx_pkt_orig = NULL;
650                 } else {
651                         tbase->tx_pkt = tbase->aux->tx_pkt_orig;
652                         tbase->aux->tx_pkt_orig = NULL;
653                 }
654                 tbase->aux->task_rt_dump.cur_trace = 0;
655                 task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
656         }
657 }
658
659 int tx_pkt_trace(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
660 {
661         int ret = 0;
662         if (tbase->aux->task_rt_dump.cur_trace == 0) {
663                 // No packet received since dumping...
664                 tbase->aux->task_rt_dump.n_print_tx = tbase->aux->task_rt_dump.n_trace;
665                 if (tbase->aux->task_rt_dump.n_trace < n_pkts) {
666                         tbase->aux->task_rt_dump.n_trace = 0;
667                         tbase->aux->task_rt_dump.cur_trace = 0;
668                         task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
669                 } else {
670                         tbase->aux->task_rt_dump.n_trace -= n_pkts;
671                 }
672                 ret = tx_pkt_dump(tbase, mbufs, n_pkts, out);
673                 tbase->aux->task_rt_dump.n_print_tx = 0;
674                 return ret;
675         }
676         plog_info("Tracing %d pkts\n", tbase->aux->task_rt_dump.cur_trace);
677         uint32_t cur_trace = (n_pkts < tbase->aux->task_rt_dump.cur_trace) ? n_pkts: tbase->aux->task_rt_dump.cur_trace;
678         for (uint32_t i = 0; i < cur_trace; ++i) {
679                 trace_one_rx_pkt(tbase, mbufs[i]);
680                 trace_one_tx_pkt(tbase, mbufs[i], out, i);
681
682         }
683         ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
684
685         unset_trace(tbase);
686         return ret;
687 }
688
689 int tx_pkt_dump(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
690 {
691         uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
692         int ret = 0;
693
694         n_dump = n_pkts < n_dump? n_pkts : n_dump;
695         for (uint32_t i = 0; i < n_dump; ++i) {
696                 if (out) {
697                         switch (out[i]) {
698                         case 0xFE:
699                                 plogdx_info(mbufs[i], "Handled: ");
700                                 break;
701                         case 0xFF:
702                                 plogdx_info(mbufs[i], "Dropped: ");
703                                 break;
704                         default:
705                                 plogdx_info(mbufs[i], "TX[%d]: ", out[i]);
706                                 break;
707                         }
708                 } else
709                         plogdx_info(mbufs[i], "TX: ");
710         }
711         tbase->aux->task_rt_dump.n_print_tx -= n_dump;
712
713         ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
714
715         if (0 == tbase->aux->task_rt_dump.n_print_tx) {
716                 if (tbase->tx_pkt == tx_pkt_l3) {
717                         tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
718                         tbase->aux->tx_pkt_orig = NULL;
719                 } else {
720                         tbase->tx_pkt = tbase->aux->tx_pkt_orig;
721                         tbase->aux->tx_pkt_orig = NULL;
722                 }
723         }
724         return ret;
725 }
726
727 /* Gather the distribution of the number of packets that have been
728    xmitted from one TX call. Since the value is only modified by the
729    task that xmits the packet, no atomic operation is needed. */
730 int tx_pkt_distr(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
731 {
732         if (likely(n_pkts < TX_BUCKET_SIZE))
733                 tbase->aux->tx_bucket[n_pkts]++;
734         else
735                 tbase->aux->tx_bucket[TX_BUCKET_SIZE - 1]++;
736         return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
737 }
738
739 int tx_pkt_bw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
740 {
741         uint32_t tx_bytes = 0;
742         uint32_t drop_bytes = 0;
743
744         for (uint16_t i = 0; i < n_pkts; ++i) {
745                 if (!out || out[i] < OUT_HANDLED)
746                         tx_bytes += mbuf_wire_size(mbufs[i]);
747                 else
748                         drop_bytes += mbuf_wire_size(mbufs[i]);
749         }
750
751         TASK_STATS_ADD_TX_BYTES(&tbase->aux->stats, tx_bytes);
752         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
753         return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
754 }
755
756 int tx_pkt_drop_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
757 {
758         for (uint16_t j = 0; j < n_pkts; ++j) {
759                 rte_pktmbuf_free(mbufs[j]);
760         }
761         if (out == NULL)
762                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts);
763         else {
764                 for (uint16_t j = 0; j < n_pkts; ++j) {
765                         if (out[j] == OUT_HANDLED)
766                                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
767                         else
768                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
769                 }
770         }
771         return n_pkts;
772 }
773 static inline void dump_pkts(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
774 {
775         uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
776         uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;
777
778         if (unlikely(n_dump)) {
779                 n_dump = n_pkts < n_dump? n_pkts : n_dump;
780                 for (uint32_t i = 0; i < n_dump; ++i) {
781                         plogdx_info(mbufs[i], "TX: ");
782                 }
783                 tbase->aux->task_rt_dump.n_print_tx -= n_dump;
784         } else if (unlikely(n_trace)) {
785                 n_trace = n_pkts < n_trace? n_pkts : n_trace;
786                 for (uint32_t i = 0; i < n_trace; ++i) {
787                         plogdx_info(mbufs[i], "TX: ");
788                 }
789                 tbase->aux->task_rt_dump.n_trace -= n_trace;
790         }
791 }
792
793 // ctrlplane packets are slow path, hence cost of checking if dump ortrace is needed in not too important
794 // easier to have this implementation than an implementation similar to dataplane tx
795 int tx_ctrlplane_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
796 {
797         dump_pkts(tbase, mbufs, n_pkts);
798         return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
799 }
800
801 int tx_ctrlplane_sw(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
802 {
803         dump_pkts(tbase, mbufs, n_pkts);
804         return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
805 }
806
807 static inline int tx_ring_all(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
808 {
809         if (tbase->aux->task_rt_dump.cur_trace) {
810                 trace_one_rx_pkt(tbase, mbuf);
811         }
812         mbuf->udata64 = ((uint64_t)ip << 32) | (core_id << 16) | (task_id << 8) | command;
813         return rte_ring_enqueue(ring, mbuf);
814 }
815
816 int tx_ring_cti(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
817 {
818         plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
819         int ret = tx_ring_all(tbase, ring, command,  mbuf, core_id, task_id, ip);
820         if (unlikely(ret != 0)) {
821                 plogx_dbg("\tFail to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
822                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
823                 rte_pktmbuf_free(mbuf);
824         }
825         return ret;
826 }
827
828 void tx_ring_ip(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf, uint32_t ip)
829 {
830         plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
831         int ret = tx_ring_all(tbase, ring, command,  mbuf, 0, 0, ip);
832         if (unlikely(ret != 0)) {
833                 plogx_dbg("\tFail to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
834                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
835                 rte_pktmbuf_free(mbuf);
836         }
837 }
838
839 void tx_ring(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf)
840 {
841         plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
842         int ret = tx_ring_all(tbase, ring, command,  mbuf, 0, 0, 0);
843         if (unlikely(ret != 0)) {
844                 plogx_dbg("\tFail to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
845                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
846                 rte_pktmbuf_free(mbuf);
847         }
848 }
849
850 void tx_ring_route(struct task_base *tbase, struct rte_ring *ring, int add, struct rte_mbuf *mbuf, uint32_t ip, uint32_t gateway_ip, uint32_t prefix)
851 {
852         uint8_t command;
853         if (add)
854                 command = ROUTE_ADD_FROM_CTRL;
855         else
856                 command = ROUTE_DEL_FROM_CTRL;
857
858         plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
859         ctrl_ring_set_command(mbuf, command);
860         ctrl_ring_set_ip(mbuf, ip);
861         ctrl_ring_set_gateway_ip(mbuf, gateway_ip);
862         ctrl_ring_set_prefix(mbuf, prefix);
863         if (tbase->aux->task_rt_dump.cur_trace) {
864                 trace_one_rx_pkt(tbase, mbuf);
865         }
866         int ret = rte_ring_enqueue(ring, mbuf);
867         if (unlikely(ret != 0)) {
868                 plogx_dbg("\tFail to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
869                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
870                 rte_pktmbuf_free(mbuf);
871         }
872 }
873
874 void ctrl_ring_set_command(struct rte_mbuf *mbuf, uint64_t udata64)
875 {
876         mbuf->udata64 = udata64;
877 }
878
879 uint64_t ctrl_ring_get_command(struct rte_mbuf *mbuf)
880 {
881         return mbuf->udata64;
882 }
883
884 void ctrl_ring_set_ip(struct rte_mbuf *mbuf, uint32_t udata32)
885 {
886         struct prox_headroom *prox_headroom = (struct prox_headroom *)(rte_pktmbuf_mtod(mbuf, uint8_t*) - sizeof(struct prox_headroom));
887         prox_headroom->ip = udata32;
888 }
889
890 uint32_t  ctrl_ring_get_ip(struct rte_mbuf *mbuf)
891 {
892         struct prox_headroom *prox_headroom = (struct prox_headroom *)(rte_pktmbuf_mtod(mbuf, uint8_t*) - sizeof(struct prox_headroom));
893         return prox_headroom->ip;
894 }
895
896 void ctrl_ring_set_gateway_ip(struct rte_mbuf *mbuf, uint32_t udata32)
897 {
898         struct prox_headroom *prox_headroom = (struct prox_headroom *)(rte_pktmbuf_mtod(mbuf, uint8_t*) - sizeof(struct prox_headroom));
899         prox_headroom->gateway_ip = udata32;
900 }
901
902 uint32_t ctrl_ring_get_gateway_ip(struct rte_mbuf *mbuf)
903 {
904         struct prox_headroom *prox_headroom = (struct prox_headroom *)(rte_pktmbuf_mtod(mbuf, uint8_t*) - sizeof(struct prox_headroom));
905         return prox_headroom->gateway_ip;
906 }
907
908 void ctrl_ring_set_prefix(struct rte_mbuf *mbuf, uint32_t udata32)
909 {
910         struct prox_headroom *prox_headroom = (struct prox_headroom *)(rte_pktmbuf_mtod(mbuf, uint8_t*) - sizeof(struct prox_headroom));
911         prox_headroom->prefix = udata32;
912 }
913
914 uint32_t ctrl_ring_get_prefix(struct rte_mbuf *mbuf)
915 {
916         struct prox_headroom *prox_headroom = (struct prox_headroom *)(rte_pktmbuf_mtod(mbuf, uint8_t*) - sizeof(struct prox_headroom));
917         return prox_headroom->prefix;
918 }