Added initial support for BGP
[samplevnf.git] / VNFs / DPPD-PROX / tx_pkt.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <rte_ethdev.h>
18 #include <rte_version.h>
19
20 #include "rx_pkt.h"
21 #include "tx_pkt.h"
22 #include "task_base.h"
23 #include "stats.h"
24 #include "prefetch.h"
25 #include "prox_assert.h"
26 #include "log.h"
27 #include "mbuf_utils.h"
28 #include "handle_master.h"
29
30 static void buf_pkt_single(struct task_base *tbase, struct rte_mbuf *mbuf, const uint8_t out)
31 {
32         const uint16_t prod = tbase->ws_mbuf->idx[out].prod++;
33         tbase->ws_mbuf->mbuf[out][prod & WS_MBUF_MASK] = mbuf;
34 }
35
36 static inline void buf_pkt_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
37 {
38         for (uint16_t j = 0; j < n_pkts; ++j) {
39                 if (unlikely(out[j] >= OUT_HANDLED)) {
40                         rte_pktmbuf_free(mbufs[j]);
41                         if (out[j] == OUT_HANDLED)
42                                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
43                         else
44                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
45                 }
46                 else {
47                         buf_pkt_single(tbase, mbufs[j], out[j]);
48                 }
49         }
50 }
51 #define MAX_PMD_TX 32
52
53 int tx_pkt_l3(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
54 {
55         uint32_t ip_dst;
56         int first = 0, ret, ok = 0, rc;
57         const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
58         struct rte_mbuf *arp_mbuf = NULL;       // used when one need to send both an ARP and a mbuf
59         uint64_t *time;
60
61         for (int j = 0; j < n_pkts; j++) {
62                 if ((out) && (out[j] >= OUT_HANDLED))
63                         continue;
64                 if (unlikely((rc = write_dst_mac(tbase, mbufs[j], &ip_dst, &time)) != SEND_MBUF)) {
65                         if (j - first) {
66                                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
67                                 ok += ret;
68                         }
69                         first = j + 1;
70                         switch(rc) {
71                         case SEND_ARP:
72                                 // We re-use the mbuf - no need to create a arp_mbuf and delete the existing mbuf
73                                 mbufs[j]->port = tbase->l3.reachable_port_id;
74                                 if (tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, REQ_MAC_TO_CTRL, mbufs[j], tbase->l3.core_id, tbase->l3.task_id, ip_dst) == 0)
75                                         update_arp_update_time(&tbase->l3, time, 1000);
76                                 else
77                                         update_arp_update_time(&tbase->l3, time, 100);
78                                 break;
79                         case SEND_MBUF_AND_ARP:
80                                 // We send the mbuf and an ARP - we need to allocate another mbuf for ARP
81                                 ret = rte_mempool_get(tbase->l3.arp_pool, (void **)&arp_mbuf);
82                                 if (likely(ret == 0))   {
83                                         arp_mbuf->port = tbase->l3.reachable_port_id;
84                                         if (tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, REQ_MAC_TO_CTRL, arp_mbuf, tbase->l3.core_id, tbase->l3.task_id, ip_dst) == 0)
85                                                 update_arp_update_time(&tbase->l3, time, 1000);
86                                         else
87                                                 update_arp_update_time(&tbase->l3, time, 100);
88                                 } else {
89                                         plog_err("Failed to get a mbuf from arp mempool\n");
90                                         // We still send the initial mbuf
91                                 }
92                                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + j, 1, out);
93                                 break;
94                         case DROP_MBUF:
95                                 tx_drop(mbufs[j]);
96                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
97                                 break;
98                         }
99                 }
100         }
101         if (n_pkts - first) {
102                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
103                 ok += ret;
104         }
105         return ok;
106 }
107
108 /* The following help functions also report stats. Therefore we need
109    to pass the task_base struct. */
110 static inline int txhw_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
111 {
112         uint16_t ntx;
113         int ret;
114
115         /* TX vector mode can't transmit more than 32 packets */
116         if (n_pkts > MAX_PMD_TX) {
117                 ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, MAX_PMD_TX);
118                 ntx += rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs + ntx, n_pkts - ntx);
119         } else {
120                 ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
121         }
122         TASK_STATS_ADD_TX(&tbase->aux->stats, ntx);
123
124         ret =  n_pkts - ntx;
125         if (ntx < n_pkts) {
126                 plog_dbg("Failed to send %d packets from %p\n", ret, mbufs[0]);
127                 TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts - ntx);
128                 if (tbase->tx_pkt == tx_pkt_bw) {
129                         uint32_t drop_bytes = 0;
130                         do {
131                                 drop_bytes += mbuf_wire_size(mbufs[ntx]);
132                                 rte_pktmbuf_free(mbufs[ntx++]);
133                         } while (ntx < n_pkts);
134                         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
135                 }
136                 else {
137                         do {
138                                 rte_pktmbuf_free(mbufs[ntx++]);
139                         } while (ntx < n_pkts);
140                 }
141         }
142         return ret;
143 }
144
145 static inline int txhw_no_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
146 {
147         uint16_t ret;
148         uint16_t n = n_pkts;
149
150         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
151         do {
152                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
153                 mbufs += ret;
154                 n_pkts -= ret;
155         }
156         while (n_pkts);
157         return (n != ret);
158 }
159
160 static inline int ring_enq_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
161 {
162         int ret = 0;
163         /* return 0 on succes, -ENOBUFS on failure */
164         // Rings can be single or multiproducer (ctrl rings are multi producer)
165 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
166         if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts))) {
167 #else
168         if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0)) {
169 #endif
170                 ret = n_pkts;
171                 if (tbase->tx_pkt == tx_pkt_bw) {
172                         uint32_t drop_bytes = 0;
173                         for (uint16_t i = 0; i < n_pkts; ++i) {
174                                 drop_bytes += mbuf_wire_size(mbufs[i]);
175                                 rte_pktmbuf_free(mbufs[i]);
176                         }
177                         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
178                         TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
179                 }
180                 else {
181                         for (uint16_t i = 0; i < n_pkts; ++i)
182                                 rte_pktmbuf_free(mbufs[i]);
183                         TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
184                 }
185         }
186         else {
187                 TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
188         }
189         return ret;
190 }
191
192 static inline int ring_enq_no_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
193 {
194         int i = 0;
195 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
196         while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts)) {
197 #else
198         while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0) {
199 #endif
200                 i++;
201         };
202         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
203         return (i != 0);
204 }
205
206 void flush_queues_hw(struct task_base *tbase)
207 {
208         uint16_t prod, cons;
209
210         for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
211                 prod = tbase->ws_mbuf->idx[i].prod;
212                 cons = tbase->ws_mbuf->idx[i].cons;
213
214                 if (prod != cons) {
215                         tbase->ws_mbuf->idx[i].prod = 0;
216                         tbase->ws_mbuf->idx[i].cons = 0;
217                         txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
218                 }
219         }
220
221         tbase->flags &= ~FLAG_TX_FLUSH;
222 }
223
224 void flush_queues_sw(struct task_base *tbase)
225 {
226         uint16_t prod, cons;
227
228         for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
229                 prod = tbase->ws_mbuf->idx[i].prod;
230                 cons = tbase->ws_mbuf->idx[i].cons;
231
232                 if (prod != cons) {
233                         tbase->ws_mbuf->idx[i].prod = 0;
234                         tbase->ws_mbuf->idx[i].cons = 0;
235                         ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
236                 }
237         }
238         tbase->flags &= ~FLAG_TX_FLUSH;
239 }
240
241 void flush_queues_no_drop_hw(struct task_base *tbase)
242 {
243         uint16_t prod, cons;
244
245         for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
246                 prod = tbase->ws_mbuf->idx[i].prod;
247                 cons = tbase->ws_mbuf->idx[i].cons;
248
249                 if (prod != cons) {
250                         tbase->ws_mbuf->idx[i].prod = 0;
251                         tbase->ws_mbuf->idx[i].cons = 0;
252                         txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
253                 }
254         }
255
256         tbase->flags &= ~FLAG_TX_FLUSH;
257 }
258
259 void flush_queues_no_drop_sw(struct task_base *tbase)
260 {
261         uint16_t prod, cons;
262
263         for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
264                 prod = tbase->ws_mbuf->idx[i].prod;
265                 cons = tbase->ws_mbuf->idx[i].cons;
266
267                 if (prod != cons) {
268                         tbase->ws_mbuf->idx[i].prod = 0;
269                         tbase->ws_mbuf->idx[i].cons = 0;
270                         ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
271                 }
272         }
273         tbase->flags &= ~FLAG_TX_FLUSH;
274 }
275
276 /* "try" functions try to send packets to sw/hw w/o failing or blocking;
277    They return if ring/queue is full and are used by aggregators.
278    "try" functions do not have drop/no drop flavors
279    They are only implemented in never_discard mode (as by default they
280    use only one outgoing ring. */
281 uint16_t tx_try_self(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
282 {
283         if (n_pkts < 64) {
284                 tx_pkt_never_discard_self(tbase, mbufs, n_pkts, NULL);
285                 return n_pkts;
286         } else {
287                 tx_pkt_never_discard_self(tbase, mbufs, 64, NULL);
288                 return 64;
289         }
290 }
291
292 uint16_t tx_try_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
293 {
294         const int bulk_size = 64;
295         uint16_t ret = bulk_size, sent = 0, n_bulks;
296         n_bulks = n_pkts >> __builtin_ctz(bulk_size);
297
298         for (int i = 0; i < n_bulks; i++) {
299 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
300                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size);
301 #else
302                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size, NULL);
303 #endif
304                 mbufs += ret;
305                 sent += ret;
306                 if (ret != bulk_size)
307                         break;
308         }
309         if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
310 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
311                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)));
312 #else
313                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)), NULL);
314 #endif
315                 mbufs += ret;
316                 sent += ret;
317         }
318         TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
319         return sent;
320 }
321
322 uint16_t tx_try_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
323 {
324         const int bulk_size = 64;
325         uint16_t ret = bulk_size, n_bulks, sent = 0;
326         n_bulks = n_pkts >>  __builtin_ctz(bulk_size);
327
328         const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
329         for (int i = 0; i < n_bulks; i++) {
330                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, bulk_size);
331                 mbufs += ret;
332                 sent += ret;
333                 if (ret != bulk_size)
334                         break;
335         }
336         if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
337                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, (n_pkts & (bulk_size - 1)));
338                 mbufs += ret;
339                 sent += ret;
340         }
341         TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
342         return sent;
343 }
344
345 int tx_pkt_no_drop_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
346 {
347         return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
348 }
349
350 int tx_pkt_no_drop_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
351 {
352         static uint8_t fake_out[MAX_PKT_BURST] = {0};
353         int ret = 0;
354         if (n_pkts == MAX_PKT_BURST) {
355                 // First xmit what was queued
356                 uint16_t prod, cons;
357
358                 prod = tbase->ws_mbuf->idx[0].prod;
359                 cons = tbase->ws_mbuf->idx[0].cons;
360
361                 if ((uint16_t)(prod - cons)){
362                         tbase->flags &= ~FLAG_TX_FLUSH;
363                         tbase->ws_mbuf->idx[0].prod = 0;
364                         tbase->ws_mbuf->idx[0].cons = 0;
365                         ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
366                 }
367                 ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
368         } else {
369                 ret+= tx_pkt_no_drop_hw(tbase, mbufs, n_pkts, fake_out);
370         }
371         return ret;
372 }
373
374 int tx_pkt_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
375 {
376         return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
377 }
378
379 int tx_pkt_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
380 {
381         static uint8_t fake_out[MAX_PKT_BURST] = {0};
382         int ret = 0;
383         if (n_pkts == MAX_PKT_BURST) {
384                 // First xmit what was queued
385                 uint16_t prod, cons;
386
387                 prod = tbase->ws_mbuf->idx[0].prod;
388                 cons = tbase->ws_mbuf->idx[0].cons;
389
390                 if ((uint16_t)(prod - cons)){
391                         tbase->flags &= ~FLAG_TX_FLUSH;
392                         tbase->ws_mbuf->idx[0].prod = 0;
393                         tbase->ws_mbuf->idx[0].cons = 0;
394                         ret+= txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
395                 }
396                 ret+= txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
397         } else {
398                 ret+= tx_pkt_hw(tbase, mbufs, n_pkts, fake_out);
399         }
400         return ret;
401 }
402
403 /* Transmit to hw using tx_params_hw_sw structure
404    This function is used  to transmit to hw when tx_params_hw_sw should be used
405    i.e. when the task needs to transmit both to hw and sw */
406 int tx_pkt_no_drop_never_discard_hw1_no_pointer(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
407 {
408         txhw_no_drop(&tbase->tx_params_hw_sw.tx_port_queue, mbufs, n_pkts, tbase);
409         return 0;
410 }
411
412 int tx_pkt_no_drop_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
413 {
414         return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
415 }
416
417 int tx_pkt_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
418 {
419         return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
420 }
421
422 static uint16_t tx_pkt_free_dropped(__attribute__((unused)) struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
423 {
424         uint64_t v = 0;
425         uint16_t i;
426         /* The most probable and most important optimize case is if
427            the no packets should be dropped. */
428         for (i = 0; i + 8 < n_pkts; i += 8) {
429                 v |= *((uint64_t*)(&out[i]));
430         }
431         for (; i < n_pkts; ++i) {
432                 v |= out[i];
433         }
434
435         if (unlikely(v)) {
436                 /* At least some packets need to be dropped, so the
437                    mbufs array needs to be updated. */
438                 uint16_t n_kept = 0;
439                 uint16_t n_discard = 0;
440                 for (uint16_t i = 0; i < n_pkts; ++i) {
441                         if (unlikely(out[i] >= OUT_HANDLED)) {
442                                 rte_pktmbuf_free(mbufs[i]);
443                                 n_discard += out[i] == OUT_DISCARD;
444                                 continue;
445                         }
446                         mbufs[n_kept++] = mbufs[i];
447                 }
448                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, n_discard);
449                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts - n_kept - n_discard);
450                 return n_kept;
451         }
452         return n_pkts;
453 }
454
455 int tx_pkt_no_drop_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
456 {
457         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
458         int ret = 0;
459
460         if (likely(n_kept))
461                 ret = txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
462         return ret;
463 }
464
465 int tx_pkt_no_drop_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
466 {
467         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
468         int ret = 0;
469
470         if (likely(n_kept))
471                 ret = ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
472         return ret;
473 }
474
475 int tx_pkt_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
476 {
477         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
478
479         if (likely(n_kept))
480                 return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
481         return n_pkts;
482 }
483
484 int tx_pkt_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
485 {
486         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
487
488         if (likely(n_kept))
489                 return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
490         return 0;
491 }
492
493 int tx_pkt_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
494 {
495         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
496
497         TASK_STATS_ADD_TX(&tbase->aux->stats, n_kept);
498         tbase->ws_mbuf->idx[0].nb_rx = n_kept;
499         struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
500         for (uint16_t i = 0; i < n_kept; ++i) {
501                 tx_mbuf[i] = mbufs[i];
502         }
503         return 0;
504 }
505
506 int tx_pkt_never_discard_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
507 {
508         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
509         tbase->ws_mbuf->idx[0].nb_rx = n_pkts;
510         struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
511         for (uint16_t i = 0; i < n_pkts; ++i) {
512                 tx_mbuf[i] = mbufs[i];
513         }
514         return 0;
515 }
516
517 int tx_pkt_no_drop_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
518 {
519         int ret = 0;
520         buf_pkt_all(tbase, mbufs, n_pkts, out);
521
522         const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
523         uint16_t prod, cons;
524
525         for (uint8_t i = 0; i < nb_bufs; ++i) {
526                 prod = tbase->ws_mbuf->idx[i].prod;
527                 cons = tbase->ws_mbuf->idx[i].cons;
528
529                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
530                         tbase->flags &= ~FLAG_TX_FLUSH;
531                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
532                         ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
533                 }
534         }
535         return ret;
536 }
537
538 int tx_pkt_no_drop_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
539 {
540         int ret = 0;
541         buf_pkt_all(tbase, mbufs, n_pkts, out);
542
543         const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
544         uint16_t prod, cons;
545
546         for (uint8_t i = 0; i < nb_bufs; ++i) {
547                 prod = tbase->ws_mbuf->idx[i].prod;
548                 cons = tbase->ws_mbuf->idx[i].cons;
549
550                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
551                         tbase->flags &= ~FLAG_TX_FLUSH;
552                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
553                         ret += ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
554                 }
555         }
556         return ret;
557 }
558
559 int tx_pkt_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
560 {
561         int ret = 0;
562         buf_pkt_all(tbase, mbufs, n_pkts, out);
563
564         const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
565         uint16_t prod, cons;
566
567         for (uint8_t i = 0; i < nb_bufs; ++i) {
568                 prod = tbase->ws_mbuf->idx[i].prod;
569                 cons = tbase->ws_mbuf->idx[i].cons;
570
571                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
572                         tbase->flags &= ~FLAG_TX_FLUSH;
573                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
574                         ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
575                 }
576         }
577         return ret;
578 }
579
580 int tx_pkt_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
581 {
582         int ret = 0;
583         buf_pkt_all(tbase, mbufs, n_pkts, out);
584
585         const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
586         uint16_t prod, cons;
587         for (uint8_t i = 0; i < nb_bufs; ++i) {
588                 prod = tbase->ws_mbuf->idx[i].prod;
589                 cons = tbase->ws_mbuf->idx[i].cons;
590
591                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
592                         tbase->flags &= ~FLAG_TX_FLUSH;
593                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
594                         ret+= ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
595                 }
596         }
597         return ret;
598 }
599
600 static inline void trace_one_rx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf)
601 {
602         struct rte_mbuf tmp;
603         /* For each packet being transmitted, find which
604            buffer represent the packet as it was before
605            processing. */
606         uint32_t j = 0;
607         uint32_t len = sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr)/sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr[0]);
608         for (;j < len; ++j) {
609                 if (tbase->aux->task_rt_dump.pkt_mbuf_addr[j] == mbuf)
610                         break;
611         }
612         if (j != len) {
613 #if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
614                 tmp.data_off = 0;
615 #endif
616                 rte_pktmbuf_data_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
617                 rte_pktmbuf_pkt_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
618                 tmp.buf_addr = tbase->aux->task_rt_dump.pkt_cpy[j];
619                 plogdx_info(&tmp, "Trace RX: ");
620         }
621 }
622
623 static inline void trace_one_tx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf, uint8_t *out, uint32_t i)
624 {
625         if (out) {
626                 switch(out[i]) {
627                 case 0xFE:
628                         plogdx_info(mbuf, "Handled: ");
629                         break;
630                 case 0xFF:
631                         plogdx_info(mbuf, "Dropped: ");
632                         break;
633                 default:
634                         plogdx_info(mbuf, "TX[%d]: ", out[i]);
635                         break;
636                 }
637         } else if (tbase->aux->tx_pkt_orig == tx_pkt_drop_all) {
638                 plogdx_info(mbuf, "Dropped: ");
639         } else
640                 plogdx_info(mbuf, "TX[0]: ");
641 }
642
643 static void unset_trace(struct task_base *tbase)
644 {
645         if (0 == tbase->aux->task_rt_dump.n_trace) {
646                 if (tbase->tx_pkt == tx_pkt_l3) {
647                         tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
648                         tbase->aux->tx_pkt_orig = NULL;
649                 } else {
650                         tbase->tx_pkt = tbase->aux->tx_pkt_orig;
651                         tbase->aux->tx_pkt_orig = NULL;
652                 }
653                 tbase->aux->task_rt_dump.cur_trace = 0;
654                 task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
655         }
656 }
657
658 int tx_pkt_trace(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
659 {
660         int ret = 0;
661         if (tbase->aux->task_rt_dump.cur_trace == 0) {
662                 // No packet received since dumping...
663                 tbase->aux->task_rt_dump.n_print_tx = tbase->aux->task_rt_dump.n_trace;
664                 if (tbase->aux->task_rt_dump.n_trace < n_pkts) {
665                         tbase->aux->task_rt_dump.n_trace = 0;
666                         tbase->aux->task_rt_dump.cur_trace = 0;
667                         task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
668                 } else {
669                         tbase->aux->task_rt_dump.n_trace -= n_pkts;
670                 }
671                 ret = tx_pkt_dump(tbase, mbufs, n_pkts, out);
672                 tbase->aux->task_rt_dump.n_print_tx = 0;
673                 return ret;
674         }
675         plog_info("Tracing %d pkts\n", tbase->aux->task_rt_dump.cur_trace);
676         uint32_t cur_trace = (n_pkts < tbase->aux->task_rt_dump.cur_trace) ? n_pkts: tbase->aux->task_rt_dump.cur_trace;
677         for (uint32_t i = 0; i < cur_trace; ++i) {
678                 trace_one_rx_pkt(tbase, mbufs[i]);
679                 trace_one_tx_pkt(tbase, mbufs[i], out, i);
680
681         }
682         ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
683
684         unset_trace(tbase);
685         return ret;
686 }
687
688 int tx_pkt_dump(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
689 {
690         uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
691         int ret = 0;
692
693         n_dump = n_pkts < n_dump? n_pkts : n_dump;
694         for (uint32_t i = 0; i < n_dump; ++i) {
695                 if (out) {
696                         switch (out[i]) {
697                         case 0xFE:
698                                 plogdx_info(mbufs[i], "Handled: ");
699                                 break;
700                         case 0xFF:
701                                 plogdx_info(mbufs[i], "Dropped: ");
702                                 break;
703                         default:
704                                 plogdx_info(mbufs[i], "TX[%d]: ", out[i]);
705                                 break;
706                         }
707                 } else
708                         plogdx_info(mbufs[i], "TX: ");
709         }
710         tbase->aux->task_rt_dump.n_print_tx -= n_dump;
711
712         ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
713
714         if (0 == tbase->aux->task_rt_dump.n_print_tx) {
715                 if (tbase->tx_pkt == tx_pkt_l3) {
716                         tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
717                         tbase->aux->tx_pkt_orig = NULL;
718                 } else {
719                         tbase->tx_pkt = tbase->aux->tx_pkt_orig;
720                         tbase->aux->tx_pkt_orig = NULL;
721                 }
722         }
723         return ret;
724 }
725
726 /* Gather the distribution of the number of packets that have been
727    xmitted from one TX call. Since the value is only modified by the
728    task that xmits the packet, no atomic operation is needed. */
729 int tx_pkt_distr(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
730 {
731         if (likely(n_pkts < TX_BUCKET_SIZE))
732                 tbase->aux->tx_bucket[n_pkts]++;
733         else
734                 tbase->aux->tx_bucket[TX_BUCKET_SIZE - 1]++;
735         return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
736 }
737
738 int tx_pkt_bw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
739 {
740         uint32_t tx_bytes = 0;
741         uint32_t drop_bytes = 0;
742
743         for (uint16_t i = 0; i < n_pkts; ++i) {
744                 if (!out || out[i] < OUT_HANDLED)
745                         tx_bytes += mbuf_wire_size(mbufs[i]);
746                 else
747                         drop_bytes += mbuf_wire_size(mbufs[i]);
748         }
749
750         TASK_STATS_ADD_TX_BYTES(&tbase->aux->stats, tx_bytes);
751         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
752         return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
753 }
754
755 int tx_pkt_drop_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
756 {
757         for (uint16_t j = 0; j < n_pkts; ++j) {
758                 rte_pktmbuf_free(mbufs[j]);
759         }
760         if (out == NULL)
761                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts);
762         else {
763                 for (uint16_t j = 0; j < n_pkts; ++j) {
764                         if (out[j] == OUT_HANDLED)
765                                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
766                         else
767                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
768                 }
769         }
770         return n_pkts;
771 }
772 static inline void dump_pkts(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
773 {
774         uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
775         uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;
776
777         if (unlikely(n_dump)) {
778                 n_dump = n_pkts < n_dump? n_pkts : n_dump;
779                 for (uint32_t i = 0; i < n_dump; ++i) {
780                         plogdx_info(mbufs[i], "TX: ");
781                 }
782                 tbase->aux->task_rt_dump.n_print_tx -= n_dump;
783         } else if (unlikely(n_trace)) {
784                 n_trace = n_pkts < n_trace? n_pkts : n_trace;
785                 for (uint32_t i = 0; i < n_trace; ++i) {
786                         plogdx_info(mbufs[i], "TX: ");
787                 }
788                 tbase->aux->task_rt_dump.n_trace -= n_trace;
789         }
790 }
791
792 // ctrlplane packets are slow path, hence cost of checking if dump ortrace is needed in not too important
793 // easier to have this implementation than an implementation similar to dataplane tx
794 int tx_ctrlplane_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
795 {
796         dump_pkts(tbase, mbufs, n_pkts);
797         return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
798 }
799
800 int tx_ctrlplane_sw(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
801 {
802         dump_pkts(tbase, mbufs, n_pkts);
803         return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
804 }
805
806 static inline int tx_ring_all(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
807 {
808         if (tbase->aux->task_rt_dump.cur_trace) {
809                 trace_one_rx_pkt(tbase, mbuf);
810         }
811         mbuf->udata64 = ((uint64_t)ip << 32) | (core_id << 16) | (task_id << 8) | command;
812         return rte_ring_enqueue(ring, mbuf);
813 }
814
815 int tx_ring_cti(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
816 {
817         plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
818         int ret = tx_ring_all(tbase, ring, command,  mbuf, core_id, task_id, ip);
819         if (unlikely(ret != 0)) {
820                 plogx_dbg("\tFail to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
821                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
822                 rte_pktmbuf_free(mbuf);
823         }
824         return ret;
825 }
826
827 void tx_ring_ip(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf, uint32_t ip)
828 {
829         plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
830         int ret = tx_ring_all(tbase, ring, command,  mbuf, 0, 0, ip);
831         if (unlikely(ret != 0)) {
832                 plogx_dbg("\tFail to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
833                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
834                 rte_pktmbuf_free(mbuf);
835         }
836 }
837
838 void tx_ring(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf)
839 {
840         plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
841         int ret = tx_ring_all(tbase, ring, command,  mbuf, 0, 0, 0);
842         if (unlikely(ret != 0)) {
843                 plogx_dbg("\tFail to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
844                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
845                 rte_pktmbuf_free(mbuf);
846         }
847 }
848
849 void tx_ring_route(struct task_base *tbase, struct rte_ring *ring, int add, struct rte_mbuf *mbuf, uint32_t ip, uint32_t gateway_ip, uint32_t prefix)
850 {
851         uint8_t command;
852         if (add)
853                 command = ROUTE_ADD_FROM_CTRL;
854         else
855                 command = ROUTE_DEL_FROM_CTRL;
856
857         plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
858         ctrl_ring_set_command(mbuf, command);
859         ctrl_ring_set_ip(mbuf, ip);
860         ctrl_ring_set_gateway_ip(mbuf, gateway_ip);
861         ctrl_ring_set_prefix(mbuf, prefix);
862         if (tbase->aux->task_rt_dump.cur_trace) {
863                 trace_one_rx_pkt(tbase, mbuf);
864         }
865         int ret = rte_ring_enqueue(ring, mbuf);
866         if (unlikely(ret != 0)) {
867                 plogx_dbg("\tFail to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
868                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
869                 rte_pktmbuf_free(mbuf);
870         }
871 }
872
873 void ctrl_ring_set_command(struct rte_mbuf *mbuf, uint64_t udata64)
874 {
875         mbuf->udata64 = udata64;
876 }
877
878 uint64_t ctrl_ring_get_command(struct rte_mbuf *mbuf)
879 {
880         return mbuf->udata64;
881 }
882
883 void ctrl_ring_set_ip(struct rte_mbuf *mbuf, uint32_t udata32)
884 {
885         struct prox_headroom *prox_headroom = (struct prox_headroom *)(rte_pktmbuf_mtod(mbuf, uint8_t*) - sizeof(struct prox_headroom));
886         prox_headroom->ip = udata32;
887 }
888
889 uint32_t  ctrl_ring_get_ip(struct rte_mbuf *mbuf)
890 {
891         struct prox_headroom *prox_headroom = (struct prox_headroom *)(rte_pktmbuf_mtod(mbuf, uint8_t*) - sizeof(struct prox_headroom));
892         return prox_headroom->ip;
893 }
894
895 void ctrl_ring_set_gateway_ip(struct rte_mbuf *mbuf, uint32_t udata32)
896 {
897         struct prox_headroom *prox_headroom = (struct prox_headroom *)(rte_pktmbuf_mtod(mbuf, uint8_t*) - sizeof(struct prox_headroom));
898         prox_headroom->gateway_ip = udata32;
899 }
900
901 uint32_t ctrl_ring_get_gateway_ip(struct rte_mbuf *mbuf)
902 {
903         struct prox_headroom *prox_headroom = (struct prox_headroom *)(rte_pktmbuf_mtod(mbuf, uint8_t*) - sizeof(struct prox_headroom));
904         return prox_headroom->gateway_ip;
905 }
906
907 void ctrl_ring_set_prefix(struct rte_mbuf *mbuf, uint32_t udata32)
908 {
909         struct prox_headroom *prox_headroom = (struct prox_headroom *)(rte_pktmbuf_mtod(mbuf, uint8_t*) - sizeof(struct prox_headroom));
910         prox_headroom->prefix = udata32;
911 }
912
913 uint32_t ctrl_ring_get_prefix(struct rte_mbuf *mbuf)
914 {
915         struct prox_headroom *prox_headroom = (struct prox_headroom *)(rte_pktmbuf_mtod(mbuf, uint8_t*) - sizeof(struct prox_headroom));
916         return prox_headroom->prefix;
917 }