Support packets in flight
[samplevnf.git] / VNFs / DPPD-PROX / handle_lat.c
index 0c0e7b5..04a4848 100644 (file)
@@ -86,6 +86,15 @@ struct rx_pkt_meta_data {
        uint32_t bytes_after_in_bulk;
 };
 
+struct loss_buffer {
+       uint32_t packet_id;
+       uint32_t n;
+};
+
+struct flows {
+       uint32_t packet_id;
+};
+
 struct task_lat {
        struct task_base base;
        uint64_t limit;
@@ -111,11 +120,19 @@ struct task_lat {
        struct rx_pkt_meta_data *rx_pkt_meta;
        // Following fields are only used when starting or stopping, not in general runtime
        uint64_t *prev_tx_packet_index;
+       FILE *fp_loss;
        FILE *fp_rx;
        FILE *fp_tx;
        struct prox_port_cfg *port;
        uint64_t *bytes_to_tsc;
        uint64_t *previous_packet;
+       uint32_t loss_buffer_size;
+       struct loss_buffer *loss_buffer;
+       uint32_t loss_id;
+       uint32_t packet_id_in_flow_pos;
+       int32_t flow_id_pos;
+       uint32_t flow_count;
+       struct flows *flows;
 };
 /* This function calculate the difference between rx and tx_time
  * Both values are uint32_t (see handle_lat_bulk)
@@ -400,7 +417,14 @@ static void lat_stop(struct task_base *tbase)
        if (task->unique_id_pos) {
                task_lat_count_remaining_lost_packets(task);
                task_lat_reset_eld(task);
+               memset(task->previous_packet, 0, sizeof(task->previous_packet) * task->generator_count);
        }
+       if (task->loss_id && task->fp_loss) {
+               for (uint i = 0; i < task->loss_id; i++) {
+                       fprintf(task->fp_loss, "packet %d: %d\n", task->loss_buffer[i].packet_id, task->loss_buffer[i].n);
+               }
+       }
+       task->lat_test->lost_packets = 0;
        if (task->latency_buffer)
                lat_write_latency_to_file(task);
 }
@@ -474,6 +498,15 @@ static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
        lat_test->buckets[bucket_id]++;
 }
 
+static void lat_test_check_flow_ordering(struct task_lat *task, struct lat_test *lat_test, int32_t flow_id, uint32_t packet_id)
+{
+       if (packet_id < task->flows[flow_id].packet_id) {
+               lat_test->mis_ordered++;
+               lat_test->extent += task->flows[flow_id].packet_id - packet_id;
+       }
+       task->flows[flow_id].packet_id = packet_id;
+}
+
 static void lat_test_check_ordering(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
 {
        if (packet_id < task->previous_packet[generator_id]) {
@@ -535,6 +568,7 @@ static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index,
 static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
 {
        struct task_lat *task = (struct task_lat *)tbase;
+       static int max_flows_printed = 0;
        int rc;
 
        if (n_pkts == 0) {
@@ -623,6 +657,24 @@ static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uin
 
                uint8_t generator_id;
                uint32_t packet_id;
+               int32_t flow_id = -1;
+               if (task->flow_id_pos) {
+                       flow_id = *(int32_t *)(hdr + task->flow_id_pos);
+                       if (unlikely(flow_id >= (int32_t)(task->flow_count))) {
+                               flow_id = -1;
+                               if (!max_flows_printed) {
+                                       plog_info("Too many flows - increase flow count (only printed once)\n");
+                                       max_flows_printed = 1;
+                               }
+                       }
+
+               }
+               if (task->packet_id_in_flow_pos && (flow_id != -1)) {
+                       uint32_t packet_id_in_flow;
+                       struct unique_id *unique_id = (struct unique_id *)(hdr + task->packet_id_in_flow_pos);
+                       unique_id_get(unique_id, &generator_id, &packet_id_in_flow);
+                       lat_test_check_flow_ordering(task, task->lat_test, flow_id + generator_id * task->generator_count, packet_id_in_flow);
+               }
                if (task->unique_id_pos) {
                        struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
                        unique_id_get(unique_id, &generator_id, &packet_id);
@@ -634,9 +686,18 @@ static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uin
                                // Skip unexpected packet
                                continue;
                        }
-                       lat_test_check_ordering(task, task->lat_test, packet_id, generator_id);
+                       if (flow_id == -1) {
+                               lat_test_check_ordering(task, task->lat_test, packet_id, generator_id);
+                       }
                        lat_test_check_duplicate(task, task->lat_test, packet_id, generator_id);
-                       lat_test_add_lost(task->lat_test, task_lat_early_loss_detect(task, packet_id, generator_id));
+                       uint32_t loss =  task_lat_early_loss_detect(task, packet_id, generator_id);
+                       if (loss) {
+                               lat_test_add_lost(task->lat_test, loss);
+                               if (task->loss_id < task->loss_buffer_size) {
+                                       task->loss_buffer[task->loss_id].packet_id = packet_id;
+                                       task->loss_buffer[task->loss_id++].n = loss;
+                               }
+                       }
                } else {
                        generator_id = 0;
                        packet_id = task->rx_packet_index;
@@ -756,6 +817,8 @@ static void init_task_lat(struct task_base *tbase, struct task_args *targ)
        task->accur_pos = targ->accur_pos;
        task->sig_pos = targ->sig_pos;
        task->sig = targ->sig;
+       task->packet_id_in_flow_pos = targ->packet_id_in_flow_pos;
+       task->flow_id_pos = targ->flow_id_pos;
 
        task->unique_id_pos = targ->packet_id_pos;
        task->latency_buffer_size = targ->latency_buffer_size;
@@ -831,9 +894,20 @@ static void init_task_lat(struct task_base *tbase, struct task_args *targ)
                                (uint8_t)(port - prox_port_cfg), 8 * bytes_per_hz / 1000000);
                }
        }
+       task->loss_buffer_size = targ->loss_buffer_size;
+       if (task->loss_buffer_size) {
+               char name[256];
+               sprintf(name, "loss_%u.txt", targ->lconf->id);
+               task->fp_loss = fopen(name, "w+");
+               PROX_PANIC(task->fp_loss == NULL, "Failed to open %s\n", name);
+
+               task->loss_buffer = prox_zmalloc(task->loss_buffer_size * sizeof(struct loss_buffer), rte_lcore_to_socket_id(targ->lconf->id));
+               PROX_PANIC(task->loss_buffer == NULL,
+                       "Failed to allocate %lu bytes (in huge pages) for loss_buffer\n", task->loss_buffer_size * sizeof(struct loss_buffer));
+       }
        task->bytes_to_tsc = prox_zmalloc(max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST, rte_lcore_to_socket_id(targ->lconf->id));
        PROX_PANIC(task->bytes_to_tsc == NULL,
-               "Failed to allocate %u bytes (in huge pages) for bytes_to_tsc\n", max_frame_size);
+               "Failed to allocate %lu bytes (in huge pages) for bytes_to_tsc\n", max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST);
 
         // There are cases where hz estimate might be slighly over-estimated
         // This results in too much extrapolation
@@ -844,6 +918,13 @@ static void init_task_lat(struct task_base *tbase, struct task_args *targ)
                else
                        task->bytes_to_tsc[i] = (rte_get_tsc_hz() * i * 0.99) / bytes_per_hz;
        }
+       task->flow_count = targ->flow_count;
+       PROX_PANIC(task->flow_id_pos && (task->flow_count == 0), "flow_count must be configured when flow_id_pos is set\n");
+       if (task->flow_count) {
+               task->flows = prox_zmalloc(task->flow_count * sizeof(struct flows) * task->generator_count, rte_lcore_to_socket_id(targ->lconf->id));
+               PROX_PANIC(task->flows == NULL,
+                       "Failed to allocate %lu bytes (in huge pages) for flows\n", task->flow_count * sizeof(struct flows) * task->generator_count);
+       }
 }
 
 static struct task_init task_init_lat = {