Added initial support for NDP (IPv6)
[samplevnf.git] / VNFs / DPPD-PROX / tx_pkt.c
1 /*
2 // Copyright (c) 2010-2020 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <rte_ethdev.h>
18 #include <rte_version.h>
19
20 #include "rx_pkt.h"
21 #include "tx_pkt.h"
22 #include "task_base.h"
23 #include "stats.h"
24 #include "prefetch.h"
25 #include "prox_assert.h"
26 #include "log.h"
27 #include "mbuf_utils.h"
28 #include "handle_master.h"
29 #include "defines.h"
30
31 static void buf_pkt_single(struct task_base *tbase, struct rte_mbuf *mbuf, const uint8_t out)
32 {
33         const uint16_t prod = tbase->ws_mbuf->idx[out].prod++;
34         tbase->ws_mbuf->mbuf[out][prod & WS_MBUF_MASK] = mbuf;
35 }
36
37 static inline void buf_pkt_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
38 {
39         for (uint16_t j = 0; j < n_pkts; ++j) {
40                 if (unlikely(out[j] >= OUT_HANDLED)) {
41                         rte_pktmbuf_free(mbufs[j]);
42                         if (out[j] == OUT_HANDLED)
43                                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
44                         else
45                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
46                 }
47                 else {
48                         buf_pkt_single(tbase, mbufs[j], out[j]);
49                 }
50         }
51 }
52 #define MAX_PMD_TX 32
53
54 void store_packet(struct task_base *tbase, struct rte_mbuf *mbuf)
55 {
56         // If buffer is full, drop the first mbuf
57         if (tbase->aux->mbuf)
58                 tx_drop(tbase->aux->mbuf);
59         tbase->aux->mbuf = mbuf;
60 }
61
62 int tx_pkt_ndp(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
63 {
64         // TODO NDP
65         struct ipv6_addr ip_dst;
66         int first = 0, ret, ok = 0, rc;
67         const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
68         struct rte_mbuf *mbuf = NULL;       // used when one need to send both an ARP and a mbuf
69
70         for (int j = 0; j < n_pkts; j++) {
71                 if ((out) && (out[j] >= OUT_HANDLED))
72                         continue;
73                 if (unlikely((rc = write_ip6_dst_mac(tbase, mbufs[j], &ip_dst)) != SEND_MBUF)) {
74                         if (j - first) {
75                                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
76                                 ok += ret;
77                         }
78                         first = j + 1;
79                         switch(rc) {
80                         case SEND_ARP_ND:
81                                 // Original mbuf (packet) is stored to be sent later -> need to allocate new mbuf
82                                 ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&mbuf);
83                                 if (likely(ret == 0))   {
84                                         store_packet(tbase, mbufs[j]);
85                                         mbuf->port = tbase->l3.reachable_port_id;
86                                         tx_ring_cti6(tbase, tbase->l3.ctrl_plane_ring, IP6_REQ_MAC_TO_MASTER, mbuf, tbase->l3.core_id, tbase->l3.task_id, &ip_dst);
87                                 } else {
88                                         plog_err("Failed to get a mbuf from arp/nd mempool\n");
89                                         tx_drop(mbufs[j]);
90                                         TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
91                                 }
92                                 break;
93                         case SEND_MBUF_AND_ARP_ND:
94                                 // We send the mbuf and an ND - we need to allocate another mbuf for ND
95                                 ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&mbuf);
96                                 if (likely(ret == 0))   {
97                                         mbuf->port = tbase->l3.reachable_port_id;
98                                         tx_ring_cti6(tbase, tbase->l3.ctrl_plane_ring, IP6_REQ_MAC_TO_MASTER, mbuf, tbase->l3.core_id, tbase->l3.task_id, &ip_dst);
99                                 } else {
100                                         plog_err("Failed to get a mbuf from arp/nd mempool\n");
101                                         // We still send the initial mbuf
102                                 }
103                                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + j, 1, out);
104                                 break;
105                         case DROP_MBUF:
106                                 tx_drop(mbufs[j]);
107                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
108                                 break;
109                         }
110                 }
111         }
112         if (n_pkts - first) {
113                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
114                 ok += ret;
115         }
116         return ok;
117 }
118 int tx_pkt_l3(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
119 {
120         uint32_t ip_dst;
121         int first = 0, ret, ok = 0, rc;
122         const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
123         struct rte_mbuf *arp_mbuf = NULL;       // used when one need to send both an ARP and a mbuf
124         uint64_t *time;
125         uint64_t tsc = rte_rdtsc();
126
127         for (int j = 0; j < n_pkts; j++) {
128                 if ((out) && (out[j] >= OUT_HANDLED))
129                         continue;
130                 if (unlikely((rc = write_dst_mac(tbase, mbufs[j], &ip_dst, &time, tsc)) != SEND_MBUF)) {
131                         if (j - first) {
132                                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
133                                 ok += ret;
134                         }
135                         first = j + 1;
136                         switch(rc) {
137                         case SEND_ARP_ND:
138                                 // We re-use the mbuf - no need to create a arp_mbuf and delete the existing mbuf
139                                 mbufs[j]->port = tbase->l3.reachable_port_id;
140                                 if (tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, IP4_REQ_MAC_TO_MASTER, mbufs[j], tbase->l3.core_id, tbase->l3.task_id, ip_dst) == 0)
141                                         update_arp_ndp_retransmit_timeout(&tbase->l3, time, 1000);
142                                 else
143                                         update_arp_ndp_retransmit_timeout(&tbase->l3, time, 100);
144                                 break;
145                         case SEND_MBUF_AND_ARP_ND:
146                                 // We send the mbuf and an ARP - we need to allocate another mbuf for ARP
147                                 ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&arp_mbuf);
148                                 if (likely(ret == 0))   {
149                                         arp_mbuf->port = tbase->l3.reachable_port_id;
150                                         if (tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, IP4_REQ_MAC_TO_MASTER, arp_mbuf, tbase->l3.core_id, tbase->l3.task_id, ip_dst) == 0)
151                                                 update_arp_ndp_retransmit_timeout(&tbase->l3, time, 1000);
152                                         else
153                                                 update_arp_ndp_retransmit_timeout(&tbase->l3, time, 100);
154                                 } else {
155                                         plog_err("Failed to get a mbuf from arp mempool\n");
156                                         // We still send the initial mbuf
157                                 }
158                                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + j, 1, out);
159                                 break;
160                         case DROP_MBUF:
161                                 tx_drop(mbufs[j]);
162                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
163                                 break;
164                         }
165                 }
166         }
167         if (n_pkts - first) {
168                 ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
169                 ok += ret;
170         }
171         return ok;
172 }
173
174 /* The following help functions also report stats. Therefore we need
175    to pass the task_base struct. */
176 static inline int txhw_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
177 {
178         uint16_t ntx;
179         int ret;
180
181         /* TX vector mode can't transmit more than 32 packets */
182         if (n_pkts > MAX_PMD_TX) {
183                 ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, MAX_PMD_TX);
184                 ntx += rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs + ntx, n_pkts - ntx);
185         } else {
186                 ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
187         }
188         TASK_STATS_ADD_TX(&tbase->aux->stats, ntx);
189
190         ret =  n_pkts - ntx;
191         if (ntx < n_pkts) {
192                 plog_dbg("Failed to send %d packets from %p\n", ret, mbufs[0]);
193                 TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts - ntx);
194                 if (tbase->tx_pkt == tx_pkt_bw) {
195                         uint32_t drop_bytes = 0;
196                         do {
197                                 drop_bytes += mbuf_wire_size(mbufs[ntx]);
198                                 rte_pktmbuf_free(mbufs[ntx++]);
199                         } while (ntx < n_pkts);
200                         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
201                 }
202                 else {
203                         do {
204                                 rte_pktmbuf_free(mbufs[ntx++]);
205                         } while (ntx < n_pkts);
206                 }
207         }
208         return ret;
209 }
210
211 static inline int txhw_no_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
212 {
213         uint16_t ret;
214         uint16_t n = n_pkts;
215
216         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
217         do {
218                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
219                 mbufs += ret;
220                 n_pkts -= ret;
221         }
222         while (n_pkts);
223         return (n != ret);
224 }
225
226 static inline int ring_enq_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
227 {
228         int ret = 0;
229         /* return 0 on succes, -ENOBUFS on failure */
230         // Rings can be single or multiproducer (ctrl rings are multi producer)
231 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
232         if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts))) {
233 #else
234         if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0)) {
235 #endif
236                 ret = n_pkts;
237                 if (tbase->tx_pkt == tx_pkt_bw) {
238                         uint32_t drop_bytes = 0;
239                         for (uint16_t i = 0; i < n_pkts; ++i) {
240                                 drop_bytes += mbuf_wire_size(mbufs[i]);
241                                 rte_pktmbuf_free(mbufs[i]);
242                         }
243                         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
244                         TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
245                 }
246                 else {
247                         for (uint16_t i = 0; i < n_pkts; ++i)
248                                 rte_pktmbuf_free(mbufs[i]);
249                         TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
250                 }
251         }
252         else {
253                 TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
254         }
255         return ret;
256 }
257
258 static inline int ring_enq_no_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
259 {
260         int i = 0;
261 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
262         while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts)) {
263 #else
264         while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0) {
265 #endif
266                 i++;
267         };
268         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
269         return (i != 0);
270 }
271
272 void flush_queues_hw(struct task_base *tbase)
273 {
274         uint16_t prod, cons;
275
276         for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
277                 prod = tbase->ws_mbuf->idx[i].prod;
278                 cons = tbase->ws_mbuf->idx[i].cons;
279
280                 if (prod != cons) {
281                         tbase->ws_mbuf->idx[i].prod = 0;
282                         tbase->ws_mbuf->idx[i].cons = 0;
283                         txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
284                 }
285         }
286
287         tbase->flags &= ~FLAG_TX_FLUSH;
288 }
289
290 void flush_queues_sw(struct task_base *tbase)
291 {
292         uint16_t prod, cons;
293
294         for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
295                 prod = tbase->ws_mbuf->idx[i].prod;
296                 cons = tbase->ws_mbuf->idx[i].cons;
297
298                 if (prod != cons) {
299                         tbase->ws_mbuf->idx[i].prod = 0;
300                         tbase->ws_mbuf->idx[i].cons = 0;
301                         ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
302                 }
303         }
304         tbase->flags &= ~FLAG_TX_FLUSH;
305 }
306
307 void flush_queues_no_drop_hw(struct task_base *tbase)
308 {
309         uint16_t prod, cons;
310
311         for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
312                 prod = tbase->ws_mbuf->idx[i].prod;
313                 cons = tbase->ws_mbuf->idx[i].cons;
314
315                 if (prod != cons) {
316                         tbase->ws_mbuf->idx[i].prod = 0;
317                         tbase->ws_mbuf->idx[i].cons = 0;
318                         txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
319                 }
320         }
321
322         tbase->flags &= ~FLAG_TX_FLUSH;
323 }
324
325 void flush_queues_no_drop_sw(struct task_base *tbase)
326 {
327         uint16_t prod, cons;
328
329         for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
330                 prod = tbase->ws_mbuf->idx[i].prod;
331                 cons = tbase->ws_mbuf->idx[i].cons;
332
333                 if (prod != cons) {
334                         tbase->ws_mbuf->idx[i].prod = 0;
335                         tbase->ws_mbuf->idx[i].cons = 0;
336                         ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
337                 }
338         }
339         tbase->flags &= ~FLAG_TX_FLUSH;
340 }
341
342 /* "try" functions try to send packets to sw/hw w/o failing or blocking;
343    They return if ring/queue is full and are used by aggregators.
344    "try" functions do not have drop/no drop flavors
345    They are only implemented in never_discard mode (as by default they
346    use only one outgoing ring. */
347 uint16_t tx_try_self(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
348 {
349         if (n_pkts < 64) {
350                 tx_pkt_never_discard_self(tbase, mbufs, n_pkts, NULL);
351                 return n_pkts;
352         } else {
353                 tx_pkt_never_discard_self(tbase, mbufs, 64, NULL);
354                 return 64;
355         }
356 }
357
358 uint16_t tx_try_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
359 {
360         const int bulk_size = 64;
361         uint16_t ret = bulk_size, sent = 0, n_bulks;
362         n_bulks = n_pkts >> __builtin_ctz(bulk_size);
363
364         for (int i = 0; i < n_bulks; i++) {
365 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
366                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size);
367 #else
368                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size, NULL);
369 #endif
370                 mbufs += ret;
371                 sent += ret;
372                 if (ret != bulk_size)
373                         break;
374         }
375         if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
376 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
377                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)));
378 #else
379                 ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)), NULL);
380 #endif
381                 mbufs += ret;
382                 sent += ret;
383         }
384         TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
385         return sent;
386 }
387
388 uint16_t tx_try_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
389 {
390         const int bulk_size = 64;
391         uint16_t ret = bulk_size, n_bulks, sent = 0;
392         n_bulks = n_pkts >>  __builtin_ctz(bulk_size);
393
394         const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
395         for (int i = 0; i < n_bulks; i++) {
396                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, bulk_size);
397                 mbufs += ret;
398                 sent += ret;
399                 if (ret != bulk_size)
400                         break;
401         }
402         if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
403                 ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, (n_pkts & (bulk_size - 1)));
404                 mbufs += ret;
405                 sent += ret;
406         }
407         TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
408         return sent;
409 }
410
411 int tx_pkt_no_drop_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
412 {
413         return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
414 }
415
416 int tx_pkt_no_drop_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
417 {
418         static uint8_t fake_out[MAX_PKT_BURST] = {0};
419         int ret = 0;
420         if (n_pkts == MAX_PKT_BURST) {
421                 // First xmit what was queued
422                 uint16_t prod, cons;
423
424                 prod = tbase->ws_mbuf->idx[0].prod;
425                 cons = tbase->ws_mbuf->idx[0].cons;
426
427                 if ((uint16_t)(prod - cons)){
428                         tbase->flags &= ~FLAG_TX_FLUSH;
429                         tbase->ws_mbuf->idx[0].prod = 0;
430                         tbase->ws_mbuf->idx[0].cons = 0;
431                         ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
432                 }
433                 ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
434         } else {
435                 ret+= tx_pkt_no_drop_hw(tbase, mbufs, n_pkts, fake_out);
436         }
437         return ret;
438 }
439
440 int tx_pkt_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
441 {
442         return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
443 }
444
445 int tx_pkt_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
446 {
447         static uint8_t fake_out[MAX_PKT_BURST] = {0};
448         int ret = 0;
449         if (n_pkts == MAX_PKT_BURST) {
450                 // First xmit what was queued
451                 uint16_t prod, cons;
452
453                 prod = tbase->ws_mbuf->idx[0].prod;
454                 cons = tbase->ws_mbuf->idx[0].cons;
455
456                 if ((uint16_t)(prod - cons)){
457                         tbase->flags &= ~FLAG_TX_FLUSH;
458                         tbase->ws_mbuf->idx[0].prod = 0;
459                         tbase->ws_mbuf->idx[0].cons = 0;
460                         ret+= txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
461                 }
462                 ret+= txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
463         } else {
464                 ret+= tx_pkt_hw(tbase, mbufs, n_pkts, fake_out);
465         }
466         return ret;
467 }
468
469 /* Transmit to hw using tx_params_hw_sw structure
470    This function is used  to transmit to hw when tx_params_hw_sw should be used
471    i.e. when the task needs to transmit both to hw and sw */
472 int tx_pkt_no_drop_never_discard_hw1_no_pointer(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
473 {
474         txhw_no_drop(&tbase->tx_params_hw_sw.tx_port_queue, mbufs, n_pkts, tbase);
475         return 0;
476 }
477
478 int tx_pkt_no_drop_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
479 {
480         return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
481 }
482
483 int tx_pkt_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
484 {
485         return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
486 }
487
488 static uint16_t tx_pkt_free_dropped(__attribute__((unused)) struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
489 {
490         uint64_t v = 0;
491         uint16_t i;
492         /* The most probable and most important optimize case is if
493            the no packets should be dropped. */
494         for (i = 0; i + 8 < n_pkts; i += 8) {
495                 v |= *((uint64_t*)(&out[i]));
496         }
497         for (; i < n_pkts; ++i) {
498                 v |= out[i];
499         }
500
501         if (unlikely(v)) {
502                 /* At least some packets need to be dropped, so the
503                    mbufs array needs to be updated. */
504                 uint16_t n_kept = 0;
505                 uint16_t n_discard = 0;
506                 for (uint16_t i = 0; i < n_pkts; ++i) {
507                         if (unlikely(out[i] >= OUT_HANDLED)) {
508                                 rte_pktmbuf_free(mbufs[i]);
509                                 n_discard += out[i] == OUT_DISCARD;
510                                 continue;
511                         }
512                         mbufs[n_kept++] = mbufs[i];
513                 }
514                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, n_discard);
515                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts - n_kept - n_discard);
516                 return n_kept;
517         }
518         return n_pkts;
519 }
520
521 int tx_pkt_no_drop_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
522 {
523         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
524         int ret = 0;
525
526         if (likely(n_kept))
527                 ret = txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
528         return ret;
529 }
530
531 int tx_pkt_no_drop_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
532 {
533         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
534         int ret = 0;
535
536         if (likely(n_kept))
537                 ret = ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
538         return ret;
539 }
540
541 int tx_pkt_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
542 {
543         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
544
545         if (likely(n_kept))
546                 return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
547         return n_pkts;
548 }
549
550 int tx_pkt_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
551 {
552         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
553
554         if (likely(n_kept))
555                 return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
556         return 0;
557 }
558
559 int tx_pkt_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
560 {
561         const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
562
563         TASK_STATS_ADD_TX(&tbase->aux->stats, n_kept);
564         tbase->ws_mbuf->idx[0].nb_rx = n_kept;
565         struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
566         for (uint16_t i = 0; i < n_kept; ++i) {
567                 tx_mbuf[i] = mbufs[i];
568         }
569         return 0;
570 }
571
572 int tx_pkt_never_discard_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
573 {
574         TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
575         tbase->ws_mbuf->idx[0].nb_rx = n_pkts;
576         struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
577         for (uint16_t i = 0; i < n_pkts; ++i) {
578                 tx_mbuf[i] = mbufs[i];
579         }
580         return 0;
581 }
582
583 int tx_pkt_no_drop_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
584 {
585         int ret = 0;
586         buf_pkt_all(tbase, mbufs, n_pkts, out);
587
588         const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
589         uint16_t prod, cons;
590
591         for (uint8_t i = 0; i < nb_bufs; ++i) {
592                 prod = tbase->ws_mbuf->idx[i].prod;
593                 cons = tbase->ws_mbuf->idx[i].cons;
594
595                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
596                         tbase->flags &= ~FLAG_TX_FLUSH;
597                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
598                         ret+= txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
599                 }
600         }
601         return ret;
602 }
603
604 int tx_pkt_no_drop_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
605 {
606         int ret = 0;
607         buf_pkt_all(tbase, mbufs, n_pkts, out);
608
609         const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
610         uint16_t prod, cons;
611
612         for (uint8_t i = 0; i < nb_bufs; ++i) {
613                 prod = tbase->ws_mbuf->idx[i].prod;
614                 cons = tbase->ws_mbuf->idx[i].cons;
615
616                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
617                         tbase->flags &= ~FLAG_TX_FLUSH;
618                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
619                         ret += ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
620                 }
621         }
622         return ret;
623 }
624
625 int tx_pkt_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
626 {
627         int ret = 0;
628         buf_pkt_all(tbase, mbufs, n_pkts, out);
629
630         const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
631         uint16_t prod, cons;
632
633         for (uint8_t i = 0; i < nb_bufs; ++i) {
634                 prod = tbase->ws_mbuf->idx[i].prod;
635                 cons = tbase->ws_mbuf->idx[i].cons;
636
637                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
638                         tbase->flags &= ~FLAG_TX_FLUSH;
639                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
640                         ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
641                 }
642         }
643         return ret;
644 }
645
646 int tx_pkt_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
647 {
648         int ret = 0;
649         buf_pkt_all(tbase, mbufs, n_pkts, out);
650
651         const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
652         uint16_t prod, cons;
653         for (uint8_t i = 0; i < nb_bufs; ++i) {
654                 prod = tbase->ws_mbuf->idx[i].prod;
655                 cons = tbase->ws_mbuf->idx[i].cons;
656
657                 if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
658                         tbase->flags &= ~FLAG_TX_FLUSH;
659                         tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
660                         ret+= ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
661                 }
662         }
663         return ret;
664 }
665
666 static inline void trace_one_rx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf)
667 {
668         struct rte_mbuf tmp;
669         /* For each packet being transmitted, find which
670            buffer represent the packet as it was before
671            processing. */
672         uint32_t j = 0;
673         uint32_t len = sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr)/sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr[0]);
674         for (;j < len; ++j) {
675                 if (tbase->aux->task_rt_dump.pkt_mbuf_addr[j] == mbuf)
676                         break;
677         }
678         if (j != len) {
679 #if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
680                 tmp.data_off = 0;
681 #endif
682                 rte_pktmbuf_data_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
683                 rte_pktmbuf_pkt_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
684                 tmp.buf_addr = tbase->aux->task_rt_dump.pkt_cpy[j];
685                 plogdx_info(&tmp, "Trace RX: ");
686         }
687 }
688
689 static inline void trace_one_tx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf, uint8_t *out, uint32_t i)
690 {
691         if (out) {
692                 switch(out[i]) {
693                 case 0xFE:
694                         plogdx_info(mbuf, "Handled: ");
695                         break;
696                 case 0xFF:
697                         plogdx_info(mbuf, "Dropped: ");
698                         break;
699                 default:
700                         plogdx_info(mbuf, "TX[%d]: ", out[i]);
701                         break;
702                 }
703         } else if (tbase->aux->tx_pkt_orig == tx_pkt_drop_all) {
704                 plogdx_info(mbuf, "Dropped: ");
705         } else
706                 plogdx_info(mbuf, "TX[0]: ");
707 }
708
709 static void unset_trace(struct task_base *tbase)
710 {
711         if (0 == tbase->aux->task_rt_dump.n_trace) {
712                 if ((tbase->tx_pkt == tx_pkt_l3) || (tbase->tx_pkt == tx_pkt_ndp)){
713                         tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
714                         tbase->aux->tx_pkt_orig = NULL;
715                 } else {
716                         tbase->tx_pkt = tbase->aux->tx_pkt_orig;
717                         tbase->aux->tx_pkt_orig = NULL;
718                 }
719                 tbase->aux->task_rt_dump.cur_trace = 0;
720                 task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
721         }
722 }
723
724 int tx_pkt_trace(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
725 {
726         int ret = 0;
727         if (tbase->aux->task_rt_dump.cur_trace == 0) {
728                 // No packet received since dumping...
729                 tbase->aux->task_rt_dump.n_print_tx = tbase->aux->task_rt_dump.n_trace;
730                 if (tbase->aux->task_rt_dump.n_trace < n_pkts) {
731                         tbase->aux->task_rt_dump.n_trace = 0;
732                         tbase->aux->task_rt_dump.cur_trace = 0;
733                         task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
734                 } else {
735                         tbase->aux->task_rt_dump.n_trace -= n_pkts;
736                 }
737                 ret = tx_pkt_dump(tbase, mbufs, n_pkts, out);
738                 tbase->aux->task_rt_dump.n_print_tx = 0;
739                 return ret;
740         }
741         plog_info("Tracing %d pkts\n", tbase->aux->task_rt_dump.cur_trace);
742         uint32_t cur_trace = (n_pkts < tbase->aux->task_rt_dump.cur_trace) ? n_pkts: tbase->aux->task_rt_dump.cur_trace;
743         for (uint32_t i = 0; i < cur_trace; ++i) {
744                 trace_one_rx_pkt(tbase, mbufs[i]);
745                 trace_one_tx_pkt(tbase, mbufs[i], out, i);
746
747         }
748         ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
749
750         unset_trace(tbase);
751         return ret;
752 }
753
754 int tx_pkt_dump(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
755 {
756         uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
757         int ret = 0;
758
759         n_dump = n_pkts < n_dump? n_pkts : n_dump;
760         for (uint32_t i = 0; i < n_dump; ++i) {
761                 if (out) {
762                         switch (out[i]) {
763                         case 0xFE:
764                                 plogdx_info(mbufs[i], "Handled: ");
765                                 break;
766                         case 0xFF:
767                                 plogdx_info(mbufs[i], "Dropped: ");
768                                 break;
769                         default:
770                                 plogdx_info(mbufs[i], "TX[%d]: ", out[i]);
771                                 break;
772                         }
773                 } else
774                         plogdx_info(mbufs[i], "TX: ");
775         }
776         tbase->aux->task_rt_dump.n_print_tx -= n_dump;
777
778         ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
779
780         if (0 == tbase->aux->task_rt_dump.n_print_tx) {
781                 if ((tbase->tx_pkt == tx_pkt_l3) || (tbase->tx_pkt == tx_pkt_ndp)) {
782                         tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
783                         tbase->aux->tx_pkt_orig = NULL;
784                 } else {
785                         tbase->tx_pkt = tbase->aux->tx_pkt_orig;
786                         tbase->aux->tx_pkt_orig = NULL;
787                 }
788         }
789         return ret;
790 }
791
792 /* Gather the distribution of the number of packets that have been
793    xmitted from one TX call. Since the value is only modified by the
794    task that xmits the packet, no atomic operation is needed. */
795 int tx_pkt_distr(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
796 {
797         if (likely(n_pkts < TX_BUCKET_SIZE))
798                 tbase->aux->tx_bucket[n_pkts]++;
799         else
800                 tbase->aux->tx_bucket[TX_BUCKET_SIZE - 1]++;
801         return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
802 }
803
804 int tx_pkt_bw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
805 {
806         uint32_t tx_bytes = 0;
807         uint32_t drop_bytes = 0;
808
809         for (uint16_t i = 0; i < n_pkts; ++i) {
810                 if (!out || out[i] < OUT_HANDLED)
811                         tx_bytes += mbuf_wire_size(mbufs[i]);
812                 else
813                         drop_bytes += mbuf_wire_size(mbufs[i]);
814         }
815
816         TASK_STATS_ADD_TX_BYTES(&tbase->aux->stats, tx_bytes);
817         TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
818         return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
819 }
820
821 int tx_pkt_drop_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
822 {
823         for (uint16_t j = 0; j < n_pkts; ++j) {
824                 rte_pktmbuf_free(mbufs[j]);
825         }
826         if (out == NULL)
827                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts);
828         else {
829                 for (uint16_t j = 0; j < n_pkts; ++j) {
830                         if (out[j] == OUT_HANDLED)
831                                 TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
832                         else
833                                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
834                 }
835         }
836         return n_pkts;
837 }
838 static inline void dump_pkts(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
839 {
840         uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
841         uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;
842
843         if (unlikely(n_dump)) {
844                 n_dump = n_pkts < n_dump? n_pkts : n_dump;
845                 for (uint32_t i = 0; i < n_dump; ++i) {
846                         plogdx_info(mbufs[i], "TX: ");
847                 }
848                 tbase->aux->task_rt_dump.n_print_tx -= n_dump;
849         } else if (unlikely(n_trace)) {
850                 n_trace = n_pkts < n_trace? n_pkts : n_trace;
851                 for (uint32_t i = 0; i < n_trace; ++i) {
852                         plogdx_info(mbufs[i], "TX: ");
853                 }
854                 tbase->aux->task_rt_dump.n_trace -= n_trace;
855         }
856 }
857
858 // ctrlplane packets are slow path, hence cost of checking if dump ortrace is needed in not too important
859 // easier to have this implementation than an implementation similar to dataplane tx
860 int tx_ctrlplane_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
861 {
862         dump_pkts(tbase, mbufs, n_pkts);
863         return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
864 }
865
866 int tx_ctrlplane_sw(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
867 {
868         dump_pkts(tbase, mbufs, n_pkts);
869         return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
870 }
871
872 static inline int tx_ring_all(struct task_base *tbase, struct rte_ring *ring, uint8_t command,  struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
873 {
874         if (tbase->aux->task_rt_dump.cur_trace) {
875                 trace_one_rx_pkt(tbase, mbuf);
876         }
877         ctrl_ring_set_command_core_task_ip(mbuf, ((uint64_t)ip << 32) | (core_id << 16) | (task_id << 8) | command);
878         return rte_ring_enqueue(ring, mbuf);
879 }
880
881 int tx_ring_cti(struct task_base *tbase, struct rte_ring *ring, uint8_t command,  struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
882 {
883         plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
884         int ret = tx_ring_all(tbase, ring, command,  mbuf, core_id, task_id, ip);
885         if (unlikely(ret != 0)) {
886                 plogx_dbg("\tFail to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
887                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
888                 rte_pktmbuf_free(mbuf);
889         }
890         return ret;
891 }
892
893 void tx_ring_ip(struct task_base *tbase, struct rte_ring *ring, uint8_t command,  struct rte_mbuf *mbuf, uint32_t ip)
894 {
895         plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
896         int ret = tx_ring_all(tbase, ring, command,  mbuf, 0, 0, ip);
897         if (unlikely(ret != 0)) {
898                 plogx_dbg("\tFail to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
899                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
900                 rte_pktmbuf_free(mbuf);
901         }
902 }
903
904 void tx_ring(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf)
905 {
906         plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
907         int ret = tx_ring_all(tbase, ring, command,  mbuf, 0, 0, 0);
908         if (unlikely(ret != 0)) {
909                 plogx_dbg("\tFail to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
910                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
911                 rte_pktmbuf_free(mbuf);
912         }
913 }
914
915 void tx_ring_route(struct task_base *tbase, struct rte_ring *ring, int add, struct rte_mbuf *mbuf, uint32_t ip, uint32_t gateway_ip, uint32_t prefix)
916 {
917         uint8_t command;
918         if (add)
919                 command = ROUTE_ADD_FROM_MASTER;
920         else
921                 command = ROUTE_DEL_FROM_MASTER;
922
923         plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
924         ctrl_ring_set_command(mbuf, command);
925         ctrl_ring_set_ip(mbuf, ip);
926         ctrl_ring_set_gateway_ip(mbuf, gateway_ip);
927         ctrl_ring_set_prefix(mbuf, prefix);
928         if (tbase->aux->task_rt_dump.cur_trace) {
929                 trace_one_rx_pkt(tbase, mbuf);
930         }
931         int ret = rte_ring_enqueue(ring, mbuf);
932         if (unlikely(ret != 0)) {
933                 plogx_dbg("\tFail to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
934                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
935                 rte_pktmbuf_free(mbuf);
936         }
937 }
938
939 void tx_ring_cti6(struct task_base *tbase, struct rte_ring *ring, uint8_t command,  struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, struct ipv6_addr *ip)
940 {
941         int ret;
942         plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
943         if (tbase->aux->task_rt_dump.cur_trace) {
944                 trace_one_rx_pkt(tbase, mbuf);
945         }
946         ctrl_ring_set_command_core_task_ip(mbuf, (core_id << 16) | (task_id << 8) | command);
947         ctrl_ring_set_ipv6_addr(mbuf, ip);
948         ret = rte_ring_enqueue(ring, mbuf);
949
950         if (unlikely(ret != 0)) {
951                 plogx_dbg("\tFail to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
952                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
953                 rte_pktmbuf_free(mbuf);
954         }
955 }
956
957 void tx_ring_ip6(struct task_base *tbase, struct rte_ring *ring, uint8_t command,  struct rte_mbuf *mbuf, struct ipv6_addr *ip)
958 {
959         int ret;
960         plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
961         if (tbase->aux->task_rt_dump.cur_trace) {
962                 trace_one_rx_pkt(tbase, mbuf);
963         }
964         ctrl_ring_set_command(mbuf, command);
965         ctrl_ring_set_ipv6_addr(mbuf, ip);
966         ret = rte_ring_enqueue(ring, mbuf);
967
968         if (unlikely(ret != 0)) {
969                 plogx_dbg("\tFail to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
970                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
971                 rte_pktmbuf_free(mbuf);
972         }
973 }
974
975 void tx_ring_ip6_data(struct task_base *tbase, struct rte_ring *ring, uint8_t command,  struct rte_mbuf *mbuf, struct ipv6_addr *ip, uint64_t data)
976 {
977         int ret;
978         plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
979         if (tbase->aux->task_rt_dump.cur_trace) {
980                 trace_one_rx_pkt(tbase, mbuf);
981         }
982         ctrl_ring_set_command(mbuf, command);
983         ctrl_ring_set_ipv6_addr(mbuf, ip);
984         ctrl_ring_set_data(mbuf, data);
985         ret = rte_ring_enqueue(ring, mbuf);
986
987         if (unlikely(ret != 0)) {
988                 plogx_dbg("\tFail to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
989                 TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
990                 rte_pktmbuf_free(mbuf);
991         }
992
993 }