Merge "Fix memory leak introduced by 4a65cd84"
[samplevnf.git] / VNFs / DPPD-PROX / handle_lat.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 //#define LAT_DEBUG
18
19 #include <rte_cycles.h>
20 #include <stdio.h>
21 #include <math.h>
22
23 #include "handle_gen.h"
24 #include "prox_malloc.h"
25 #include "mbuf_utils.h"
26 #include "handle_lat.h"
27 #include "log.h"
28 #include "task_init.h"
29 #include "task_base.h"
30 #include "stats.h"
31 #include "lconf.h"
32 #include "quit.h"
33 #include "eld.h"
34 #include "prox_shared.h"
35 #include "prox_port_cfg.h"
36
37 #define DEFAULT_BUCKET_SIZE     10
38 #define ACCURACY_BUFFER_SIZE    64
39
/* One per-packet record stored in task_lat->latency_buffer when per-packet
 * dumping is enabled; filled at RX time, sorted and written to file by
 * lat_write_latency_to_file() when the task stops. */
struct lat_info {
        uint32_t rx_packet_index;       /* this task's RX counter when the packet arrived */
        uint64_t tx_packet_index;       /* generator packet id; 64-bit so overflow fixup can extend it */
        uint32_t tx_err;                /* TX accuracy error, in LATENCY_ACCURACY-shifted units */
        uint32_t rx_err;                /* RX extrapolation error, shifted units */
        uint64_t rx_time;               /* RX timestamp, shifted units; widened for overflow fixup */
        uint64_t tx_time;               /* TX timestamp, shifted units; widened for overflow fixup */
        uint16_t port_queue_id;         /* generator id the packet came from */
#ifdef LAT_DEBUG
        uint16_t id_in_bulk;            /* position of this packet within its RX bulk */
        uint16_t bulk_size;             /* number of packets in that bulk */
        uint64_t begin;                 /* task->begin at store time (tsc before RX) */
        uint64_t after;                 /* tsc_rx.after at store time */
        uint64_t before;                /* tsc_rx.before at store time */
#endif
};
56
/* Ring slot (ACCURACY_BUFFER_SIZE entries per generator) holding one packet's
 * timestamps until its TX accuracy arrives, carried by packet
 * (packet_id + ACCURACY_BUFFER_SIZE) from the same generator. */
