/*
// Copyright (c) 2010-2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

//#define LAT_DEBUG

#include <rte_cycles.h>
#include <stdio.h>
#include <math.h>

#include "handle_gen.h"
#include "prox_malloc.h"
#include "mbuf_utils.h"
#include "handle_lat.h"
#include "log.h"
#include "task_init.h"
#include "task_base.h"
#include "stats.h"
#include "lconf.h"
#include "quit.h"
#include "eld.h"
#include "prox_shared.h"
#include "prox_port_cfg.h"

#define DEFAULT_BUCKET_SIZE     10
#define ACCURACY_BUFFER_SIZE    64

struct lat_info {
        uint32_t rx_packet_index;
        uint64_t tx_packet_index;
        uint32_t tx_err;
        uint32_t rx_err;
        uint64_t rx_time;
        uint64_t tx_time;
        uint16_t port_queue_id;
#ifdef LAT_DEBUG
        uint16_t id_in_bulk;
        uint16_t bulk_size;
        uint64_t begin;
        uint64_t after;
        uint64_t before;
#endif
};

struct delayed_latency_entry {
        uint32_t rx_packet_id;
        uint32_t tx_packet_id;
        uint32_t packet_id;
        uint8_t generator_id;
        uint64_t pkt_rx_time;
        uint64_t pkt_tx_time;
        uint64_t rx_time_err;
};

static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
        struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
        if (delayed_latency_entry->packet_id == packet_id)
                return delayed_latency_entry;
        else
                return NULL;
}

static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
        struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
        delayed_latency_entry->packet_id = packet_id;
        return delayed_latency_entry;
}
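
/*
 * Illustrative sketch (comments only): how the two helpers above pair up.
 * The generator writes the TX accuracy of packet N into packet
 * N + ACCURACY_BUFFER_SIZE, so the RX side parks packet N with
 * delayed_latency_create() and completes it ACCURACY_BUFFER_SIZE packets
 * later:
 *
 *   // on receiving packet_id = 100 from generator 0:
 *   e = delayed_latency_get(entries, 0, 100 - ACCURACY_BUFFER_SIZE);
 *   // non-NULL only if slot 36 ((100 - 64) % 64) still holds packet 36
 *   delayed_latency_create(entries, 0, 100); // park packet 100 in slot 36
 *
 * The packet_id stamped into the slot is what rejects stale entries: if
 * packet 36 was lost, slot 36 still holds an older id and
 * delayed_latency_get() returns NULL.
 */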

struct rx_pkt_meta_data {
        uint8_t  *hdr;
        uint32_t pkt_tx_time;
        uint32_t bytes_after_in_bulk;
};

struct task_lat {
        struct task_base base;
        uint64_t limit;
        uint64_t rx_packet_index;
        uint64_t last_pkts_tsc;
        struct delayed_latency_entry **delayed_latency_entries;
        struct lat_info *latency_buffer;
        uint32_t latency_buffer_idx;
        uint32_t latency_buffer_size;
        uint64_t begin;
        uint16_t lat_pos;
        uint16_t unique_id_pos;
        uint16_t accur_pos;
        uint16_t sig_pos;
        uint32_t sig;
        volatile uint16_t use_lt; /* which lt to use, set externally to request a swap */
        volatile uint16_t using_lt; /* 0 or 1, depending on which of the 2 measurements is in use */
        struct lat_test lt[2];
        struct lat_test *lat_test;
        uint32_t generator_count;
        struct early_loss_detect *eld;
        struct rx_pkt_meta_data *rx_pkt_meta;
        uint64_t link_speed;
        // The following fields are only used when starting or stopping, not at general runtime
        uint64_t *prev_tx_packet_index;
        FILE *fp_rx;
        FILE *fp_tx;
        struct prox_port_cfg *port;
};
/* This function calculates the difference between rx_time and tx_time.
 * Both values are uint32_t (see handle_lat_bulk).
 * rx_time should be higher than tx_time, except once every UINT32_MAX
 * cycles, when rx_time overflows.
 * As the return value is also uint32_t, returning (rx_time - tx_time)
 * is still correct when the subtraction wraps.
 */
static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
{
        return rx_time - tx_time;
}
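
/*
 * Worked example (illustrative): unsigned wraparound makes the subtraction
 * self-correcting across a single overflow of rx_time.
 *
 *   uint32_t tx_time = UINT32_MAX - 5;  // just before the wrap
 *   uint32_t rx_time = 10;              // rx_time has already wrapped
 *   diff_time(rx_time, tx_time) == 16;  // true elapsed time modulo 2^32
 *
 * This only breaks if a packet stays in flight for more than UINT32_MAX
 * (shifted) cycles, i.e. several seconds at current TSC rates.
 */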

struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
{
        if (task->use_lt == task->using_lt)
                return &task->lt[!task->using_lt];
        return NULL;
}

void task_lat_use_other_latency_meassurement(struct task_lat *task)
{
        task->use_lt = !task->using_lt;
}

static void task_lat_update_lat_test(struct task_lat *task)
{
        if (task->use_lt != task->using_lt) {
                task->using_lt = task->use_lt;
                task->lat_test = &task->lt[task->using_lt];
                task->lat_test->accuracy_limit_tsc = task->limit;
        }
}
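
/*
 * Sketch of the lock-free double-buffer handshake implemented by the three
 * functions above, assuming the reader runs on a different (stats) core;
 * read_and_reset() is a hypothetical placeholder for that reader:
 *
 *   // stats core:
 *   struct lat_test *lt = task_lat_get_latency_meassurement(task);
 *   if (lt) {               // non-NULL only when no swap is pending
 *           read_and_reset(lt);
 *           task_lat_use_other_latency_meassurement(task); // request swap
 *   }
 *
 *   // latency core, once per bulk: task_lat_update_lat_test() sees
 *   // use_lt != using_lt and switches to the other lat_test, after which
 *   // the stats core's next get() hands back the retired buffer.
 */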

static int compare_tx_time(const void *val1, const void *val2)
{
        const struct lat_info *ptr1 = val1;
        const struct lat_info *ptr2 = val2;

        return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
}

static int compare_tx_packet_index(const void *val1, const void *val2)
{
        const struct lat_info *ptr1 = val1;
        const struct lat_info *ptr2 = val2;

        return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
}

static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
{
        uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
        uint32_t small = UINT32_MAX >> 1;

        lat++;

        /* Buffer is sorted so far by RX time.
         * Packets might have been reordered by the SUT.
         *     => consider small differences as re-ordering and big ones as overflow of tx_packet_index.
         * Note that:
         *      - overflow only happens after receiving and storing 4 billion packets...
         *      - an absolute difference of less than 2 billion packets is not considered an overflow
         */
        for (uint32_t i = 1; i < count; i++) {
                tx_packet_index = lat->tx_packet_index;
                if (tx_packet_index > old_tx_packet_index) {
                        if (tx_packet_index - old_tx_packet_index < small) {
                                // The diff is small => index is simply increasing
                        } else {
                                // The diff is big => more likely the previous packet was the overflow
                                n_overflow--;
                        }
                } else {
                        if (old_tx_packet_index - tx_packet_index < small) {
                                // The diff is small => packet reordering
                        } else {
                                // The diff is big => more likely this is an overflow
                                n_overflow++;
                        }
                }
                lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
                old_tx_packet_index = tx_packet_index;
                lat++;
        }
}
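
/*
 * Worked example (illustrative) of the unwrapping above ("small" is
 * UINT32_MAX >> 1, i.e. ~2 billion):
 *
 *   stored 32-bit ids (sorted by rx time): 4294967290, 4294967295, 3, 1, 8
 *
 *   4294967290 -> 4294967295 : small increase           n_overflow = 0
 *   4294967295 -> 3          : huge decrease (wrapped)  n_overflow = 1
 *   3          -> 1          : small decrease (reorder) n_overflow = 1
 *   1          -> 8          : small increase           n_overflow = 1
 *
 *   corrected 64-bit ids: 4294967290, 4294967295, 4294967299 (3 + 2^32),
 *                         4294967297 (1 + 2^32), 4294967304 (8 + 2^32)
 */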

static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
{
        uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
        uint32_t small = UINT32_MAX >> 1;
        lat++;

        /*
         * Same algorithm as above, but with time.
         * Note that:
         *      - overflow happens after 4 billion "cycles" (shifted by LATENCY_ACCURACY) = ~4 sec
         *      - an absolute difference of up to 2 billion (shifted) cycles (~2 sec) is not considered an overflow
         *              => the algorithm does not work if receiving less than 1 packet every 2 seconds
         */
        for (uint32_t i = 1; i < count; i++) {
                tx_time = lat->tx_time;
                if (tx_time > old_tx_time) {
                        if (tx_time - old_tx_time > small) {
                                n_overflow--;
                        }
                } else {
                        if (old_tx_time - tx_time > small) {
                                n_overflow++;
                        }
                }
                lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
                old_tx_time = tx_time;
                lat++;
        }
}

static void task_lat_count_remaining_lost_packets(struct task_lat *task)
{
        struct lat_test *lat_test = task->lat_test;

        for (uint32_t j = 0; j < task->generator_count; j++) {
                struct early_loss_detect *eld = &task->eld[j];

                lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
        }
}

static void task_lat_reset_eld(struct task_lat *task)
{
        for (uint32_t j = 0; j < task->generator_count; j++) {
                early_loss_detect_reset(&task->eld[j]);
        }
}

static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
{
        uint64_t min_tsc = UINT64_MAX;

        for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
                if (min_tsc > task->latency_buffer[i].tx_time)
                        min_tsc = task->latency_buffer[i].tx_time;
        }

        return min_tsc << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
{
        uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);

        return lat << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
}

static void lat_write_latency_to_file(struct task_lat *task)
{
        uint64_t min_tsc;
        uint64_t n_loss;

        min_tsc = lat_latency_buffer_get_min_tsc(task);

        // Dump all packet statistics
        fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
        fprintf(task->fp_rx, "rx index;queue;tx index;lat (nsec);rx time;tx time\n");
        for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
                struct lat_info *lat_info = &task->latency_buffer[i];
                uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
                uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
                uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

                fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
                        lat_info->rx_packet_index,
                        lat_info->port_queue_id,
                        lat_info->tx_packet_index,
                        tsc_to_nsec(lat_tsc),
                        tsc_to_nsec(rx_tsc - min_tsc),
                        tsc_to_nsec(tx_tsc - min_tsc));
        }

        // To detect dropped packets, we need to sort them based on TX
        if (task->unique_id_pos) {
                plogx_info("Adapting tx_packet_index\n");
                fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
                plogx_info("Sorting packets based on tx_packet_index\n");
                qsort(task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
                plogx_info("Sorted packets based on tx_packet_index\n");
        } else {
                plogx_info("Adapting tx_time\n");
                fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
                plogx_info("Sorting packets based on tx_time\n");
                qsort(task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
                plogx_info("Sorted packets based on tx_time\n");
        }

        // A packet is marked as dropped if 2 packets received from the same queue are not consecutive
        fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
        fprintf(task->fp_tx, "queue;tx index;rx index;lat (nsec);tx time;rx time;tx_err;rx_err\n");

        for (uint32_t i = 0; i < task->generator_count; i++)
                task->prev_tx_packet_index[i] = -1;

        for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
                struct lat_info *lat_info = &task->latency_buffer[i];
                uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
                uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
                uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
                uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
                uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

                /* Packet n + ACCURACY_BUFFER_SIZE delivers the TX error for packet n,
                   hence the last ACCURACY_BUFFER_SIZE packets do not have a TX error. */
                if (i + ACCURACY_BUFFER_SIZE >= task->latency_buffer_idx) {
                        tx_err_tsc = 0;
                }

                if (lat_info->port_queue_id >= task->generator_count) {
                        plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
                                lat_info->port_queue_id, lat_info->tx_packet_index);
                        continue;
                }
                // Log dropped packets
                n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
                if (n_loss)
                        fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
                                lat_info->port_queue_id,
                                lat_info->tx_packet_index - n_loss, n_loss);
                // Log the next packet
                fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
                        lat_info->port_queue_id,
                        lat_info->tx_packet_index,
                        lat_info->rx_packet_index,
                        tsc_to_nsec(lat_tsc),
                        tsc_to_nsec(tx_tsc - min_tsc),
                        tsc_to_nsec(rx_tsc - min_tsc),
                        tsc_to_nsec(tx_err_tsc),
                        tsc_to_nsec(rx_err_tsc));
#ifdef LAT_DEBUG
                fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
                        lat_info->id_in_bulk,
                        lat_info->bulk_size,
                        tsc_to_nsec(lat_info->begin - min_tsc),
                        tsc_to_nsec(lat_info->before - min_tsc),
                        tsc_to_nsec(lat_info->after - min_tsc));
#endif
                fprintf(task->fp_tx, "\n");
                task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
        }
        fflush(task->fp_rx);
        fflush(task->fp_tx);
        task->latency_buffer_idx = 0;
}

static void lat_stop(struct task_base *tbase)
{
        struct task_lat *task = (struct task_lat *)tbase;

        if (task->unique_id_pos) {
                task_lat_count_remaining_lost_packets(task);
                task_lat_reset_eld(task);
        }
        if (task->latency_buffer)
                lat_write_latency_to_file(task);
}

#ifdef LAT_DEBUG
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
        struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];

        lat_info->bulk_size = bulk_size;
        lat_info->id_in_bulk = id_in_bulk;
        lat_info->begin = task->begin;
        lat_info->before = task->base.aux->tsc_rx.before;
        lat_info->after = task->base.aux->tsc_rx.after;
}
#endif

static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
{
        struct lat_info *lat_info;

        /* If unique_id_pos is specified then latency is stored per
           packet sent. Lost packets are detected at runtime, and
           the latency stored for those packets will be 0. */
        lat_info = &task->latency_buffer[task->latency_buffer_idx++];
        lat_info->rx_packet_index = rx_packet_index;
        lat_info->tx_packet_index = packet_id;
        lat_info->port_queue_id = generator_id;
        lat_info->rx_time = rx_time;
        lat_info->tx_time = tx_time;
        lat_info->rx_err = rx_err;
        lat_info->tx_err = tx_err;
}

static uint32_t task_lat_early_loss_detect(struct task_lat *task, struct unique_id *unique_id)
{
        struct early_loss_detect *eld;
        uint8_t generator_id;
        uint32_t packet_index;

        unique_id_get(unique_id, &generator_id, &packet_index);

        if (generator_id >= task->generator_count)
                return 0;

        eld = &task->eld[generator_id];

        return early_loss_detect_add(eld, packet_index);
}

static uint64_t tsc_extrapolate_backward(uint64_t link_speed, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
        uint64_t tsc = tsc_from - (rte_get_tsc_hz() * bytes) / link_speed;
        if (likely(tsc > tsc_minimum))
                return tsc;
        else
                return tsc_minimum;
}
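
/*
 * Worked example (illustrative numbers): on a 10 Gbps link,
 * link_speed = 1.25e9 bytes/sec. Assuming rte_get_tsc_hz() returns 2.5e9,
 * a packet with bytes = 1500 wire bytes behind it in the same bulk is
 * timestamped 2.5e9 * 1500 / 1.25e9 = 3000 TSC cycles before tsc_from,
 * clamped to tsc_minimum so it never lands before the previous bulk.
 */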

static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
{
        uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
        size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);

        // Clamp into the last bucket; indexing at bucket_count would be out of bounds
        bucket_id = bucket_id < bucket_count ? bucket_id : bucket_count - 1;
        lat_test->buckets[bucket_id]++;
}
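
/*
 * Worked example (illustrative): with bucket_size = 10, a latency of
 * lat_tsc = 5000 (shifted) cycles lands in bucket 5000 >> 10 = 4, i.e.
 * each bucket spans 2^10 = 1024 cycles. Latencies beyond the last bucket
 * are clamped into it instead of being written out of bounds.
 */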

static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
{
        lat_test->lost_packets += lost_packets;
}

static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
{
        if (error > lat_test->accuracy_limit_tsc)
                return;
        lat_test->tot_pkts++;

        lat_test->tot_lat += lat_tsc;
        lat_test->tot_lat_error += error;

        /* (a +- b)^2 = a^2 +- (2ab + b^2) */
        lat_test->var_lat += lat_tsc * lat_tsc;
        lat_test->var_lat_error += 2 * lat_tsc * error;
        lat_test->var_lat_error += error * error;

        if (lat_tsc > lat_test->max_lat) {
                lat_test->max_lat = lat_tsc;
                lat_test->max_lat_error = error;
        }
        if (lat_tsc < lat_test->min_lat) {
                lat_test->min_lat = lat_tsc;
                lat_test->min_lat_error = error;
        }

#ifdef LATENCY_HISTOGRAM
        lat_test_histogram_add(lat_test, lat_tsc);
#endif
}
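
/*
 * Why the squared sums: a consumer of lat_test can derive the mean and the
 * variance from the running totals alone (a sketch, assuming this is how
 * the stats side uses them):
 *
 *   mean = tot_lat / tot_pkts;
 *   var  = var_lat / tot_pkts - mean * mean;   // E[x^2] - E[x]^2
 *
 * The (a +- b)^2 identity above bounds how far var_lat can drift when each
 * sample is only known to within +- error: the accumulated 2ab + b^2 terms
 * in var_lat_error give the worst-case slack on var_lat.
 */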

static int task_lat_can_store_latency(struct task_lat *task)
{
        return task->latency_buffer_idx < task->latency_buffer_size;
}

static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
{
        if (tx_time == 0)
                return;
        uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;

        lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);

        if (task_lat_can_store_latency(task)) {
                task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
        }
}

static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
        struct task_lat *task = (struct task_lat *)tbase;
        uint64_t rx_time_err;

        uint32_t pkt_rx_time, pkt_tx_time;

        if (n_pkts == 0) {
                task->begin = tbase->aux->tsc_rx.before;
                return 0;
        }

        task_lat_update_lat_test(task);

        const uint64_t rx_tsc = tbase->aux->tsc_rx.after;
        uint32_t tx_time_err = 0;

        /* Go once through all received packets and read them. If a
           packet has just been modified by another core, the cost of
           the cache-miss latency is partially amortized through the
           bulk size. */
        for (uint16_t j = 0; j < n_pkts; ++j) {
                struct rte_mbuf *mbuf = mbufs[j];
                task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);
        }

        if (task->sig) {
                for (uint16_t j = 0; j < n_pkts; ++j) {
                        if (*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig)
                                task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
                        else
                                task->rx_pkt_meta[j].pkt_tx_time = 0;
                }
        } else {
                for (uint16_t j = 0; j < n_pkts; ++j) {
                        task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
                }
        }

        uint32_t bytes_total_in_bulk = 0;
        // Find the RX time of the first packet, for RX accuracy
        for (uint16_t j = 0; j < n_pkts; ++j) {
                uint16_t flipped = n_pkts - 1 - j;

                task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
                bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
        }

        pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
        if ((uint32_t)(task->begin >> LATENCY_ACCURACY) > pkt_rx_time) {
                // Extrapolation went back to BEFORE begin => packets were stuck in the NIC while we were not seeing them
                rx_time_err = pkt_rx_time - (uint32_t)(task->last_pkts_tsc >> LATENCY_ACCURACY);
        } else {
                rx_time_err = pkt_rx_time - (uint32_t)(task->begin >> LATENCY_ACCURACY);
        }

        struct unique_id *unique_id = NULL;
        struct delayed_latency_entry *delayed_latency_entry;
        uint32_t packet_id, generator_id;

        for (uint16_t j = 0; j < n_pkts; ++j) {
                struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
                uint8_t *hdr = rx_pkt_meta->hdr;

                pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
                pkt_tx_time = rx_pkt_meta->pkt_tx_time;

                if (task->unique_id_pos) {
                        unique_id = (struct unique_id *)(hdr + task->unique_id_pos);

                        uint32_t n_loss = task_lat_early_loss_detect(task, unique_id);
                        packet_id = unique_id->packet_id;
                        generator_id = unique_id->generator_id;
                        lat_test_add_lost(task->lat_test, n_loss);
                } else {
                        packet_id = task->rx_packet_index;
                        generator_id = 0;
                }
                task->lat_test->tot_all_pkts++;

                /* If accuracy is enabled, latency is reported with a
                   delay of ACCURACY_BUFFER_SIZE packets, since the generator puts the
                   accuracy for packet N into packet N + ACCURACY_BUFFER_SIZE. The delay
                   ensures that all reported latencies have both an rx
                   and a tx error. */
                if (task->accur_pos) {
                        tx_time_err = *(uint32_t *)(hdr + task->accur_pos);

                        delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_BUFFER_SIZE);

                        if (delayed_latency_entry) {
                                task_lat_store_lat(task,
                                                   delayed_latency_entry->rx_packet_id,
                                                   delayed_latency_entry->pkt_rx_time,
                                                   delayed_latency_entry->pkt_tx_time,
                                                   delayed_latency_entry->rx_time_err,
                                                   tx_time_err,
                                                   delayed_latency_entry->tx_packet_id,
                                                   delayed_latency_entry->generator_id);
                        }

                        delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
                        delayed_latency_entry->pkt_rx_time = pkt_rx_time;
                        delayed_latency_entry->pkt_tx_time = pkt_tx_time;
                        delayed_latency_entry->rx_time_err = rx_time_err;
                        delayed_latency_entry->rx_packet_id = task->rx_packet_index;
                        delayed_latency_entry->tx_packet_id = packet_id;
                        delayed_latency_entry->generator_id = generator_id;
                } else {
                        task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
                }
                task->rx_packet_index++;
        }
        int ret;
        ret = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
        task->begin = tbase->aux->tsc_rx.before;
        task->last_pkts_tsc = tbase->aux->tsc_rx.after;
        return ret;
}
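
/*
 * Worked example (illustrative) of the per-bulk RX timestamping above: for
 * a bulk of 3 packets of 100 wire bytes each, bytes_after_in_bulk is
 * 200/100/0 for packets 0/1/2, so packet 0 is extrapolated 200 bytes'
 * worth of cycles before rx_tsc, packet 1 by 100 bytes' worth, and packet
 * 2 gets rx_tsc itself. rx_time_err bounds when the first packet may
 * really have arrived: all that is known is that it arrived after
 * task->begin (the TSC before the RX call), so the extrapolated
 * pkt_rx_time can be off by up to that window.
 */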

static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
{
        const int socket_id = rte_lcore_to_socket_id(core_id);
        char name[256];
        size_t latency_buffer_mem_size = 0;

        if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
                task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;

        latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;

        task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
        PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);

        sprintf(name, "latency.rx_%u.txt", core_id);
        task->fp_rx = fopen(name, "w+");
        PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);

        sprintf(name, "latency.tx_%u.txt", core_id);
        task->fp_tx = fopen(name, "w+");
        PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);

        task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
        PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocate prev_tx_packet_index\n");
}

static void task_init_generator_count(struct task_lat *task)
{
        uint8_t *generator_count = prox_sh_find_system("generator_count");

        if (generator_count == NULL) {
                task->generator_count = 1;
                plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
        } else
                task->generator_count = *generator_count;
        plog_info("\tLatency using %u generators\n", task->generator_count);
}

static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
{
        size_t eld_mem_size;

        eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
        task->eld = prox_zmalloc(eld_mem_size, socket_id);
        PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
}

void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
{
        task->limit = nsec_to_tsc(accuracy_limit_nsec);
}

static void lat_start(struct task_base *tbase)
{
        struct task_lat *task = (struct task_lat *)tbase;

        if (task->port && task->port->link_speed) {
                // task->port->link_speed reports the link speed in Mbps, e.g. 40k for a 40 Gbps NIC
                // task->link_speed holds the link speed in bytes per second
                task->link_speed = task->port->link_speed * 125000L;
                plog_info("\tReceiving at %lu Mbps\n", 8 * task->link_speed / 1000000);
        }
}
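
/*
 * Worked example (illustrative): a 40 Gbps port reports link_speed = 40000
 * (Mbps), so task->link_speed becomes 40000 * 125000 = 5e9 bytes/sec
 * (1 Mbps = 10^6 bits/sec / 8 = 125000 bytes/sec), and the plog above
 * prints 8 * 5e9 / 1e6 = 40000 Mbps back.
 */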

static void init_task_lat(struct task_base *tbase, struct task_args *targ)
{
        struct task_lat *task = (struct task_lat *)tbase;
        const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

        task->lat_pos = targ->lat_pos;
        task->accur_pos = targ->accur_pos;
        task->sig_pos = targ->sig_pos;
        task->sig = targ->sig;

        task->unique_id_pos = targ->packet_id_pos;
        task->latency_buffer_size = targ->latency_buffer_size;

        task_init_generator_count(task);

        if (task->latency_buffer_size) {
                init_task_lat_latency_buffer(task, targ->lconf->id);
        }

        if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
                targ->bucket_size = DEFAULT_BUCKET_SIZE;
        }

        if (task->accur_pos) {
                task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count, socket_id);
                PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
                for (uint32_t i = 0; i < task->generator_count; i++) {
                        task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
                        PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
                }
                if (task->unique_id_pos == 0) {
                        /* When the accuracy feature is used, the TX accuracy is written ACCURACY_BUFFER_SIZE packets later.
                         * We can only retrieve the right packet if a packet id is written to it.
                         * Otherwise we will use the packet RECEIVED ACCURACY_BUFFER_SIZE packets ago, which is OK if
                         * packets are not re-ordered. If packets are re-ordered, then the matching between
                         * the tx accuracy and the latency is wrong.
                         */
                        plog_warn("\tWhen the accuracy feature is used, a unique id should ideally also be used\n");
                }
        }

        task->lt[0].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
        task->lt[1].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
        if (task->unique_id_pos) {
                task_lat_init_eld(task, socket_id);
                task_lat_reset_eld(task);
        }
        task->lat_test = &task->lt[task->using_lt];

        task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
        task->rx_pkt_meta = prox_zmalloc(MAX_RX_PKT_ALL * sizeof(*task->rx_pkt_meta), socket_id);
        PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");

        task->link_speed = UINT64_MAX;
        if (targ->nb_rxports) {
                // The task->port structure is only used while starting handle_lat, to get the link_speed.
                // link_speed cannot be queried at init time, as the port has not been initialized yet.
                struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
                task->port = port;
        }
}
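
/*
 * Sketch of a PROX config [core] section exercising the fields read in
 * init_task_lat() above. Key names follow the usual PROX config
 * conventions and should be checked against the PROX argument parser;
 * all offsets are illustrative and must match the generator's layout:
 *
 *   [core 3]
 *   name=lat0
 *   task=0
 *   mode=lat
 *   rx port=p0
 *   lat pos=42                  ; targ->lat_pos
 *   accuracy pos=46             ; targ->accur_pos, enables delayed entries
 *   packet id pos=50            ; targ->packet_id_pos, enables loss detection
 *   signature pos=56            ; targ->sig_pos
 *   signature=0xcafedeca        ; targ->sig
 *   latency bucket size=11      ; targ->bucket_size
 *   latency buffer size=500000  ; targ->latency_buffer_size, dump to file
 */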

static struct task_init task_init_lat = {
        .mode_str = "lat",
        .init = init_task_lat,
        .handle = handle_lat_bulk,
        .start = lat_start,
        .stop = lat_stop,
        .flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_RX_ALL | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
        .size = sizeof(struct task_lat)
};

__attribute__((constructor)) static void reg_task_lat(void)
{
        reg_task(&task_init_lat);
}