Preparation for packet mis-ordering stats
diff --git a/VNFs/DPPD-PROX/tx_pkt.c b/VNFs/DPPD-PROX/tx_pkt.c
index c6f6010..551056a 100644
--- a/VNFs/DPPD-PROX/tx_pkt.c
+++ b/VNFs/DPPD-PROX/tx_pkt.c
@@ -1,5 +1,5 @@
 /*
-// Copyright (c) 2010-2017 Intel Corporation
+// Copyright (c) 2010-2020 Intel Corporation
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -25,6 +25,8 @@
 #include "prox_assert.h"
 #include "log.h"
 #include "mbuf_utils.h"
+#include "handle_master.h"
+#include "defines.h"
 
 static void buf_pkt_single(struct task_base *tbase, struct rte_mbuf *mbuf, const uint8_t out)
 {
@@ -49,9 +51,131 @@ static inline void buf_pkt_all(struct task_base *tbase, struct rte_mbuf **mbufs,
 }
 #define MAX_PMD_TX 32
 
+void store_packet(struct task_base *tbase, struct rte_mbuf *mbuf)
+{
+       // Only one packet can be stored: if one is already pending, drop it before storing the new one
+       if (tbase->aux->mbuf)
+               tx_drop(tbase->aux->mbuf);
+       tbase->aux->mbuf = mbuf;
+}
+
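+// Transmit path for IPv6 (NDP) enabled tasks: the destination MAC of each
+// packet is resolved first; contiguous runs of resolvable packets are sent
+// through tx_pkt_l2, while packets whose MAC is unknown trigger an ND request
+// towards the master task (or are dropped).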
+int tx_pkt_ndp(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
+{
+       struct ipv6_addr ip_dst;
+       int first = 0, ret, ok = 0, rc;
+       const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
+       struct rte_mbuf *mbuf = NULL;       // mbuf used to carry the ND request towards the master task
+       uint16_t vlan;
+       uint64_t tsc = rte_rdtsc();
+
+       for (int j = 0; j < n_pkts; j++) {
+               if ((out) && (out[j] >= OUT_HANDLED))
+                       continue;
+               if (unlikely((rc = write_ip6_dst_mac(tbase, mbufs[j], &ip_dst, &vlan, tsc)) != SEND_MBUF)) {
+                       if (j - first) {
+                               ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
+                               ok += ret;
+                       }
+                       first = j + 1;
+                       switch(rc) {
+                       case SEND_ARP_ND:
+                               // The original mbuf (packet) is stored to be sent later -> we need to allocate a new mbuf for the ND request
+                               ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&mbuf);
+                               if (likely(ret == 0))   {
+                                       store_packet(tbase, mbufs[j]);
+                                       mbuf->port = tbase->l3.reachable_port_id;
+                                       tx_ring_cti6(tbase, tbase->l3.ctrl_plane_ring, IP6_REQ_MAC_TO_MASTER, mbuf, tbase->l3.core_id, tbase->l3.task_id, &ip_dst, vlan);
+                               } else {
+                                       plog_err("Failed to get a mbuf from arp/nd mempool\n");
+                                       tx_drop(mbufs[j]);
+                                       TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
+                               }
+                               break;
+                       case SEND_MBUF_AND_ARP_ND:
+                               // We send the mbuf and an ND request - we need to allocate another mbuf for the ND
+                               ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&mbuf);
+                               if (likely(ret == 0))   {
+                                       mbuf->port = tbase->l3.reachable_port_id;
+                                       tx_ring_cti6(tbase, tbase->l3.ctrl_plane_ring, IP6_REQ_MAC_TO_MASTER, mbuf, tbase->l3.core_id, tbase->l3.task_id, &ip_dst, vlan);
+                               } else {
+                                       plog_err("Failed to get a mbuf from arp/nd mempool\n");
+                                       // We still send the initial mbuf
+                               }
+                               ret = tbase->aux->tx_pkt_l2(tbase, mbufs + j, 1, out);
+                               break;
+                       case DROP_MBUF:
+                               tx_drop(mbufs[j]);
+                               TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
+                               break;
+                       }
+               }
+       }
+       if (n_pkts - first) {
+               ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
+               ok += ret;
+       }
+       return ok;
+}
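+
+// IPv4 counterpart of tx_pkt_ndp: the destination MAC is resolved through ARP.
+// Depending on the result, the packet is sent through tx_pkt_l2, turned into
+// an ARP request towards the master task (possibly with a separate mbuf so the
+// packet itself can still be sent), or dropped.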
+int tx_pkt_l3(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
+{
+       uint32_t ip_dst;
+       uint16_t vlan;
+       int first = 0, ret, ok = 0, rc;
+       const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
+       struct rte_mbuf *arp_mbuf = NULL;       // used when we need to send both an ARP request and the mbuf
+       uint64_t *time;
+       uint64_t tsc = rte_rdtsc();
+
+       for (int j = 0; j < n_pkts; j++) {
+               if ((out) && (out[j] >= OUT_HANDLED))
+                       continue;
+               if (unlikely((rc = write_dst_mac(tbase, mbufs[j], &ip_dst, &vlan, &time, tsc)) != SEND_MBUF)) {
+                       if (j - first) {
+                               ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
+                               ok += ret;
+                       }
+                       first = j + 1;
+                       switch(rc) {
+                       case SEND_ARP_ND:
+                               // We re-use the mbuf - no need to create an arp_mbuf and delete the existing mbuf
+                               mbufs[j]->port = tbase->l3.reachable_port_id;
+                               if (tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, IP4_REQ_MAC_TO_MASTER, mbufs[j], tbase->l3.core_id, tbase->l3.task_id, ip_dst, vlan) == 0)
+                                       update_arp_ndp_retransmit_timeout(&tbase->l3, time, 1000);
+                               else
+                                       update_arp_ndp_retransmit_timeout(&tbase->l3, time, 100);
+                               break;
+                       case SEND_MBUF_AND_ARP_ND:
+                               // We send the mbuf and an ARP request - we need to allocate another mbuf for the ARP
+                               ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&arp_mbuf);
+                               if (likely(ret == 0))   {
+                                       arp_mbuf->port = tbase->l3.reachable_port_id;
+                                       if (tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, IP4_REQ_MAC_TO_MASTER, arp_mbuf, tbase->l3.core_id, tbase->l3.task_id, ip_dst, vlan) == 0)
+                                               update_arp_ndp_retransmit_timeout(&tbase->l3, time, 1000);
+                                       else
+                                               update_arp_ndp_retransmit_timeout(&tbase->l3, time, 100);
+                               } else {
+                                       plog_err("Failed to get a mbuf from arp mempool\n");
+                                       // We still send the initial mbuf
+                               }
+                               ret = tbase->aux->tx_pkt_l2(tbase, mbufs + j, 1, out);
+                               break;
+                       case DROP_MBUF:
+                               tx_drop(mbufs[j]);
+                               TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
+                               break;
+                       }
+               }
+       }
+       if (n_pkts - first) {
+               ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
+               ok += ret;
+       }
+       return ok;
+}
+
 /* The following help functions also report stats. Therefore we need
    to pass the task_base struct. */
-static inline int txhw_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
+static inline int txhw_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
 {
        uint16_t ntx;
        int ret;
@@ -63,10 +187,11 @@ static inline int txhw_drop(const struct port_queue *port_queue, struct rte_mbuf
        } else {
                ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
        }
-
        TASK_STATS_ADD_TX(&tbase->aux->stats, ntx);
+
        ret =  n_pkts - ntx;
        if (ntx < n_pkts) {
+               plog_dbg("Failed to send %d packets from %p\n", ret, mbufs[0]);
                TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts - ntx);
                if (tbase->tx_pkt == tx_pkt_bw) {
                        uint32_t drop_bytes = 0;
@@ -85,13 +210,12 @@ static inline int txhw_drop(const struct port_queue *port_queue, struct rte_mbuf
        return ret;
 }
 
-static inline int txhw_no_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
+static inline int txhw_no_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
 {
        uint16_t ret;
        uint16_t n = n_pkts;
 
        TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
-
        do {
                ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
                mbufs += ret;
@@ -265,11 +389,11 @@ uint16_t tx_try_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n
 
 uint16_t tx_try_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
 {
-       const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
        const int bulk_size = 64;
        uint16_t ret = bulk_size, n_bulks, sent = 0;
        n_bulks = n_pkts >>  __builtin_ctz(bulk_size);
 
+       const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
        for (int i = 0; i < n_bulks; i++) {
                ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, bulk_size);
                mbufs += ret;
@@ -541,59 +665,91 @@ int tx_pkt_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts,
        return ret;
 }
 
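+// For a traced packet on the tx side, look up the copy that was saved when the
+// packet was received and print it, so the rx and tx forms of the same packet
+// can be correlated.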
+static inline void trace_one_rx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf)
+{
+       struct rte_mbuf tmp;
+       /* For the packet being transmitted, find which
+          buffer represents the packet as it was before
+          processing. */
+       uint32_t j = 0;
+       uint32_t len = sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr)/sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr[0]);
+       for (;j < len; ++j) {
+               if (tbase->aux->task_rt_dump.pkt_mbuf_addr[j] == mbuf)
+                       break;
+       }
+       if (j != len) {
+#if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
+               tmp.data_off = 0;
+#endif
+               rte_pktmbuf_data_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
+               rte_pktmbuf_pkt_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
+               tmp.buf_addr = tbase->aux->task_rt_dump.pkt_cpy[j];
+               plogdx_info(&tmp, "Trace RX: ");
+       }
+}
+
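+// Print one traced packet on the tx side; out[i] == 0xFE means the packet was
+// handled internally, 0xFF means it was dropped, any other value is the tx
+// output index.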
+static inline void trace_one_tx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf, uint8_t *out, uint32_t i)
+{
+       if (out) {
+               switch(out[i]) {
+               case 0xFE:
+                       plogdx_info(mbuf, "Handled: ");
+                       break;
+               case 0xFF:
+                       plogdx_info(mbuf, "Dropped: ");
+                       break;
+               default:
+                       plogdx_info(mbuf, "TX[%d]: ", out[i]);
+                       break;
+               }
+       } else if (tbase->aux->tx_pkt_orig == tx_pkt_drop_all) {
+               plogdx_info(mbuf, "Dropped: ");
+       } else
+               plogdx_info(mbuf, "TX[0]: ");
+}
+
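+// When the requested number of packets has been traced, restore the original
+// tx function (into aux->tx_pkt_l2 for the l3/ndp paths) and stop tracing on
+// the rx side.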
+static void unset_trace(struct task_base *tbase)
+{
+       if (0 == tbase->aux->task_rt_dump.n_trace) {
+               if ((tbase->tx_pkt == tx_pkt_l3) || (tbase->tx_pkt == tx_pkt_ndp)){
+                       tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
+                       tbase->aux->tx_pkt_orig = NULL;
+               } else {
+                       tbase->tx_pkt = tbase->aux->tx_pkt_orig;
+                       tbase->aux->tx_pkt_orig = NULL;
+               }
+               tbase->aux->task_rt_dump.cur_trace = 0;
+               task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
+       }
+}
+
 int tx_pkt_trace(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
 {
        int ret = 0;
        if (tbase->aux->task_rt_dump.cur_trace == 0) {
                // No packet received since dumping...
-               // So the transmitted packets should not be linked to received packets
                tbase->aux->task_rt_dump.n_print_tx = tbase->aux->task_rt_dump.n_trace;
-               tbase->aux->task_rt_dump.n_trace = 0;
-               task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
-               return tx_pkt_dump(tbase, mbufs, n_pkts, out);
+               if (tbase->aux->task_rt_dump.n_trace < n_pkts) {
+                       tbase->aux->task_rt_dump.n_trace = 0;
+                       tbase->aux->task_rt_dump.cur_trace = 0;
+                       task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
+               } else {
+                       tbase->aux->task_rt_dump.n_trace -= n_pkts;
+               }
+               ret = tx_pkt_dump(tbase, mbufs, n_pkts, out);
+               tbase->aux->task_rt_dump.n_print_tx = 0;
+               return ret;
        }
        plog_info("Tracing %d pkts\n", tbase->aux->task_rt_dump.cur_trace);
+       uint32_t cur_trace = (n_pkts < tbase->aux->task_rt_dump.cur_trace) ? n_pkts: tbase->aux->task_rt_dump.cur_trace;
+       for (uint32_t i = 0; i < cur_trace; ++i) {
+               trace_one_rx_pkt(tbase, mbufs[i]);
+               trace_one_tx_pkt(tbase, mbufs[i], out, i);
 
-       for (uint32_t i = 0; i < tbase->aux->task_rt_dump.cur_trace; ++i) {
-               struct rte_mbuf tmp;
-               /* For each packet being transmitted, find which
-                  buffer represent the packet as it was before
-                  processing. */
-               uint32_t j = 0;
-               uint32_t len = sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr)/sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr[0]);
-               for (;j < len; ++j) {
-                       if (tbase->aux->task_rt_dump.pkt_mbuf_addr[j] == mbufs[i])
-                               break;
-               }
-               if (j == len) {
-                       plog_info("Trace RX: missing!\n");
-               }
-               else {
-#if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
-                       tmp.data_off = 0;
-#endif
-                       rte_pktmbuf_data_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
-                       rte_pktmbuf_pkt_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
-                       tmp.buf_addr = tbase->aux->task_rt_dump.pkt_cpy[j];
-                       plogd_info(&tmp, "Trace RX: ");
-               }
-
-               if (out) {
-                       if (out[i] != 0xFF)
-                               plogd_info(mbufs[i], "Trace TX[%d]: ", out[i]);
-                       else
-                               plogd_info(mbufs[i], "Trace Dropped: ");
-               } else
-                       plogd_info(mbufs[i], "Trace TX: ");
        }
        ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
 
-       /* Unset by TX when n_trace = 0 */
-       if (0 == tbase->aux->task_rt_dump.n_trace) {
-               tbase->tx_pkt = tbase->aux->tx_pkt_orig;
-               tbase->aux->tx_pkt_orig = NULL;
-               task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
-       }
+       unset_trace(tbase);
        return ret;
 }
 
@@ -604,18 +760,33 @@ int tx_pkt_dump(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkt
 
        n_dump = n_pkts < n_dump? n_pkts : n_dump;
        for (uint32_t i = 0; i < n_dump; ++i) {
-               if (out)
-                       plogd_info(mbufs[i], "TX[%d]: ", out[i]);
-               else
-                       plogd_info(mbufs[i], "TX: ");
+               if (out) {
+                       switch (out[i]) {
+                       case 0xFE:
+                               plogdx_info(mbufs[i], "Handled: ");
+                               break;
+                       case 0xFF:
+                               plogdx_info(mbufs[i], "Dropped: ");
+                               break;
+                       default:
+                               plogdx_info(mbufs[i], "TX[%d]: ", out[i]);
+                               break;
+                       }
+               } else
+                       plogdx_info(mbufs[i], "TX: ");
        }
        tbase->aux->task_rt_dump.n_print_tx -= n_dump;
 
        ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
 
        if (0 == tbase->aux->task_rt_dump.n_print_tx) {
-               tbase->tx_pkt = tbase->aux->tx_pkt_orig;
-               tbase->aux->tx_pkt_orig = NULL;
+               if ((tbase->tx_pkt == tx_pkt_l3) || (tbase->tx_pkt == tx_pkt_ndp)) {
+                       tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
+                       tbase->aux->tx_pkt_orig = NULL;
+               } else {
+                       tbase->tx_pkt = tbase->aux->tx_pkt_orig;
+                       tbase->aux->tx_pkt_orig = NULL;
+               }
        }
        return ret;
 }
@@ -625,7 +796,10 @@ int tx_pkt_dump(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkt
    task that xmits the packet, no atomic operation is needed. */
 int tx_pkt_distr(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
 {
-       tbase->aux->tx_bucket[n_pkts]++;
+       if (likely(n_pkts < TX_BUCKET_SIZE))
+               tbase->aux->tx_bucket[n_pkts]++;
+       else
+               tbase->aux->tx_bucket[TX_BUCKET_SIZE - 1]++;
        return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
 }
 
@@ -663,3 +837,161 @@ int tx_pkt_drop_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n
        }
        return n_pkts;
 }
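+
+// Dump or trace packets sent through the control plane tx functions below,
+// decrementing the outstanding dump/trace counters.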
+static inline void dump_pkts(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
+{
+       uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
+       uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;
+
+       if (unlikely(n_dump)) {
+               n_dump = n_pkts < n_dump? n_pkts : n_dump;
+               for (uint32_t i = 0; i < n_dump; ++i) {
+                       plogdx_info(mbufs[i], "TX: ");
+               }
+               tbase->aux->task_rt_dump.n_print_tx -= n_dump;
+       } else if (unlikely(n_trace)) {
+               n_trace = n_pkts < n_trace? n_pkts : n_trace;
+               for (uint32_t i = 0; i < n_trace; ++i) {
+                       plogdx_info(mbufs[i], "TX: ");
+               }
+               tbase->aux->task_rt_dump.n_trace -= n_trace;
+       }
+}
+
+// Control plane packets are slow path, hence the cost of checking whether a dump or trace is needed is not too important.
+// It is easier to have this implementation than one similar to the dataplane tx.
+int tx_ctrlplane_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
+{
+       dump_pkts(tbase, mbufs, n_pkts);
+       return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
+}
+
+int tx_ctrlplane_sw(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
+{
+       dump_pkts(tbase, mbufs, n_pkts);
+       return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
+}
+
+static inline int tx_ring_all(struct task_base *tbase, struct rte_ring *ring, uint8_t command,  struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
+{
+       if (tbase->aux->task_rt_dump.cur_trace) {
+               trace_one_rx_pkt(tbase, mbuf);
+       }
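+       // Pack ip (bits 63:32), core_id (23:16), task_id (15:8) and command (7:0)
+       // into the control ring metadata before enqueueing the mbuf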
+       ctrl_ring_set_command_core_task_ip(mbuf, ((uint64_t)ip << 32) | (core_id << 16) | (task_id << 8) | command);
+       return rte_ring_enqueue(ring, mbuf);
+}
+
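+// Send a control message carrying an IPv4 address and the originating core and
+// task ids on the given control ring; on enqueue failure the mbuf is freed and
+// counted as a discard.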
+int tx_ring_cti(struct task_base *tbase, struct rte_ring *ring, uint8_t command,  struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip, uint16_t vlan)
+{
+       plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
+       ctrl_ring_set_vlan(mbuf, vlan);
+       int ret = tx_ring_all(tbase, ring, command,  mbuf, core_id, task_id, ip);
+       if (unlikely(ret != 0)) {
+               plogx_dbg("\tFail to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
+               TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
+               rte_pktmbuf_free(mbuf);
+       }
+       return ret;
+}
+
+void tx_ring_ip(struct task_base *tbase, struct rte_ring *ring, uint8_t command,  struct rte_mbuf *mbuf, uint32_t ip)
+{
+       plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
+       int ret = tx_ring_all(tbase, ring, command,  mbuf, 0, 0, ip);
+       if (unlikely(ret != 0)) {
+               plogx_dbg("\tFail to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
+               TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
+               rte_pktmbuf_free(mbuf);
+       }
+}
+
+void tx_ring(struct task_base *tbase, struct rte_ring *ring, uint16_t command,  struct rte_mbuf *mbuf)
+{
+       plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
+       int ret = tx_ring_all(tbase, ring, command,  mbuf, 0, 0, 0);
+       if (unlikely(ret != 0)) {
+               plogx_dbg("\tFail to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
+               TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
+               rte_pktmbuf_free(mbuf);
+       }
+}
+
+void tx_ring_route(struct task_base *tbase, struct rte_ring *ring, int add, struct rte_mbuf *mbuf, uint32_t ip, uint32_t gateway_ip, uint32_t prefix)
+{
+       uint8_t command;
+       if (add)
+               command = ROUTE_ADD_FROM_MASTER;
+       else
+               command = ROUTE_DEL_FROM_MASTER;
+
+       plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
+       ctrl_ring_set_command(mbuf, command);
+       ctrl_ring_set_ip(mbuf, ip);
+       ctrl_ring_set_gateway_ip(mbuf, gateway_ip);
+       ctrl_ring_set_prefix(mbuf, prefix);
+       if (tbase->aux->task_rt_dump.cur_trace) {
+               trace_one_rx_pkt(tbase, mbuf);
+       }
+       int ret = rte_ring_enqueue(ring, mbuf);
+       if (unlikely(ret != 0)) {
+               plogx_dbg("\tFail to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
+               TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
+               rte_pktmbuf_free(mbuf);
+       }
+}
+
+void tx_ring_cti6(struct task_base *tbase, struct rte_ring *ring, uint8_t command,  struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, struct ipv6_addr *ip, uint16_t vlan)
+{
+       int ret;
+       plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
+       if (tbase->aux->task_rt_dump.cur_trace) {
+               trace_one_rx_pkt(tbase, mbuf);
+       }
+       ctrl_ring_set_command_core_task_ip(mbuf, (core_id << 16) | (task_id << 8) | command);
+       ctrl_ring_set_ipv6_addr(mbuf, ip);
+       ctrl_ring_set_vlan(mbuf, vlan);
+       ret = rte_ring_enqueue(ring, mbuf);
+
+       if (unlikely(ret != 0)) {
+               plogx_dbg("\tFail to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
+               TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
+               rte_pktmbuf_free(mbuf);
+       }
+}
+
+void tx_ring_ip6(struct task_base *tbase, struct rte_ring *ring, uint8_t command,  struct rte_mbuf *mbuf, struct ipv6_addr *ip)
+{
+       int ret;
+       plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
+       if (tbase->aux->task_rt_dump.cur_trace) {
+               trace_one_rx_pkt(tbase, mbuf);
+       }
+       ctrl_ring_set_command(mbuf, command);
+       ctrl_ring_set_ipv6_addr(mbuf, ip);
+       ret = rte_ring_enqueue(ring, mbuf);
+
+       if (unlikely(ret != 0)) {
+               plogx_dbg("\tFail to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
+               TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
+               rte_pktmbuf_free(mbuf);
+       }
+}
+
+void tx_ring_ip6_data(struct task_base *tbase, struct rte_ring *ring, uint8_t command,  struct rte_mbuf *mbuf, struct ipv6_addr *ip, uint64_t data)
+{
+       int ret;
+       plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
+       if (tbase->aux->task_rt_dump.cur_trace) {
+               trace_one_rx_pkt(tbase, mbuf);
+       }
+       ctrl_ring_set_command(mbuf, command);
+       ctrl_ring_set_ipv6_addr(mbuf, ip);
+       ctrl_ring_set_data(mbuf, data);
+       ret = rte_ring_enqueue(ring, mbuf);
+
+       if (unlikely(ret != 0)) {
+               plogx_dbg("\tFail to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
+               TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
+               rte_pktmbuf_free(mbuf);
+       }
+
+}