struct delayed_latency_entry {
        uint32_t rx_packet_id;  /* this task's rx_packet_index when the entry was stored */
        uint32_t tx_packet_id;  /* generator-side packet id */
        uint32_t packet_id;     /* validity tag, compared on lookup in delayed_latency_get() */
        uint8_t generator_id;
        uint64_t pkt_rx_time;   /* extrapolated RX time, shifted units */
        uint64_t pkt_tx_time;   /* TX time read from the packet, shifted units */
        uint64_t rx_time_err;   /* RX extrapolation error, shifted units */
};
66
67 static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
68 {
69         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
70         if (delayed_latency_entry->packet_id == packet_id)
71                 return delayed_latency_entry;
72         else
73                 return NULL;
74 }
75
76 static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
77 {
78         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
79         delayed_latency_entry->packet_id = packet_id;
80         return delayed_latency_entry;
81 }
82
/* Per-packet scratch data for one RX bulk (array sized MAX_RX_PKT_ALL). */
struct rx_pkt_meta_data {
        uint8_t  *hdr;                  /* start of packet data */
        uint32_t pkt_tx_time;           /* TX timestamp read from the packet; 0 on signature mismatch */
        uint32_t bytes_after_in_bulk;   /* wire bytes received after this packet in the bulk */
};
88
/* Runtime state of one "lat" task. */
struct task_lat {
        struct task_base base;
        uint64_t limit;                 /* accuracy limit in tsc; samples with larger error are dropped */
        uint64_t rx_packet_index;       /* running count of received packets */
        uint64_t last_pkts_tsc;         /* tsc of the previous bulk; floor for backward extrapolation */
        struct delayed_latency_entry **delayed_latency_entries; /* per-generator rings (accuracy feature) */
        struct lat_info *latency_buffer;        /* optional per-packet dump buffer */
        uint32_t latency_buffer_idx;    /* next free slot in latency_buffer */
        uint32_t latency_buffer_size;
        uint64_t begin;                 /* tsc before the RX call of the last poll */
        uint16_t lat_pos;               /* packet offset of the TX timestamp */
        uint16_t unique_id_pos;         /* packet offset of the unique id; 0 = feature disabled */
        uint16_t accur_pos;             /* packet offset of the TX accuracy; 0 = feature disabled */
        uint16_t sig_pos;               /* packet offset of the signature; 0 = feature disabled */
        uint32_t sig;                   /* expected signature value */
        volatile uint16_t use_lt; /* which lt to use, */
        volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements are used */
        struct lat_test lt[2];          /* double-buffered stats: one written, one read externally */
        struct lat_test *lat_test;      /* currently active element of lt[] */
        uint32_t generator_count;
        struct early_loss_detect *eld;  /* one per generator, for early loss detection */
        struct rx_pkt_meta_data *rx_pkt_meta;   /* scratch array for one RX bulk */
        uint64_t link_speed;            /* bytes/sec; 0 while link down, UINT64_MAX before start */
        // Following fields are only used when starting or stopping, not in general runtime
        uint64_t *prev_tx_packet_index;
        FILE *fp_rx;
        FILE *fp_tx;
        struct prox_port_cfg *port;
};
/* Return (rx_time - tx_time) in 32-bit modular arithmetic.
 * Both timestamps wrap every UINT32_MAX cycles (see handle_lat_bulk);
 * unsigned subtraction still yields the correct positive delta when
 * rx_time has wrapped past tx_time. */
static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
{
        uint32_t delta = rx_time - tx_time;

        return delta;
}
129
130 struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
131 {
132         if (task->use_lt == task->using_lt)
133                 return &task->lt[!task->using_lt];
134         return NULL;
135 }
136
/* Request the fast path to switch to the other half of lt[2]; the switch
 * takes effect when task_lat_update_lat_test() next runs on the task core. */
void task_lat_use_other_latency_meassurement(struct task_lat *task)
{
        task->use_lt = !task->using_lt;
}
141
142 static void task_lat_update_lat_test(struct task_lat *task)
143 {
144         if (task->use_lt != task->using_lt) {
145                 task->using_lt = task->use_lt;
146                 task->lat_test = &task->lt[task->using_lt];
147                 task->lat_test->accuracy_limit_tsc = task->limit;
148         }
149 }
150
151 static int compare_tx_time(const void *val1, const void *val2)
152 {
153         const struct lat_info *ptr1 = val1;
154         const struct lat_info *ptr2 = val2;
155
156         return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
157 }
158
159 static int compare_tx_packet_index(const void *val1, const void *val2)
160 {
161         const struct lat_info *ptr1 = val1;
162         const struct lat_info *ptr2 = val2;
163
164         return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
165 }
166
/* Extend the 32-bit generator packet ids stored in a buffer (sorted by RX
 * time) into monotonic 64-bit values, by detecting wrap-arounds of the
 * generator's counter.
 * NOTE(review): old_tx_packet_index is uint32_t and truncates the 64-bit
 * field — fine as long as the raw stored values are 32-bit, which they are
 * before this fixup runs.
 * NOTE(review): n_overflow is uint32_t and may "go negative" by wrapping;
 * the ((uint64_t)UINT32_MAX + 1) * n_overflow addition then works out via
 * modulo-2^64 arithmetic — confirm intentional. */
