Support packets in flight 60/74060/3 master
authorLuc Provoost <luc.provoost@gmail.com>
Fri, 30 Jun 2023 15:42:13 +0000 (17:42 +0200)
committerLuc Provoost <luc.provoost@gmail.com>
Mon, 24 Jul 2023 13:35:30 +0000 (13:35 +0000)
The handle_esp_bulk function might enqueue packets on a QAT device.
These packets are not necessarily dequeued in the same call to the
handle_bulk function since they might be asynchronously processed.
Therefore, in some cases, we need to call this handle_bulk
function again later, even when no new packets are received on the rx
queue. To achieve this, the TASK_FEATURE_ZERO_RX flag is set.
The number of packets in flight are stored to decide if packets need
to be dequeued or not.
The handle_esp_bulk function will continue to dequeue packets till
none are left, even in case no new packets arrive.

Signed-off-by: Luc Provoost <luc.provoost@gmail.com>
Change-Id: I15e5b92c2413a1d3823557f70b1f437b35ca5a12

VNFs/DPPD-PROX/handle_esp.c
VNFs/DPPD-PROX/main.c
VNFs/DPPD-PROX/task_base.h
VNFs/DPPD-PROX/thread_generic.c

index be46c21..a78130b 100644 (file)
@@ -68,7 +68,8 @@ struct task_esp {
        struct rte_mempool *session_pool;
        struct rte_cryptodev_sym_session *sess;
        struct rte_crypto_op *ops_burst[NUM_OPS];
-       unsigned len;
+       unsigned len; //number of ops ready to be enqueued
+       uint32_t pkts_in_flight; // difference between enqueued and dequeued
        uint8_t (*handle_esp_finish)(struct task_esp *task,
                        struct rte_mbuf *mbuf, uint8_t status);
        uint8_t (*handle_esp_ah)(struct task_esp *task, struct rte_mbuf *mbuf,
@@ -434,6 +435,7 @@ static void init_task_esp_enc(struct task_base *tbase, struct task_args *targ)
        task->handle_esp_finish = handle_enc_finish;
        task->handle_esp_ah = handle_esp_ah_enc;
        task->len = 0;
+       task->pkts_in_flight = 0;
        sprintf(name, "core_%03u_crypto_pool", lcore_id);
        task->crypto_op_pool = rte_crypto_op_pool_create(name,
                        RTE_CRYPTO_OP_TYPE_SYMMETRIC, targ->nb_mbuf, 128,
@@ -532,6 +534,7 @@ static void init_task_esp_dec(struct task_base *tbase, struct task_args *targ)
        task->handle_esp_finish = handle_dec_finish;
        task->handle_esp_ah = handle_esp_ah_dec;
        task->len = 0;
+       task->pkts_in_flight = 0;
        sprintf(name, "core_%03u_crypto_pool", lcore_id);
        task->crypto_op_pool = rte_crypto_op_pool_create(name,
                        RTE_CRYPTO_OP_TYPE_SYMMETRIC, targ->nb_mbuf, 128,
@@ -622,6 +625,7 @@ static int crypto_send_burst(struct task_esp *task, uint16_t n)
     unsigned i = 0;
     ret = rte_cryptodev_enqueue_burst(task->cdev_id,
             task->qp_id, task->ops_burst, n);
+    task->pkts_in_flight += ret;
     if (unlikely(ret < n)) {
        for (i = 0; i < (n-ret); i++) {
                mbufs[i] = task->ops_burst[ret + i]->sym->m_src;
@@ -644,55 +648,67 @@ static int handle_esp_bulk(struct task_base *tbase, struct rte_mbuf **mbufs,
        struct rte_crypto_op *ops_burst[MAX_PKT_BURST];
        int nbr_tx_pkt = 0;
 
-       if (rte_crypto_op_bulk_alloc(task->crypto_op_pool,
-                       RTE_CRYPTO_OP_TYPE_SYMMETRIC,
-                       ops_burst, n_pkts) != n_pkts) {
-               plog_info("Failed to allocate crypto operations, discarding \
-                               %d packets\n", n_pkts);
-               for (j = 0; j < n_pkts; j++) {
-                       out[j] = OUT_DISCARD;
-               }
-               nbr_tx_pkt += task->base.tx_pkt(&task->base, mbufs, n_pkts,
-                               out);
-       } 
-       else {
-               for (j = 0; j < n_pkts; j++) {
-                       result = task->handle_esp_ah(task, mbufs[j],
-                                       ops_burst[j]);
-                       if (result == 0) {
-                               task->ops_burst[task->len] = ops_burst[j];
-                               task->len++;
-                               /* enough ops to be sent */
-                               if (task->len == MAX_PKT_BURST) {
-                                       nbr_tx_pkt += crypto_send_burst(task,
-                                                       (uint16_t) MAX_PKT_BURST);
-                                       task->len = 0;
+       if (likely(n_pkts != 0)) {
+               if (rte_crypto_op_bulk_alloc(task->crypto_op_pool,
+                               RTE_CRYPTO_OP_TYPE_SYMMETRIC,
+                               ops_burst, n_pkts) != n_pkts) {
+                       plog_info("Failed to allocate crypto operations, discarding \
+                                       %d packets\n", n_pkts);
+                       for (j = 0; j < n_pkts; j++) {
+                               out[j] = OUT_DISCARD;
+                       }
+                       nbr_tx_pkt += task->base.tx_pkt(&task->base, mbufs, n_pkts,
+                                       out);
+               } 
+               else {
+                       for (j = 0; j < n_pkts; j++) {
+                               result = task->handle_esp_ah(task, mbufs[j],
+                                               ops_burst[j]);
+                               if (result == 0) {
+                                       task->ops_burst[task->len] = ops_burst[j];
+                                       task->len++;
+                                       /* enough ops to be sent */
+                                       if (task->len == MAX_PKT_BURST) {
+                                               nbr_tx_pkt += crypto_send_burst(task,
+                                                               (uint16_t) MAX_PKT_BURST);
+                                               task->len = 0;
+                                       }
+                               }
+                               else {
+                                       drop_mbufs[idx] = mbufs[j];
+                                       out[idx] = result;
+                                       idx++;
+                                       rte_crypto_op_free(ops_burst[j]);
+                                       plog_info("Failed handle_esp_ah for 1 \
+                                                       packet\n");
                                }
                        }
-                       else {
-                               drop_mbufs[idx] = mbufs[j];
-                               out[idx] = result;
-                               idx++;
+                       if (idx) nbr_tx_pkt += task->base.tx_pkt(&task->base,
+                                       drop_mbufs, idx, out);
+               }
+       } else if (task->len) {
+               // No packets where received on the rx queue, but this handle
+               // function was called anyway since some packets where not yet
+               // enqueued. Hence they get enqueued here in order to minimize
+               // latency or in case no new packets will arrive
+               nbr_tx_pkt += crypto_send_burst(task, task->len);
+               task->len = 0;
+       }
+       if (task->pkts_in_flight) {
+               do {
+                       nb_deq = rte_cryptodev_dequeue_burst(task->cdev_id,
+                                       task->qp_id, ops_burst, MAX_PKT_BURST);
+                       task->pkts_in_flight -= nb_deq;
+                       for (j = 0; j < nb_deq; j++) {
+                               mbufs[j] = ops_burst[j]->sym->m_src;
+                               out[j] = task->handle_esp_finish(task, mbufs[j],
+                                               ops_burst[j]->status);
                                rte_crypto_op_free(ops_burst[j]);
-                               plog_info("Failed handle_esp_ah for 1 \
-                                               packet\n");
                        }
-               }
-               if (idx) nbr_tx_pkt += task->base.tx_pkt(&task->base,
-                               drop_mbufs, idx, out);
+                       nbr_tx_pkt += task->base.tx_pkt(&task->base, mbufs, nb_deq,
+                                       out);
+               } while (nb_deq == MAX_PKT_BURST);
        }
-       do {
-               nb_deq = rte_cryptodev_dequeue_burst(task->cdev_id,
-                               task->qp_id, ops_burst, MAX_PKT_BURST);
-               for (j = 0; j < nb_deq; j++) {
-                       mbufs[j] = ops_burst[j]->sym->m_src;
-                       out[j] = task->handle_esp_finish(task, mbufs[j],
-                                       ops_burst[j]->status);
-                       rte_crypto_op_free(ops_burst[j]);
-               }
-               nbr_tx_pkt += task->base.tx_pkt(&task->base, mbufs, nb_deq,
-                               out);
-       } while (nb_deq == MAX_PKT_BURST);
        return nbr_tx_pkt;
 }
 
@@ -701,6 +717,7 @@ struct task_init task_init_esp_enc = {
         .mode_str = "esp_enc",
         .init = init_task_esp_enc,
         .handle = handle_esp_bulk,
+       .flag_features = TASK_FEATURE_ZERO_RX,
         .size = sizeof(struct task_esp),
 };
 
@@ -709,6 +726,7 @@ struct task_init task_init_esp_dec = {
         .mode_str = "esp_dec",
         .init = init_task_esp_dec,
         .handle = handle_esp_bulk,
+       .flag_features = TASK_FEATURE_ZERO_RX,
         .size = sizeof(struct task_esp),
 };
 
index 66fe63e..61abe6e 100644 (file)
@@ -116,7 +116,7 @@ static void check_mixed_normal_pipeline(void)
        }
 }
 
-static void check_zero_rx(void)
+static void check_no_rx(void)
 {
        struct lcore_cfg *lconf = NULL;
        struct task_args *targ;
@@ -228,7 +228,7 @@ static void check_cfg_consistent(void)
 {
        check_nb_mbuf();
        check_missing_rx();
-       check_zero_rx();
+       check_no_rx();
        check_mixed_normal_pipeline();
 }
 
index 7e231fc..89e5bb9 100644 (file)
@@ -56,8 +56,8 @@
 #define TASK_FEATURE_RX_ALL                    0x8000
 #define TASK_FEATURE_TXQ_FLAGS_MULTIPLE_MEMPOOL        0x20000
 
-#define TBASE_FLAG_TX_FLUSH                  0x01
-#define TBASE_FLAG_NEVER_FLUSH               0x02
+#define TBASE_FLAG_TX_FLUSH             0x01
+#define TBASE_FLAG_NEVER_FLUSH          0x02
 // Task specific flags
 #define TBASE_FLAG_LUT_QINQ_HASH               0x08
 #define TBASE_FLAG_LUT_QINQ_RSS        0x10
index 14fb943..39964de 100644 (file)
@@ -213,7 +213,6 @@ int thread_generic(struct lcore_cfg *lconf)
                                        next[task_id] = t->handle_bulk(t, mbufs, nb_rx);
                                }
                        }
-
                }
        }
        return 0;