/*
// Copyright (c) 2010-2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

//#define LAT_DEBUG

#include <rte_cycles.h>
#include <stdio.h>
#include <math.h>

#include "handle_gen.h"
#include "prox_malloc.h"
#include "mbuf_utils.h"
#include "handle_lat.h"
#include "log.h"
#include "task_init.h"
#include "task_base.h"
#include "stats.h"
#include "lconf.h"
#include "quit.h"
#include "eld.h"
#include "prox_shared.h"
#include "prox_port_cfg.h"

#define DEFAULT_BUCKET_SIZE	10
#define ACCURACY_BUFFER_SIZE	64
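/* The generator writes the TX accuracy (timestamp error) of packet N into
 * packet N + ACCURACY_BUFFER_SIZE, so received packets are parked in a
 * circular buffer of that size until their TX error arrives; see the
 * delayed_latency_* helpers and handle_lat_bulk() below. */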

struct lat_info {
	uint32_t rx_packet_index;
	uint64_t tx_packet_index;
	uint32_t tx_err;
	uint32_t rx_err;
	uint64_t rx_time;
	uint64_t tx_time;
	uint16_t port_queue_id;
#ifdef LAT_DEBUG
	uint16_t id_in_bulk;
	uint16_t bulk_size;
	uint64_t begin;
	uint64_t after;
	uint64_t before;
#endif
};

struct delayed_latency_entry {
	uint32_t rx_packet_id;
	uint32_t tx_packet_id;
	uint32_t packet_id;
	uint8_t generator_id;
	uint64_t pkt_rx_time;
	uint64_t pkt_tx_time;
	uint64_t rx_time_err;
};

static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
	struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
	if (delayed_latency_entry->packet_id == packet_id)
		return delayed_latency_entry;
	else
		return NULL;
}

static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
	struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
	delayed_latency_entry->packet_id = packet_id;
	return delayed_latency_entry;
}
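
/* Usage sketch (see handle_lat_bulk): when packet N arrives, the slot of
 * packet N - ACCURACY_BUFFER_SIZE is looked up and, if still valid, its
 * latency is reported; the slot for packet N is then (re)created.
 * delayed_latency_get() returning NULL means the slot was recycled in the
 * meantime (e.g. a burst of loss or re-ordering wider than
 * ACCURACY_BUFFER_SIZE), in which case that sample is simply not reported. */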

struct rx_pkt_meta_data {
	uint8_t  *hdr;
	uint32_t pkt_tx_time;
	uint32_t bytes_after_in_bulk;
};

struct task_lat {
	struct task_base base;
	uint64_t limit;
	uint64_t rx_packet_index;
	uint64_t last_pkts_tsc;
	struct delayed_latency_entry **delayed_latency_entries;
	struct lat_info *latency_buffer;
	uint32_t latency_buffer_idx;
	uint32_t latency_buffer_size;
	uint64_t begin;
	uint16_t lat_pos;
	uint16_t unique_id_pos;
	uint16_t accur_pos;
	uint16_t sig_pos;
	uint32_t sig;
	volatile uint16_t use_lt; /* which lt the stats core wants the task to use next */
	volatile uint16_t using_lt; /* 0 or 1, depending on which of the 2 measurements is currently used */
	struct lat_test lt[2];
	struct lat_test *lat_test;
	uint32_t generator_count;
	struct early_loss_detect *eld;
	struct rx_pkt_meta_data *rx_pkt_meta;
	uint64_t link_speed;
	// Following fields are only used when starting or stopping, not in general runtime
	uint64_t *prev_tx_packet_index;
	FILE *fp_rx;
	FILE *fp_tx;
	struct prox_port_cfg *port;
};

static uint32_t diff_or_zero(uint32_t a, uint32_t b)
{
	return a < b ? 0 : a - b;
}

struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
{
	if (task->use_lt == task->using_lt)
		return &task->lt[!task->using_lt];
	return NULL;
}

void task_lat_use_other_latency_meassurement(struct task_lat *task)
{
	task->use_lt = !task->using_lt;
}

static void task_lat_update_lat_test(struct task_lat *task)
{
	if (task->use_lt != task->using_lt) {
		task->using_lt = task->use_lt;
		task->lat_test = &task->lt[task->using_lt];
		task->lat_test->accuracy_limit_tsc = task->limit;
	}
}
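
/* The two lat_test buffers form a lock-free handover with the stats core:
 * task_lat_get_latency_meassurement() returns the inactive buffer (or NULL
 * while a swap is pending), task_lat_use_other_latency_meassurement()
 * requests a swap, and task_lat_update_lat_test() performs it at the start
 * of the next bulk.  A minimal sketch of the consumer side, assuming a
 * hypothetical read_stats() helper:
 *
 *	struct lat_test *lt = task_lat_get_latency_meassurement(task);
 *	if (lt) {
 *		read_stats(lt);	// hypothetical: consume min/max/tot_lat, ...
 *		task_lat_use_other_latency_meassurement(task);
 *	}
 */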

static int compare_tx_time(const void *val1, const void *val2)
{
	const struct lat_info *ptr1 = val1;
	const struct lat_info *ptr2 = val2;

	return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
}

static int compare_tx_packet_index(const void *val1, const void *val2)
{
	const struct lat_info *ptr1 = val1;
	const struct lat_info *ptr2 = val2;

	return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
}

static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
{
	uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
	uint32_t small = UINT32_MAX >> 1;

	lat++;

	/* The buffer is sorted by RX time so far, and packets might have
	 * been re-ordered by the SUT.
	 *     => consider small differences as re-ordering and big ones as overflow of tx_packet_index.
	 * Note that:
	 *	- overflow only happens after receiving and storing 4 billion packets...
	 *	- an absolute difference of less than 2 billion packets is not considered an overflow
	 */
	for (uint32_t i = 1; i < count; i++) {
		tx_packet_index = lat->tx_packet_index;
		if (tx_packet_index > old_tx_packet_index) {
			if (tx_packet_index - old_tx_packet_index < small) {
				// The diff is small => increasing index count
			} else {
				// The diff is big => it is more likely that the previous packet overflowed
				n_overflow--;
			}
		} else {
			if (old_tx_packet_index - tx_packet_index < small) {
				// The diff is small => packet re-ordering
			} else {
				// The diff is big => it is more likely that this packet overflowed
				n_overflow++;
			}
		}
		lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
		old_tx_packet_index = tx_packet_index;
		lat++;
	}
}
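
/* Worked example: with old_tx_packet_index = 0xFFFFFFFE followed by
 * tx_packet_index = 0x00000001, the difference (0xFFFFFFFD) exceeds `small`,
 * so n_overflow becomes 1 and the index is rebased to 0x100000001.  A pair
 * such as 10 followed by 8 differs by only 2 and is treated as re-ordering
 * by the SUT, so it is left untouched. */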

static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
{
	uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
	uint32_t small = UINT32_MAX >> 1;
	lat++;

	/*
	 * Same algorithm as above, but applied to time.
	 * Note that:
	 *	- overflow happens after 4 billion "cycles" (shifted by LATENCY_ACCURACY) = ~4 sec
	 *	- an absolute difference of up to 2 billion (shifted) cycles (~2 sec) is not considered an overflow
	 *		=> the algorithm does not work if receiving less than 1 packet every 2 seconds
	 */
	for (uint32_t i = 1; i < count; i++) {
		tx_time = lat->tx_time;
		if (tx_time > old_tx_time) {
			if (tx_time - old_tx_time > small) {
				n_overflow--;
			}
		} else {
			if (old_tx_time - tx_time > small) {
				n_overflow++;
			}
		}
		lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
		old_tx_time = tx_time;
		lat++;
	}
}

static void task_lat_count_remaining_lost_packets(struct task_lat *task)
{
	struct lat_test *lat_test = task->lat_test;

	for (uint32_t j = 0; j < task->generator_count; j++) {
		struct early_loss_detect *eld = &task->eld[j];

		lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
	}
}

static void task_lat_reset_eld(struct task_lat *task)
{
	for (uint32_t j = 0; j < task->generator_count; j++) {
		early_loss_detect_reset(&task->eld[j]);
	}
}

static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
{
	uint64_t min_tsc = UINT64_MAX;

	for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
		if (min_tsc > task->latency_buffer[i].tx_time)
			min_tsc = task->latency_buffer[i].tx_time;
	}

	return min_tsc << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
{
	uint64_t lat = diff_or_zero(lat_info->rx_time, lat_info->tx_time);

	return lat << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
}
static void lat_write_latency_to_file(struct task_lat *task)
{
	uint64_t min_tsc;
	uint64_t n_loss;

	min_tsc = lat_latency_buffer_get_min_tsc(task);

	// Dumping all packet statistics
	fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
	fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec); rx time; tx time;\n");
	for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
		struct lat_info *lat_info = &task->latency_buffer[i];
		uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
		uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
		uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

		fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
			lat_info->rx_packet_index,
			lat_info->port_queue_id,
			lat_info->tx_packet_index,
			tsc_to_nsec(lat_tsc),
			tsc_to_nsec(rx_tsc - min_tsc),
			tsc_to_nsec(tx_tsc - min_tsc));
	}

	// To detect dropped packets, we need to sort them based on TX
	if (task->unique_id_pos) {
		plogx_info("Adapting tx_packet_index\n");
		fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
		plogx_info("Sorting packets based on tx_packet_index\n");
		qsort(task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
		plogx_info("Sorted packets based on packet_index\n");
	} else {
		plogx_info("Adapting tx_time\n");
		fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
		plogx_info("Sorting packets based on tx_time\n");
		qsort(task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
		plogx_info("Sorted packets based on packet_time\n");
	}

	// A packet is marked as dropped if 2 packets received from the same queue are not consecutive
	fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
	fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");

	for (uint32_t i = 0; i < task->generator_count; i++)
		task->prev_tx_packet_index[i] = -1;

	for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
		struct lat_info *lat_info = &task->latency_buffer[i];
		uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
		uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
		uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
		uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
		uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

		/* Packet n + ACCURACY_BUFFER_SIZE delivers the TX error for packet n,
		   hence the last ACCURACY_BUFFER_SIZE packets do not have a TX error. */
		if (i + ACCURACY_BUFFER_SIZE >= task->latency_buffer_idx) {
			tx_err_tsc = 0;
		}

		if (lat_info->port_queue_id >= task->generator_count) {
			plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
				lat_info->port_queue_id, lat_info->tx_packet_index);
			continue;
		}
		// Log dropped packet
		n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
		if (n_loss)
			fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
				lat_info->port_queue_id,
				lat_info->tx_packet_index - n_loss, n_loss);
		// Log next packet
		fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
			lat_info->port_queue_id,
			lat_info->tx_packet_index,
			lat_info->rx_packet_index,
			tsc_to_nsec(lat_tsc),
			tsc_to_nsec(tx_tsc - min_tsc),
			tsc_to_nsec(rx_tsc - min_tsc),
			tsc_to_nsec(tx_err_tsc),
			tsc_to_nsec(rx_err_tsc));
#ifdef LAT_DEBUG
		fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
			lat_info->id_in_bulk,
			lat_info->bulk_size,
			tsc_to_nsec(lat_info->begin - min_tsc),
			tsc_to_nsec(lat_info->before - min_tsc),
			tsc_to_nsec(lat_info->after - min_tsc));
#endif
		fprintf(task->fp_tx, "\n");
		task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
	}
	fflush(task->fp_rx);
	fflush(task->fp_tx);
	task->latency_buffer_idx = 0;
}

static void lat_stop(struct task_base *tbase)
{
	struct task_lat *task = (struct task_lat *)tbase;

	if (task->unique_id_pos) {
		task_lat_count_remaining_lost_packets(task);
		task_lat_reset_eld(task);
	}
	if (task->latency_buffer)
		lat_write_latency_to_file(task);
}

#ifdef LAT_DEBUG
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
	struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];

	lat_info->bulk_size = bulk_size;
	lat_info->id_in_bulk = id_in_bulk;
	lat_info->begin = task->begin;
	lat_info->before = task->base.aux->tsc_rx.before;
	lat_info->after = task->base.aux->tsc_rx.after;
}
#endif

static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
{
	struct lat_info *lat_info;

	/* If unique_id_pos is specified, latency is stored per packet
	   being sent. Lost packets are detected at runtime, and the
	   latency stored for those packets will be 0. */
	lat_info = &task->latency_buffer[task->latency_buffer_idx++];
	lat_info->rx_packet_index = rx_packet_index;
	lat_info->tx_packet_index = packet_id;
	lat_info->port_queue_id = generator_id;
	lat_info->rx_time = rx_time;
	lat_info->tx_time = tx_time;
	lat_info->rx_err = rx_err;
	lat_info->tx_err = tx_err;
}

static uint32_t task_lat_early_loss_detect(struct task_lat *task, struct unique_id *unique_id)
{
	struct early_loss_detect *eld;
	uint8_t generator_id;
	uint32_t packet_index;

	unique_id_get(unique_id, &generator_id, &packet_index);

	if (generator_id >= task->generator_count)
		return 0;

	eld = &task->eld[generator_id];

	return early_loss_detect_add(eld, packet_index);
}

static uint64_t tsc_extrapolate_backward(uint64_t link_speed, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
	uint64_t tsc = tsc_from - (rte_get_tsc_hz() * bytes) / link_speed;
	if (likely(tsc > tsc_minimum))
		return tsc;
	else
		return tsc_minimum;
}
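
/* Worked example: on a 10 Gbps link, link_speed is 1.25e9 bytes/sec, so a
 * packet followed by 1250 bytes of later-arriving data in the same bulk is
 * extrapolated to rte_get_tsc_hz() * 1250 / 1.25e9, i.e. 1 usec worth of TSC
 * cycles, before tsc_from.  The clamp to tsc_minimum (the previous bulk's RX
 * timestamp in the caller) keeps estimates from moving before packets that
 * were already accounted for. */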

static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
{
	uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
	size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);

	/* Clamp overflows into the last bucket (bucket_count itself would index out of bounds) */
	bucket_id = bucket_id < bucket_count ? bucket_id : bucket_count - 1;
	lat_test->buckets[bucket_id]++;
}
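
/* Each bucket thus spans 2^bucket_size units of lat_tsc: with a bucket_size
 * of 10, for example, a latency of 5000 lands in bucket 5000 >> 10 = 4, and
 * anything past the last bucket is clamped into it as an overflow bucket. */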

static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
{
	lat_test->lost_packets += lost_packets;
}

static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
{
	if (error > lat_test->accuracy_limit_tsc)
		return;
	lat_test->tot_pkts++;

	lat_test->tot_lat += lat_tsc;
	lat_test->tot_lat_error += error;

	/* (a +- b)^2 = a^2 +- (2ab + b^2) */
	lat_test->var_lat += lat_tsc * lat_tsc;
	lat_test->var_lat_error += 2 * lat_tsc * error;
	lat_test->var_lat_error += error * error;

	if (lat_tsc > lat_test->max_lat) {
		lat_test->max_lat = lat_tsc;
		lat_test->max_lat_error = error;
	}
	if (lat_tsc < lat_test->min_lat) {
		lat_test->min_lat = lat_tsc;
		lat_test->min_lat_error = error;
	}

#ifdef LATENCY_HISTOGRAM
	lat_test_histogram_add(lat_test, lat_tsc);
#endif
}
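
/* tot_lat and var_lat accumulate sum(x) and sum(x^2), so a reader of these
 * stats can derive the variance as sum(x^2)/n - (sum(x)/n)^2; the *_error
 * fields accumulate the worst-case contribution of timestamp inaccuracy to
 * the same sums, following the (a +- b)^2 expansion above. */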

static int task_lat_can_store_latency(struct task_lat *task)
{
	return task->latency_buffer_idx < task->latency_buffer_size;
}

static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
{
	if (tx_time == 0)
		return;
	uint32_t lat_tsc = diff_or_zero(rx_time, tx_time) << LATENCY_ACCURACY;

	lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);

	if (task_lat_can_store_latency(task)) {
		task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
	}
}

static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_lat *task = (struct task_lat *)tbase;
	uint64_t rx_time_err;

	uint32_t pkt_rx_time, pkt_tx_time;

	if (n_pkts == 0) {
		task->begin = tbase->aux->tsc_rx.before;
		return 0;
	}

	task_lat_update_lat_test(task);

	const uint64_t rx_tsc = tbase->aux->tsc_rx.after;
	uint32_t tx_time_err = 0;

	/* Go once through all received packets and read them. If a
	   packet has just been modified by another core, the latency
	   cost will be partially amortized through the bulk size */
	for (uint16_t j = 0; j < n_pkts; ++j) {
		struct rte_mbuf *mbuf = mbufs[j];
		task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);
	}

	if (task->sig) {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig)
				task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
			else
				task->rx_pkt_meta[j].pkt_tx_time = 0;
		}
	} else {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
		}
	}

	uint32_t bytes_total_in_bulk = 0;
	// Find RX time of first packet, for RX accuracy
	for (uint16_t j = 0; j < n_pkts; ++j) {
		uint16_t flipped = n_pkts - 1 - j;

		task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
		bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
	}

	pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
	if ((uint32_t)(task->begin >> LATENCY_ACCURACY) > pkt_rx_time) {
		// Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
		rx_time_err = pkt_rx_time - (uint32_t)(task->last_pkts_tsc >> LATENCY_ACCURACY);
	} else {
		rx_time_err = pkt_rx_time - (uint32_t)(task->begin >> LATENCY_ACCURACY);
	}

	struct unique_id *unique_id = NULL;
	struct delayed_latency_entry *delayed_latency_entry;
	uint32_t packet_id, generator_id;

	for (uint16_t j = 0; j < n_pkts; ++j) {
		struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
		uint8_t *hdr = rx_pkt_meta->hdr;

		pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
		pkt_tx_time = rx_pkt_meta->pkt_tx_time;

		if (task->unique_id_pos) {
			unique_id = (struct unique_id *)(hdr + task->unique_id_pos);

			uint32_t n_loss = task_lat_early_loss_detect(task, unique_id);
			packet_id = unique_id->packet_id;
			generator_id = unique_id->generator_id;
			lat_test_add_lost(task->lat_test, n_loss);
		} else {
			packet_id = task->rx_packet_index;
			generator_id = 0;
		}
		task->lat_test->tot_all_pkts++;

		/* If accuracy is enabled, latency is reported with a
		   delay of ACCURACY_BUFFER_SIZE packets since the generator puts the
		   accuracy for packet N into packet N + ACCURACY_BUFFER_SIZE. The delay
		   ensures that all reported latencies have both rx
		   and tx error. */
		if (task->accur_pos) {
			tx_time_err = *(uint32_t *)(hdr + task->accur_pos);

			delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_BUFFER_SIZE);

			if (delayed_latency_entry) {
				task_lat_store_lat(task,
						   delayed_latency_entry->rx_packet_id,
						   delayed_latency_entry->pkt_rx_time,
						   delayed_latency_entry->pkt_tx_time,
						   delayed_latency_entry->rx_time_err,
						   tx_time_err,
						   delayed_latency_entry->tx_packet_id,
						   delayed_latency_entry->generator_id);
			}

			delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
			delayed_latency_entry->pkt_rx_time = pkt_rx_time;
			delayed_latency_entry->pkt_tx_time = pkt_tx_time;
			delayed_latency_entry->rx_time_err = rx_time_err;
			delayed_latency_entry->rx_packet_id = task->rx_packet_index;
			delayed_latency_entry->tx_packet_id = packet_id;
			delayed_latency_entry->generator_id = generator_id;
		} else {
			task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
		}
		task->rx_packet_index++;
	}
	int ret;
	ret = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
	task->begin = tbase->aux->tsc_rx.before;
	task->last_pkts_tsc = tbase->aux->tsc_rx.after;
	return ret;
}

static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
{
	const int socket_id = rte_lcore_to_socket_id(core_id);
	char name[256];
	size_t latency_buffer_mem_size = 0;

	if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
		task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;

	latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;

	task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
	PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);

	sprintf(name, "latency.rx_%u.txt", core_id);
	task->fp_rx = fopen(name, "w+");
	PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);

	sprintf(name, "latency.tx_%u.txt", core_id);
	task->fp_tx = fopen(name, "w+");
	PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);

	task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
	PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocate prev_tx_packet_index\n");
}

static void task_init_generator_count(struct task_lat *task)
{
	uint8_t *generator_count = prox_sh_find_system("generator_count");

	if (generator_count == NULL) {
		task->generator_count = 1;
		plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
	} else
		task->generator_count = *generator_count;
	plog_info("\tLatency using %u generators\n", task->generator_count);
}

static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
{
	size_t eld_mem_size;

	eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
	task->eld = prox_zmalloc(eld_mem_size, socket_id);
	PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
}

void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
{
	task->limit = nsec_to_tsc(accuracy_limit_nsec);
}

static void lat_start(struct task_base *tbase)
{
	struct task_lat *task = (struct task_lat *)tbase;

	if (task->port) {
		// task->port->link_speed reports the link speed in Mbps, e.g. 40k for a 40 Gbps NIC.
		// task->link_speed is the link speed in bytes per second.
		task->link_speed = task->port->link_speed * 125000L;
		plog_info("\tReceiving at %lu Mbps\n", 8 * task->link_speed / 1000000);
	}
}
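
/* Conversion check: 1 Mbps = 125000 bytes/sec, so a 40 Gbps port reporting
 * link_speed = 40000 (Mbps) yields 40000 * 125000 = 5e9 bytes/sec, which is
 * indeed 40 Gbps divided by 8 bits per byte. */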

static void init_task_lat(struct task_base *tbase, struct task_args *targ)
{
	struct task_lat *task = (struct task_lat *)tbase;
	const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

	task->lat_pos = targ->lat_pos;
	task->accur_pos = targ->accur_pos;
	task->sig_pos = targ->sig_pos;
	task->sig = targ->sig;

	task->unique_id_pos = targ->packet_id_pos;
	task->latency_buffer_size = targ->latency_buffer_size;

	task_init_generator_count(task);

	if (task->latency_buffer_size) {
		init_task_lat_latency_buffer(task, targ->lconf->id);
	}

	if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
		targ->bucket_size = DEFAULT_BUCKET_SIZE;
	}

	if (task->accur_pos) {
		task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count, socket_id);
		PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
		for (uint i = 0; i < task->generator_count; i++) {
			task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
			PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
		}
		if (task->unique_id_pos == 0) {
			/* When using the accuracy feature, the accuracy from TX is written ACCURACY_BUFFER_SIZE packets later.
			* We can only retrieve the right packet if a packet id is written to it.
			* Otherwise we will use the packet RECEIVED ACCURACY_BUFFER_SIZE packets ago, which is OK if
			* packets are not re-ordered. If packets are re-ordered, then the matching between
			* the tx accuracy and the latency is wrong.
			*/
			plog_warn("\tWhen accuracy feature is used, a unique id should ideally also be used\n");
		}
	}

	task->lt[0].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
	task->lt[1].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
	if (task->unique_id_pos) {
		task_lat_init_eld(task, socket_id);
		task_lat_reset_eld(task);
	}
	task->lat_test = &task->lt[task->using_lt];

	task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
	task->rx_pkt_meta = prox_zmalloc(MAX_RX_PKT_ALL * sizeof(*task->rx_pkt_meta), socket_id);
	PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");

	task->link_speed = UINT64_MAX;
	if (targ->nb_rxports) {
		// The task->port structure is only used while starting handle_lat, to get the link_speed.
		// link_speed cannot be queried at init time, as the port has not been initialized yet.
		struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
		task->port = port;
	}
}

static struct task_init task_init_lat = {
	.mode_str = "lat",
	.init = init_task_lat,
	.handle = handle_lat_bulk,
	.start = lat_start,
	.stop = lat_stop,
	.flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_RX_ALL | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
	.size = sizeof(struct task_lat)
};

__attribute__((constructor)) static void reg_task_lat(void)
{
	reg_task(&task_init_lat);
}