static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
{
        uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
        uint32_t small = UINT32_MAX >> 1;

        lat++;

        /* Buffer is sorted so far by RX time.
         * We might have packets being reordered by SUT.
         *     => consider small differences as re-order and big ones as overflow of tx_packet_index.
         * Note that:
         *      - overflow only happens if receiving and storing 4 billions packets...
         *      - a absolute difference of less than 2 billion packets is not considered as an overflow
         */
        for (uint32_t i = 1; i < count; i++) {
                tx_packet_index = lat->tx_packet_index;
                if (tx_packet_index > old_tx_packet_index) {
                        if (tx_packet_index - old_tx_packet_index < small) {
                                // The diff is small => increasing index count
                        } else {
                                // The diff is big => it is more likely that the previous packet was overflow
                                n_overflow--;
                        }
                } else {
                        if (old_tx_packet_index - tx_packet_index < small) {
                                // The diff is small => packet reorder
                        } else {
                                // The diff is big => it is more likely that this is an overflow
                                n_overflow++;
                        }
                }
                lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
                old_tx_packet_index = tx_packet_index;
                lat++;
        }
}
203
/* Extend the 32-bit TX timestamps stored in a buffer (sorted by RX time)
 * into monotonic 64-bit values, by detecting wrap-arounds of the shifted
 * tsc counter. Counterpart of fix_latency_buffer_tx_packet_index(). */
static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
{
        uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
        uint32_t small = UINT32_MAX >> 1;
        lat++;

        /*
         * Same algorithm as above, but with time.
         * Note that:
         *      - overflow happens after 4 billions "cycles" (shifted by LATENCY_ACCURACY) = ~4sec
         *      - a absolute difference up to 2 billion (shifted) cycles (~=2sec) is not considered as an overflow
         *              => algorithm does not work if receiving less than 1 packet every 2 seconds
         */
        for (uint32_t i = 1; i < count; i++) {
                tx_time = lat->tx_time;
                if (tx_time > old_tx_time) {
                        // Big forward jump => previous value was likely post-wrap
                        if (tx_time - old_tx_time > small) {
                                n_overflow--;
                        }
                } else {
                        // Big backward jump => the counter wrapped here
                        if (old_tx_time - tx_time > small) {
                                n_overflow++;
                        }
                }
                lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
                old_tx_time = tx_time;
                lat++;
        }
}
233
234 static void task_lat_count_remaining_lost_packets(struct task_lat *task)
235 {
236         struct lat_test *lat_test = task->lat_test;
237
238         for (uint32_t j = 0; j < task->generator_count; j++) {
239                 struct early_loss_detect *eld = &task->eld[j];
240
241                 lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
242         }
243 }
244
245 static void task_lat_reset_eld(struct task_lat *task)
246 {
247         for (uint32_t j = 0; j < task->generator_count; j++) {
248                 early_loss_detect_reset(&task->eld[j]);
249         }
250 }
251
252 static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
253 {
254         uint64_t min_tsc = UINT64_MAX;
255
256         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
257                 if (min_tsc > task->latency_buffer[i].tx_time)
258                         min_tsc = task->latency_buffer[i].tx_time;
259         }
260
261         return min_tsc << LATENCY_ACCURACY;
262 }
263
/* Latency of one buffered packet in tsc units.
 * NOTE(review): rx_time/tx_time are stored as uint64_t but diff_time()
 * takes uint32_t — both are truncated here, deliberately relying on
 * diff_time()'s 32-bit wrap-around semantics. */
static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
{
        uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);

        return lat << LATENCY_ACCURACY;
}
270
/* Accessors scaling stored (LATENCY_ACCURACY-shifted) values back to tsc. */
static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
}

/* RX accuracy error in tsc units. */
static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
}

/* RX timestamp in tsc units. */
static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
}

/* TX timestamp in tsc units. */
static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
}
290
/* Dump the per-packet latency buffer to the two files opened at init:
 * fp_rx gets the records in RX order, fp_tx gets them re-sorted by TX
 * (packet index when unique ids are available, TX time otherwise) with
 * gaps between consecutive ids reported as lost packets.
 * Resets latency_buffer_idx when done. Called from lat_stop() only. */
static void lat_write_latency_to_file(struct task_lat *task)
{
        uint64_t min_tsc;
        uint64_t n_loss;

        /* Smallest TX timestamp, used as time origin for all dumped times */
        min_tsc = lat_latency_buffer_get_min_tsc(task);

        // Dumping all packet statistics
        fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
        fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec);tx time;\n");
        for (uint32_t i = 0; i < task->latency_buffer_idx ; i++) {
                struct lat_info *lat_info = &task->latency_buffer[i];
                uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
                uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
                uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

                fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
                        lat_info->rx_packet_index,
                        lat_info->port_queue_id,
                        lat_info->tx_packet_index,
                        tsc_to_nsec(lat_tsc),
                        tsc_to_nsec(rx_tsc - min_tsc),
                        tsc_to_nsec(tx_tsc - min_tsc));
        }

        // To detect dropped packets, we need to sort them based on TX
        if (task->unique_id_pos) {
                plogx_info("Adapting tx_packet_index\n");
                fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
                plogx_info("Sorting packets based on tx_packet_index\n");
                qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
                plogx_info("Sorted packets based on packet_index\n");
        } else {
                plogx_info("Adapting tx_time\n");
                fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
                plogx_info("Sorting packets based on tx_time\n");
                qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
                plogx_info("Sorted packets based on packet_time\n");
        }

        // A packet is marked as dropped if 2 packets received from the same queue are not consecutive
        fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
        fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");

        /* Sentinel: -1 so the first packet of each generator shows no loss */
        for (uint32_t i = 0; i < task->generator_count;i++)
                task->prev_tx_packet_index[i] = -1;

        for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
                struct lat_info *lat_info = &task->latency_buffer[i];
                uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
                uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
                uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
                uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
                uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

                /* Packet n + ACCURACY_BUFFER_SIZE delivers the TX error for packet n,
                   hence the last ACCURACY_BUFFER_SIZE packets do no have TX error. */
                if (i + ACCURACY_BUFFER_SIZE >= task->latency_buffer_idx) {
                        tx_err_tsc = 0;
                }

                if (lat_info->port_queue_id >= task->generator_count) {
                        plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
                                lat_info->port_queue_id, lat_info->tx_packet_index);
                        continue;
                }
                // Log dropped packet
                n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
                if (n_loss)
                        fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
                                lat_info->port_queue_id,
                                lat_info->tx_packet_index - n_loss, n_loss);
                // Log next packet
                fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
                        lat_info->port_queue_id,
                        lat_info->tx_packet_index,
                        lat_info->rx_packet_index,
                        tsc_to_nsec(lat_tsc),
                        tsc_to_nsec(tx_tsc - min_tsc),
                        tsc_to_nsec(rx_tsc - min_tsc),
                        tsc_to_nsec(tx_err_tsc),
                        tsc_to_nsec(rx_err_tsc));
#ifdef LAT_DEBUG
                fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
                        lat_info->id_in_bulk,
                        lat_info->bulk_size,
                        tsc_to_nsec(lat_info->begin - min_tsc),
                        tsc_to_nsec(lat_info->before - min_tsc),
                        tsc_to_nsec(lat_info->after - min_tsc));
#endif
                fprintf(task->fp_tx, "\n");
                task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
        }
        fflush(task->fp_rx);
        fflush(task->fp_tx);
        /* Buffer can be refilled if the task is restarted */
        task->latency_buffer_idx = 0;
}
388
/* Task stop hook: flush outstanding loss accounting (unique-id feature)
 * and, when per-packet dumping is enabled, write the buffered latencies
 * to the latency.rx/tx files. */
static void lat_stop(struct task_base *tbase)
{
        struct task_lat *task = (struct task_lat *)tbase;

        if (task->unique_id_pos) {
                /* Count losses not yet reported by the early detector */
                task_lat_count_remaining_lost_packets(task);
                task_lat_reset_eld(task);
        }
        if (task->latency_buffer)
                lat_write_latency_to_file(task);
}
400
#ifdef LAT_DEBUG
/* Record bulk-level debug context (bulk size/position and the RX tsc
 * window) into the dump-buffer entry at rx_packet_index. */
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
        struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];

        lat_info->bulk_size = bulk_size;
        lat_info->id_in_bulk = id_in_bulk;
        lat_info->begin = task->begin;
        lat_info->before = task->base.aux->tsc_rx.before;
        lat_info->after = task->base.aux->tsc_rx.after;
}
#endif
413
/* Append one record to the per-packet dump buffer. Caller must have
 * checked task_lat_can_store_latency() first — no bounds check here.
 * NOTE(review): rx_packet_index, rx_err and tx_err are narrowed to the
 * 32-bit struct fields — confirm the truncation is intended. */
static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
{
        struct lat_info *lat_info;

        /* If unique_id_pos is specified then latency is stored per
           packet being sent. Lost packets are detected runtime, and
           latency stored for those packets will be 0 */
        lat_info = &task->latency_buffer[task->latency_buffer_idx++];
        lat_info->rx_packet_index = rx_packet_index;
        lat_info->tx_packet_index = packet_id;
        lat_info->port_queue_id = generator_id;
        lat_info->rx_time = rx_time;
        lat_info->tx_time = tx_time;
        lat_info->rx_err = rx_err;
        lat_info->tx_err = tx_err;
}
430
431 static uint32_t task_lat_early_loss_detect(struct task_lat *task, struct unique_id *unique_id)
432 {
433         struct early_loss_detect *eld;
434         uint8_t generator_id;
435         uint32_t packet_index;
436
437         unique_id_get(unique_id, &generator_id, &packet_index);
438
439         if (generator_id >= task->generator_count)
440                 return 0;
441
442         eld = &task->eld[generator_id];
443
444         return early_loss_detect_add(eld, packet_index);
445 }
446
/* Estimate when a packet hit the wire by subtracting from tsc_from the
 * time needed to transmit `bytes` at `link_speed` (bytes/sec), clamped
 * to tsc_minimum so the estimate never goes before the previous bulk. */
static uint64_t tsc_extrapolate_backward(uint64_t link_speed, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
        uint64_t wire_tsc = (rte_get_tsc_hz() * bytes) / link_speed;
        uint64_t tsc = tsc_from - wire_tsc;

        return (tsc > tsc_minimum) ? tsc : tsc_minimum;
}
455
456 static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
457 {
458         uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
459         size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);
460
461         bucket_id = bucket_id < bucket_count? bucket_id : bucket_count;
462         lat_test->buckets[bucket_id]++;
463 }
464
/* Accumulate newly detected packet loss into the active lat_test. */
static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
{
        lat_test->lost_packets += lost_packets;
}
469
/* Accumulate one latency sample (tsc units) with its measurement error
 * into the running statistics; samples whose error exceeds the configured
 * accuracy limit are discarded entirely. */
static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
{
        if (error > lat_test->accuracy_limit_tsc)
                return;
        lat_test->tot_pkts++;

        lat_test->tot_lat += lat_tsc;
        lat_test->tot_lat_error += error;

        /* (a +- b)^2 = a^2 +- (2ab + b^2) */
        /* Track the squared sum and its worst-case error term separately so
           variance bounds can be derived by the stats reader. */
        lat_test->var_lat += lat_tsc * lat_tsc;
        lat_test->var_lat_error += 2 * lat_tsc * error;
        lat_test->var_lat_error += error * error;

        if (lat_tsc > lat_test->max_lat) {
                lat_test->max_lat = lat_tsc;
                lat_test->max_lat_error = error;
        }
        if (lat_tsc < lat_test->min_lat) {
                lat_test->min_lat = lat_tsc;
                lat_test->min_lat_error = error;
        }

#ifdef LATENCY_HISTOGRAM
        lat_test_histogram_add(lat_test, lat_tsc);
#endif
}
497
/* True while there is still room in the per-packet dump buffer. */
static int task_lat_can_store_latency(struct task_lat *task)
{
        return task->latency_buffer_idx < task->latency_buffer_size;
}
502
503 static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
504 {
505         if (tx_time == 0)
506                 return;
507         uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;
508
509         lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);
510
511         if (task_lat_can_store_latency(task)) {
512                 task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
513         }
514 }
515
/* Fast-path handler: timestamp a bulk of received packets, extrapolate
 * each packet's true arrival time backwards from the RX tsc using the
 * link speed, match TX timestamps (and, optionally, TX accuracy delivered
 * ACCURACY_BUFFER_SIZE packets later) and feed the results to the active
 * lat_test. Packets are always forwarded. Returns tx_pkt()'s result. */
static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
        struct task_lat *task = (struct task_lat *)tbase;
        uint64_t rx_time_err;

        uint32_t pkt_rx_time, pkt_tx_time;

        // If link is down, link_speed is 0
        if (unlikely(task->link_speed == 0)) {
                if (task->port && task->port->link_speed != 0) {
                        /* Link came up: cache speed in bytes/sec (Mbps * 125000) */
                        task->link_speed = task->port->link_speed * 125000L;
                        plog_info("\tPort %u: link speed is %ld Mbps\n",
                                (uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
                } else if (n_pkts) {
                        /* No speed known: forward without measuring */
                        return task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
                } else {
                        return 0;
                }
        }

        if (n_pkts == 0) {
                /* Empty poll: remember its start so the next bulk can tell
                   how long the NIC may have been holding packets */
                task->begin = tbase->aux->tsc_rx.before;
                return 0;
        }

        task_lat_update_lat_test(task);

        const uint64_t rx_tsc = tbase->aux->tsc_rx.after;
        uint32_t tx_time_err = 0;

        /* Go once through all received packets and read them.  If
           packet has just been modified by another core, the cost of
           latency will be partialy amortized though the bulk size */
        for (uint16_t j = 0; j < n_pkts; ++j) {
                struct rte_mbuf *mbuf = mbufs[j];
                task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);
        }

        /* Read the TX timestamp; with a signature configured, packets not
           carrying it get pkt_tx_time = 0 and are later skipped */
        if (task->sig) {
                for (uint16_t j = 0; j < n_pkts; ++j) {
                        if (*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig)
                                task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
                        else
                                task->rx_pkt_meta[j].pkt_tx_time = 0;
                }
        } else {
                for (uint16_t j = 0; j < n_pkts; ++j) {
                        task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
                }
        }

        uint32_t bytes_total_in_bulk = 0;
        // Find RX time of first packet, for RX accuracy
        /* Walk the bulk backwards so each packet knows how many wire bytes
           were received after it */
        for (uint16_t j = 0; j < n_pkts; ++j) {
                uint16_t flipped = n_pkts - 1 - j;

                task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
                bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
        }

        pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
        if ((uint32_t)((task->begin >> LATENCY_ACCURACY)) > pkt_rx_time) {
                // Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
                /* NOTE(review): this branch measures the error against
                   last_pkts_tsc rather than begin — confirm intended */
                rx_time_err = pkt_rx_time - (uint32_t)(task->last_pkts_tsc >> LATENCY_ACCURACY);
        } else {
                rx_time_err = pkt_rx_time - (uint32_t)(task->begin >> LATENCY_ACCURACY);
        }

        struct unique_id *unique_id = NULL;
        struct delayed_latency_entry *delayed_latency_entry;
        uint32_t packet_id, generator_id;

        for (uint16_t j = 0; j < n_pkts; ++j) {
                struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
                uint8_t *hdr = rx_pkt_meta->hdr;

                pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
                pkt_tx_time = rx_pkt_meta->pkt_tx_time;

                if (task->unique_id_pos) {
                        unique_id = (struct unique_id *)(hdr + task->unique_id_pos);

                        uint32_t n_loss = task_lat_early_loss_detect(task, unique_id);
                        packet_id = unique_id->packet_id;
                        generator_id = unique_id->generator_id;
                        lat_test_add_lost(task->lat_test, n_loss);
                } else {
                        packet_id = task->rx_packet_index;
                        generator_id = 0;
                }
                task->lat_test->tot_all_pkts++;

                /* If accuracy is enabled, latency is reported with a
                   delay of ACCURACY_BUFFER_SIZE packets since the generator puts the
                   accuracy for packet N into packet N + ACCURACY_BUFFER_SIZE. The delay
                   ensures that all reported latencies have both rx
                   and tx error. */
                if (task->accur_pos) {
                        tx_time_err = *(uint32_t *)(hdr + task->accur_pos);

                        /* Report the packet stored ACCURACY_BUFFER_SIZE ago,
                           whose TX error this packet carries */
                        delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_BUFFER_SIZE);

                        if (delayed_latency_entry) {
                                task_lat_store_lat(task,
                                                   delayed_latency_entry->rx_packet_id,
                                                   delayed_latency_entry->pkt_rx_time,
                                                   delayed_latency_entry->pkt_tx_time,
                                                   delayed_latency_entry->rx_time_err,
                                                   tx_time_err,
                                                   delayed_latency_entry->tx_packet_id,
                                                   delayed_latency_entry->generator_id);
                        }

                        /* Park the current packet until its TX error arrives */
                        delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
                        delayed_latency_entry->pkt_rx_time = pkt_rx_time;
                        delayed_latency_entry->pkt_tx_time = pkt_tx_time;
                        delayed_latency_entry->rx_time_err = rx_time_err;
                        delayed_latency_entry->rx_packet_id = task->rx_packet_index;
                        delayed_latency_entry->tx_packet_id = packet_id;
                        delayed_latency_entry->generator_id = generator_id;
                } else {
                        task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
                }
                task->rx_packet_index++;
        }
        int ret;
        ret = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
        task->begin = tbase->aux->tsc_rx.before;
        task->last_pkts_tsc = tbase->aux->tsc_rx.after;
        return ret;
}
647
648 static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
649 {
650         const int socket_id = rte_lcore_to_socket_id(core_id);
651         char name[256];
652         size_t latency_buffer_mem_size = 0;
653
654         if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
655                 task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;
656
657         latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;
658
659         task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
660         PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);
661
662         sprintf(name, "latency.rx_%u.txt", core_id);
663         task->fp_rx = fopen(name, "w+");
664         PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);
665
666         sprintf(name, "latency.tx_%u.txt", core_id);
667         task->fp_tx = fopen(name, "w+");
668         PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);
669
670         task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
671         PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocated prev_tx_packet_index\n");
672 }
673
674 static void task_init_generator_count(struct task_lat *task)
675 {
676         uint8_t *generator_count = prox_sh_find_system("generator_count");
677
678         if (generator_count == NULL) {
679                 task->generator_count = 1;
680                 plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
681         } else
682                 task->generator_count = *generator_count;
683         plog_info("\tLatency using %u generators\n", task->generator_count);
684 }
685
686 static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
687 {
688         size_t eld_mem_size;
689
690         eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
691         task->eld = prox_zmalloc(eld_mem_size, socket_id);
692         PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
693 }
694
/* Convert the user-configured accuracy limit from nsec to tsc; samples
 * whose measurement error exceeds this are excluded from the stats. */
void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
{
        task->limit = nsec_to_tsc(accuracy_limit_nsec);
}
699
/* Task start hook: snapshot the port's link speed. If the link is still
 * down (speed 0), handle_lat_bulk() retries the query in the fast path. */
static void lat_start(struct task_base *tbase)
{
        struct task_lat *task = (struct task_lat *)tbase;

        if (task->port) {
                // task->port->link_speed reports the link speed in Mbps e.g. 40k for a 40 Gbps NIC.
                // task->link_speed reports link speed in Bytes per sec.
                // It can be 0 if link is down, and must hence be updated in fast path.
                task->link_speed = task->port->link_speed * 125000L;
                if (task->link_speed)
                        plog_info("\tPort %u: link speed is %ld Mbps\n",
                                (uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
                else
                        plog_info("\tPort %u: link speed is %ld Mbps - link might be down\n",
                                (uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
        }
}
717
/* One-time init of a "lat" task from its configuration (task_args):
 * packet field offsets, optional dump buffer, per-generator accuracy
 * rings and loss detectors, histogram bucket size and RX scratch array. */
static void init_task_lat(struct task_base *tbase, struct task_args *targ)
{
        struct task_lat *task = (struct task_lat *)tbase;
        const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

        /* Byte offsets of the timestamp/accuracy/signature fields in packets */
        task->lat_pos = targ->lat_pos;
        task->accur_pos = targ->accur_pos;
        task->sig_pos = targ->sig_pos;
        task->sig = targ->sig;

        task->unique_id_pos = targ->packet_id_pos;
        task->latency_buffer_size = targ->latency_buffer_size;

        task_init_generator_count(task);

        if (task->latency_buffer_size) {
                init_task_lat_latency_buffer(task, targ->lconf->id);
        }

        if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
                targ->bucket_size = DEFAULT_BUCKET_SIZE;
        }

        if (task->accur_pos) {
                /* One ring of ACCURACY_BUFFER_SIZE entries per generator */
                task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count , socket_id);
                PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
                for (uint i = 0; i < task->generator_count; i++) {
                        task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
                        PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
                }
                if (task->unique_id_pos == 0) {
                        /* When using accuracy feature, the accuracy from TX is written ACCURACY_BUFFER_SIZE packets later
                        * We can only retrieve the good packet if a packet id is written to it.
                        * Otherwise we will use the packet RECEIVED ACCURACY_BUFFER_SIZE packets ago which is OK if
                        * packets are not re-ordered. If packets are re-ordered, then the matching between
                        * the tx accuracy and the latency is wrong.
                        */
                        plog_warn("\tWhen accuracy feature is used, a unique id should ideally also be used\n");
                }
        }

        task->lt[0].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
        task->lt[1].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
        if (task->unique_id_pos) {
                task_lat_init_eld(task, socket_id);
                task_lat_reset_eld(task);
        }
        task->lat_test = &task->lt[task->using_lt];

        task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
        task->rx_pkt_meta = prox_zmalloc(MAX_RX_PKT_ALL * sizeof(*task->rx_pkt_meta), socket_id);
        PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");

        /* Sentinel meaning "not yet known"; lat_start()/fast path set it */
        task->link_speed = UINT64_MAX;
        if (targ->nb_rxports) {
                // task->port structure is only used while starting handle_lat to get the link_speed.
                // link_speed can not be queried at init as the port has not been initialized yet.
                struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
                task->port = port;
        }
}
779
/* Registration descriptor for the "lat" task mode. */
static struct task_init task_init_lat = {
        .mode_str = "lat",
        .init = init_task_lat,
        .handle = handle_lat_bulk,
        .start = lat_start,
        .stop = lat_stop,
        .flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_RX_ALL | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
        .size = sizeof(struct task_lat)
};
789
/* Register the "lat" task mode with PROX at program load time. */
__attribute__((constructor)) static void reg_task_lat(void)
{
        reg_task(&task_init_lat);
}