a0e5fb42130fd24397106a1ff0183196583d0697
[samplevnf.git] / VNFs / DPPD-PROX / handle_lat.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 //#define LAT_DEBUG
18
19 #include <rte_cycles.h>
20 #include <stdio.h>
21 #include <math.h>
22
23 #include "handle_gen.h"
24 #include "prox_malloc.h"
25 #include "mbuf_utils.h"
26 #include "handle_lat.h"
27 #include "log.h"
28 #include "task_init.h"
29 #include "task_base.h"
30 #include "stats.h"
31 #include "lconf.h"
32 #include "quit.h"
33 #include "eld.h"
34 #include "prox_shared.h"
35 #include "prox_port_cfg.h"
36
37 #define DEFAULT_BUCKET_SIZE     10
38 #define ACCURACY_BUFFER_SIZE    64
39
/* One record per measured packet, stored in the optional latency buffer
 * and dumped to file on task stop (see lat_write_latency_to_file).
 * Time fields are stored right-shifted by LATENCY_ACCURACY; the
 * lat_info_get_*_tsc() helpers shift them back into TSC units. */
struct lat_info {
	uint32_t rx_packet_index;	/* index of the packet in RX order */
	uint64_t tx_packet_index;	/* TX-side id; 64 bits so 32-bit wraparounds can be fixed up before sorting */
	uint32_t tx_err;		/* TX accuracy error (LATENCY_ACCURACY-shifted units) */
	uint32_t rx_err;		/* RX accuracy error (LATENCY_ACCURACY-shifted units) */
	uint64_t rx_time;		/* RX timestamp (LATENCY_ACCURACY-shifted units) */
	uint64_t tx_time;		/* TX timestamp (LATENCY_ACCURACY-shifted units) */
	uint16_t port_queue_id;		/* id of the generator that sent the packet */
#ifdef LAT_DEBUG
	uint16_t id_in_bulk;		/* position of this packet within its RX bulk */
	uint16_t bulk_size;		/* number of packets in that RX bulk */
	uint64_t begin;			/* task->begin at RX time (TSC before previous poll) */
	uint64_t after;			/* tsc_rx.after for the bulk */
	uint64_t before;		/* tsc_rx.before for the bulk */
#endif
};
56
/* Slot of the per-generator circular buffer (ACCURACY_BUFFER_SIZE entries)
 * used to delay latency reporting until the TX-side accuracy for a packet
 * arrives in packet N + ACCURACY_BUFFER_SIZE. */
struct delayed_latency_entry {
	uint32_t rx_packet_id;	/* rx_packet_index when the packet was received */
	uint32_t tx_packet_id;	/* packet id extracted from the packet itself */
	uint32_t packet_id;	/* key of the slot; checked on lookup to detect stale entries */
	uint8_t generator_id;	/* generator the packet came from */
	uint64_t pkt_rx_time;	/* extrapolated RX time (LATENCY_ACCURACY-shifted) */
	uint64_t pkt_tx_time;	/* TX time read from the packet (LATENCY_ACCURACY-shifted) */
	uint64_t rx_time_err;	/* RX-side measurement error for this bulk */
};
66
67 static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
68 {
69         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
70         if (delayed_latency_entry->packet_id == packet_id)
71                 return delayed_latency_entry;
72         else
73                 return NULL;
74 }
75
76 static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
77 {
78         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
79         delayed_latency_entry->packet_id = packet_id;
80         return delayed_latency_entry;
81 }
82
/* Per-packet scratch data filled while scanning a received bulk. */
struct rx_pkt_meta_data {
	uint8_t  *hdr;			/* start of packet data (rte_pktmbuf_mtod) */
	uint32_t pkt_tx_time;		/* TX timestamp read from the packet at lat_pos */
	uint32_t bytes_after_in_bulk;	/* wire bytes following this packet in the bulk; used to extrapolate its RX time */
};
88
/* State of one latency-measurement task. */
struct task_lat {
	struct task_base base;
	uint64_t limit;			/* accuracy limit in TSC ticks (task_lat_set_accuracy_limit) */
	uint64_t rx_packet_index;	/* running count of valid packets received */
	uint64_t last_pkts_tsc;		/* tsc_rx.after of the previous bulk; lower bound for extrapolation */
	struct delayed_latency_entry **delayed_latency_entries;	/* per-generator rings of ACCURACY_BUFFER_SIZE */
	struct lat_info *latency_buffer;	/* optional per-packet dump buffer (NULL when disabled) */
	uint32_t latency_buffer_idx;	/* next free slot in latency_buffer */
	uint32_t latency_buffer_size;	/* capacity of latency_buffer in entries */
	uint64_t begin;			/* tsc_rx.before of the previous poll */
	uint16_t lat_pos;		/* byte offset of TX timestamp in packet */
	uint16_t unique_id_pos;		/* byte offset of unique id in packet (0 = disabled) */
	uint16_t accur_pos;		/* byte offset of TX accuracy in packet (0 = disabled) */
	uint16_t sig_pos;		/* byte offset of signature in packet (0 = disabled) */
	uint32_t sig;			/* expected signature value */
	volatile uint16_t use_lt; /* which lt to use, */
	volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements are used */
	struct lat_test lt[2];		/* double-buffered measurements (writer/reader swap) */
	struct lat_test *lat_test;	/* currently active measurement, &lt[using_lt] */
	uint32_t generator_count;	/* number of generator tasks feeding this one */
	uint16_t min_pkt_len;		/* shortest packet that can hold all expected fields */
	struct early_loss_detect *eld;	/* per-generator early loss detection state */
	struct rx_pkt_meta_data *rx_pkt_meta;	/* scratch array for the current bulk */
	uint64_t link_speed;		/* link speed in bytes/sec; 0 while link is down */
	// Following fields are only used when starting or stopping, not in general runtime
	uint64_t *prev_tx_packet_index;	/* per-generator last tx index seen while dumping */
	FILE *fp_rx;			/* latency.rx_<core>.txt dump file */
	FILE *fp_tx;			/* latency.tx_<core>.txt dump file */
	struct prox_port_cfg *port;	/* RX port, used to (re)read link speed */
};
/* Difference between an RX and a TX timestamp, both 32-bit values
 * (see handle_lat_bulk). rx_time is normally the larger one, except
 * every UINT32_MAX cycles when it wraps; since the result is also
 * uint32_t, the modular subtraction still yields the right delta.
 */
static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
{
	uint32_t delta = rx_time - tx_time;

	return delta;
}
130
131 struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
132 {
133         if (task->use_lt == task->using_lt)
134                 return &task->lt[!task->using_lt];
135         return NULL;
136 }
137
138 void task_lat_use_other_latency_meassurement(struct task_lat *task)
139 {
140         task->use_lt = !task->using_lt;
141 }
142
143 static void task_lat_update_lat_test(struct task_lat *task)
144 {
145         if (task->use_lt != task->using_lt) {
146                 task->using_lt = task->use_lt;
147                 task->lat_test = &task->lt[task->using_lt];
148                 task->lat_test->accuracy_limit_tsc = task->limit;
149         }
150 }
151
152 static int compare_tx_time(const void *val1, const void *val2)
153 {
154         const struct lat_info *ptr1 = val1;
155         const struct lat_info *ptr2 = val2;
156
157         return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
158 }
159
160 static int compare_tx_packet_index(const void *val1, const void *val2)
161 {
162         const struct lat_info *ptr1 = val1;
163         const struct lat_info *ptr2 = val2;
164
165         return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
166 }
167
/* Promote the 32-bit, wrapping tx_packet_index values of a buffer sorted
 * by RX time into monotonically comparable 64-bit values, so the buffer
 * can afterwards be sorted by TX index.  Works by tracking a running
 * overflow count between consecutive entries.
 * NOTE: n_overflow is uint32_t and relies on unsigned wraparound:
 * decrementing it below 0 makes ((uint64_t)UINT32_MAX + 1) * n_overflow
 * act as a 2^64-modular subtraction of one wrap period. */
static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
{
	uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
	uint32_t small = UINT32_MAX >> 1;

	lat++;

	/* Buffer is sorted so far by RX time.
	 * We might have packets being reordered by SUT.
	 *     => consider small differences as re-order and big ones as overflow of tx_packet_index.
	 * Note that:
	 *      - overflow only happens if receiving and storing 4 billions packets...
	 *      - a absolute difference of less than 2 billion packets is not considered as an overflow
	 */
	for (uint32_t i = 1; i < count; i++) {
		tx_packet_index = lat->tx_packet_index;
		if (tx_packet_index > old_tx_packet_index) {
			if (tx_packet_index - old_tx_packet_index < small) {
				// The diff is small => increasing index count
			} else {
				// The diff is big => it is more likely that the previous packet was overflow
				n_overflow--;
			}
		} else {
			if (old_tx_packet_index - tx_packet_index < small) {
				// The diff is small => packet reorder
			} else {
				// The diff is big => it is more likely that this is an overflow
				n_overflow++;
			}
		}
		// Apply accumulated wrap periods (modular 2^64 arithmetic, see NOTE above)
		lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
		old_tx_packet_index = tx_packet_index;
		lat++;
	}
}
204
/* Same wraparound fix-up as fix_latency_buffer_tx_packet_index, applied
 * to the 32-bit tx_time field instead of the packet index.  Uses the
 * same unsigned-wraparound trick on n_overflow. */
static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
{
	uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
	uint32_t small = UINT32_MAX >> 1;
	lat++;

	/*
	 * Same algorithm as above, but with time.
	 * Note that:
	 *      - overflow happens after 4 billions "cycles" (shifted by LATENCY_ACCURACY) = ~4sec
	 *      - a absolute difference up to 2 billion (shifted) cycles (~=2sec) is not considered as an overflow
	 *              => algorithm does not work if receiving less than 1 packet every 2 seconds
	 */
	for (uint32_t i = 1; i < count; i++) {
		tx_time = lat->tx_time;
		if (tx_time > old_tx_time) {
			// Big forward jump => previous value had likely already wrapped
			if (tx_time - old_tx_time > small) {
				n_overflow--;
			}
		} else {
			// Big backward jump => tx_time wrapped between these two packets
			if (old_tx_time - tx_time > small) {
				n_overflow++;
			}
		}
		// Apply accumulated wrap periods (modular 2^64 arithmetic)
		lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
		old_tx_time = tx_time;
		lat++;
	}
}
234
235 static void task_lat_count_remaining_lost_packets(struct task_lat *task)
236 {
237         struct lat_test *lat_test = task->lat_test;
238
239         for (uint32_t j = 0; j < task->generator_count; j++) {
240                 struct early_loss_detect *eld = &task->eld[j];
241
242                 lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
243         }
244 }
245
246 static void task_lat_reset_eld(struct task_lat *task)
247 {
248         for (uint32_t j = 0; j < task->generator_count; j++) {
249                 early_loss_detect_reset(&task->eld[j]);
250         }
251 }
252
253 static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
254 {
255         uint64_t min_tsc = UINT64_MAX;
256
257         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
258                 if (min_tsc > task->latency_buffer[i].tx_time)
259                         min_tsc = task->latency_buffer[i].tx_time;
260         }
261
262         return min_tsc << LATENCY_ACCURACY;
263 }
264
265 static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
266 {
267         uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);
268
269         return lat << LATENCY_ACCURACY;
270 }
271
272 static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
273 {
274         return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
275 }
276
277 static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
278 {
279         return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
280 }
281
282 static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
283 {
284         return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
285 }
286
287 static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
288 {
289         return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
290 }
291
292 static void lat_write_latency_to_file(struct task_lat *task)
293 {
294         uint64_t min_tsc;
295         uint64_t n_loss;
296
297         min_tsc = lat_latency_buffer_get_min_tsc(task);
298
299         // Dumping all packet statistics
300         fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
301         fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec);tx time;\n");
302         for (uint32_t i = 0; i < task->latency_buffer_idx ; i++) {
303                 struct lat_info *lat_info = &task->latency_buffer[i];
304                 uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
305                 uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
306                 uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
307
308                 fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
309                         lat_info->rx_packet_index,
310                         lat_info->port_queue_id,
311                         lat_info->tx_packet_index,
312                         tsc_to_nsec(lat_tsc),
313                         tsc_to_nsec(rx_tsc - min_tsc),
314                         tsc_to_nsec(tx_tsc - min_tsc));
315         }
316
317         // To detect dropped packets, we need to sort them based on TX
318         if (task->unique_id_pos) {
319                 plogx_info("Adapting tx_packet_index\n");
320                 fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
321                 plogx_info("Sorting packets based on tx_packet_index\n");
322                 qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
323                 plogx_info("Sorted packets based on packet_index\n");
324         } else {
325                 plogx_info("Adapting tx_time\n");
326                 fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
327                 plogx_info("Sorting packets based on tx_time\n");
328                 qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
329                 plogx_info("Sorted packets based on packet_time\n");
330         }
331
332         // A packet is marked as dropped if 2 packets received from the same queue are not consecutive
333         fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
334         fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");
335
336         for (uint32_t i = 0; i < task->generator_count;i++)
337                 task->prev_tx_packet_index[i] = -1;
338
339         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
340                 struct lat_info *lat_info = &task->latency_buffer[i];
341                 uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
342                 uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
343                 uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
344                 uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
345                 uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
346
347                 /* Packet n + ACCURACY_BUFFER_SIZE delivers the TX error for packet n,
348                    hence the last ACCURACY_BUFFER_SIZE packets do no have TX error. */
349                 if (i + ACCURACY_BUFFER_SIZE >= task->latency_buffer_idx) {
350                         tx_err_tsc = 0;
351                 }
352
353                 if (lat_info->port_queue_id >= task->generator_count) {
354                         plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
355                                 lat_info->port_queue_id, lat_info->tx_packet_index);
356                         continue;
357                 }
358                 // Log dropped packet
359                 n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
360                 if (n_loss)
361                         fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
362                                 lat_info->port_queue_id,
363                                 lat_info->tx_packet_index - n_loss, n_loss);
364                 // Log next packet
365                 fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
366                         lat_info->port_queue_id,
367                         lat_info->tx_packet_index,
368                         lat_info->rx_packet_index,
369                         tsc_to_nsec(lat_tsc),
370                         tsc_to_nsec(tx_tsc - min_tsc),
371                         tsc_to_nsec(rx_tsc - min_tsc),
372                         tsc_to_nsec(tx_err_tsc),
373                         tsc_to_nsec(rx_err_tsc));
374 #ifdef LAT_DEBUG
375                 fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
376                         lat_info->id_in_bulk,
377                         lat_info->bulk_size,
378                         tsc_to_nsec(lat_info->begin - min_tsc),
379                         tsc_to_nsec(lat_info->before - min_tsc),
380                         tsc_to_nsec(lat_info->after - min_tsc));
381 #endif
382                 fprintf(task->fp_tx, "\n");
383                 task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
384         }
385         fflush(task->fp_rx);
386         fflush(task->fp_tx);
387         task->latency_buffer_idx = 0;
388 }
389
390 static void lat_stop(struct task_base *tbase)
391 {
392         struct task_lat *task = (struct task_lat *)tbase;
393
394         if (task->unique_id_pos) {
395                 task_lat_count_remaining_lost_packets(task);
396                 task_lat_reset_eld(task);
397         }
398         if (task->latency_buffer)
399                 lat_write_latency_to_file(task);
400 }
401
#ifdef LAT_DEBUG
/* Record per-bulk debug context (bulk size/position and the three poll
 * timestamps) into the latency-buffer entry at rx_packet_index. */
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
	struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];

	lat_info->bulk_size = bulk_size;
	lat_info->id_in_bulk = id_in_bulk;
	lat_info->begin = task->begin;
	lat_info->before = task->base.aux->tsc_rx.before;
	lat_info->after = task->base.aux->tsc_rx.after;
}
#endif
414
415 static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
416 {
417         struct lat_info *lat_info;
418
419         /* If unique_id_pos is specified then latency is stored per
420            packet being sent. Lost packets are detected runtime, and
421            latency stored for those packets will be 0 */
422         lat_info = &task->latency_buffer[task->latency_buffer_idx++];
423         lat_info->rx_packet_index = rx_packet_index;
424         lat_info->tx_packet_index = packet_id;
425         lat_info->port_queue_id = generator_id;
426         lat_info->rx_time = rx_time;
427         lat_info->tx_time = tx_time;
428         lat_info->rx_err = rx_err;
429         lat_info->tx_err = tx_err;
430 }
431
432 static uint32_t task_lat_early_loss_detect(struct task_lat *task, uint32_t packet_id, uint8_t generator_id)
433 {
434         struct early_loss_detect *eld = &task->eld[generator_id];
435         return early_loss_detect_add(eld, packet_id);
436 }
437
/* Estimate when `bytes` worth of wire data started arriving, by walking
 * back from tsc_from at link_speed (bytes/sec), clamped to tsc_minimum.
 * Caller guarantees link_speed != 0 (checked in handle_lat_bulk). */
static uint64_t tsc_extrapolate_backward(uint64_t link_speed, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
	uint64_t wire_tsc = (rte_get_tsc_hz() * bytes) / link_speed;
	uint64_t tsc = tsc_from - wire_tsc;

	return (tsc > tsc_minimum) ? tsc : tsc_minimum;
}
446
447 static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
448 {
449         uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
450         size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);
451
452         bucket_id = bucket_id < bucket_count? bucket_id : bucket_count;
453         lat_test->buckets[bucket_id]++;
454 }
455
456 static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
457 {
458         lat_test->lost_packets += lost_packets;
459 }
460
461 static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
462 {
463         if (error > lat_test->accuracy_limit_tsc)
464                 return;
465         lat_test->tot_pkts++;
466
467         lat_test->tot_lat += lat_tsc;
468         lat_test->tot_lat_error += error;
469
470         /* (a +- b)^2 = a^2 +- (2ab + b^2) */
471         lat_test->var_lat += lat_tsc * lat_tsc;
472         lat_test->var_lat_error += 2 * lat_tsc * error;
473         lat_test->var_lat_error += error * error;
474
475         if (lat_tsc > lat_test->max_lat) {
476                 lat_test->max_lat = lat_tsc;
477                 lat_test->max_lat_error = error;
478         }
479         if (lat_tsc < lat_test->min_lat) {
480                 lat_test->min_lat = lat_tsc;
481                 lat_test->min_lat_error = error;
482         }
483
484 #ifdef LATENCY_HISTOGRAM
485         lat_test_histogram_add(lat_test, lat_tsc);
486 #endif
487 }
488
489 static int task_lat_can_store_latency(struct task_lat *task)
490 {
491         return task->latency_buffer_idx < task->latency_buffer_size;
492 }
493
494 static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
495 {
496         uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;
497
498         lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);
499
500         if (task_lat_can_store_latency(task)) {
501                 task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
502         }
503 }
504
/* Fast-path handler: validates a received burst (length + optional
 * signature), extrapolates each packet's wire RX time backwards from
 * the burst timestamp, records latency (optionally delayed by
 * ACCURACY_BUFFER_SIZE packets to pair with TX accuracy), then forwards
 * the burst unchanged. Returns the tx_pkt() result. */
static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_lat *task = (struct task_lat *)tbase;

	// If link is down, link_speed is 0
	if (unlikely(task->link_speed == 0)) {
		if (task->port && task->port->link_speed != 0) {
			// Link came up: cache speed in bytes/sec (Mbps * 125000)
			task->link_speed = task->port->link_speed * 125000L;
			plog_info("\tPort %u: link speed is %ld Mbps\n",
				(uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
		} else if (n_pkts) {
			// No usable link speed: forward without measuring
			return task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
		} else {
			return 0;
		}
	}

	if (n_pkts == 0) {
		// Idle poll: remember its timestamp for the RX-error estimate
		task->begin = tbase->aux->tsc_rx.before;
		return 0;
	}

	task_lat_update_lat_test(task);

	// Remember those packets with bad length or bad signature
	uint32_t non_dp_count = 0;
	uint64_t pkt_bad_len_sig[(MAX_RX_PKT_ALL + 63) / 64];
#define BIT64_SET(a64, bit)	a64[bit / 64] |=  (((uint64_t)1) << (bit & 63))
#define BIT64_CLR(a64, bit)	a64[bit / 64] &= ~(((uint64_t)1) << (bit & 63))
#define BIT64_TEST(a64, bit)	a64[bit / 64]  &  (((uint64_t)1) << (bit & 63))

	/* Go once through all received packets and read them.  If
	   packet has just been modified by another core, the cost of
	   latency will be partialy amortized though the bulk size */
	for (uint16_t j = 0; j < n_pkts; ++j) {
		struct rte_mbuf *mbuf = mbufs[j];
		task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);

		// Remember those packets which are too short to hold the values that we expect
		if (unlikely(rte_pktmbuf_pkt_len(mbuf) < task->min_pkt_len)) {
			BIT64_SET(pkt_bad_len_sig, j);
			non_dp_count++;
		} else
			BIT64_CLR(pkt_bad_len_sig, j);
	}

	if (task->sig_pos) {
		// Signature checking enabled: read TX time only from signed packets
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
				continue;
			// Remember those packets with bad signature
			if (likely(*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig))
				task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
			else {
				BIT64_SET(pkt_bad_len_sig, j);
				non_dp_count++;
			}
		}
	} else {
		// No signature: read TX time from every length-valid packet
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
				continue;
			task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
		}
	}

	uint32_t bytes_total_in_bulk = 0;
	// Find RX time of first packet, for RX accuracy
	// (walk the bulk backwards so each packet knows how many wire bytes follow it)
	for (uint16_t j = 0; j < n_pkts; ++j) {
		uint16_t flipped = n_pkts - 1 - j;

		task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
		bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
	}

	const uint64_t rx_tsc = tbase->aux->tsc_rx.after;

	uint64_t rx_time_err;
	// Wire arrival time of the FIRST packet, in LATENCY_ACCURACY-shifted units
	uint64_t pkt_rx_time64 = tsc_extrapolate_backward(task->link_speed, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
	if (unlikely((task->begin >> LATENCY_ACCURACY) > pkt_rx_time64)) {
		// Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
		rx_time_err = pkt_rx_time64 - (task->last_pkts_tsc >> LATENCY_ACCURACY);
	} else {
		rx_time_err = pkt_rx_time64 - (task->begin >> LATENCY_ACCURACY);
	}

	TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, non_dp_count);
	for (uint16_t j = 0; j < n_pkts; ++j) {
		// Used to display % of packets within accuracy limit vs. total number of packets (used_col)
		task->lat_test->tot_all_pkts++;

		// Skip those packets with bad length or bad signature
		if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
			continue;

		struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
		uint8_t *hdr = rx_pkt_meta->hdr;

		// Per-packet wire RX time, extrapolated backwards from the burst end
		uint32_t pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
		uint32_t pkt_tx_time = rx_pkt_meta->pkt_tx_time;

		uint8_t generator_id;
		uint32_t packet_id;
		if (task->unique_id_pos) {
			struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
			unique_id_get(unique_id, &generator_id, &packet_id);

			if (unlikely(generator_id >= task->generator_count)) {
				/* No need to remember unexpected packet at this stage
				BIT64_SET(pkt_bad_len_sig, j);
				*/
				// Skip unexpected packet
				continue;
			}

			lat_test_add_lost(task->lat_test, task_lat_early_loss_detect(task, packet_id, generator_id));
		} else {
			generator_id = 0;
			packet_id = task->rx_packet_index;
		}

		/* If accuracy is enabled, latency is reported with a
		   delay of ACCURACY_BUFFER_SIZE packets since the generator puts the
		   accuracy for packet N into packet N + ACCURACY_BUFFER_SIZE. The delay
		   ensures that all reported latencies have both rx
		   and tx error. */
		if (task->accur_pos) {
			uint32_t tx_time_err = *(uint32_t *)(hdr + task->accur_pos);

			// Report the packet from ACCURACY_BUFFER_SIZE ago, whose TX error this packet carries
			struct delayed_latency_entry *delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_BUFFER_SIZE);

			if (delayed_latency_entry) {
				task_lat_store_lat(task,
						   delayed_latency_entry->rx_packet_id,
						   delayed_latency_entry->pkt_rx_time,
						   delayed_latency_entry->pkt_tx_time,
						   delayed_latency_entry->rx_time_err,
						   tx_time_err,
						   delayed_latency_entry->tx_packet_id,
						   delayed_latency_entry->generator_id);
			}

			// Park the current packet until its TX error arrives later
			delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
			delayed_latency_entry->pkt_rx_time = pkt_rx_time;
			delayed_latency_entry->pkt_tx_time = pkt_tx_time;
			delayed_latency_entry->rx_time_err = rx_time_err;
			delayed_latency_entry->rx_packet_id = task->rx_packet_index;
			delayed_latency_entry->tx_packet_id = packet_id;
			delayed_latency_entry->generator_id = generator_id;
		} else {
			task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
		}

		// Bad/unexpected packets do not need to be indexed
		task->rx_packet_index++;
	}

	task->begin = tbase->aux->tsc_rx.before;
	task->last_pkts_tsc = tbase->aux->tsc_rx.after;

	return task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
}
667
668 static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
669 {
670         const int socket_id = rte_lcore_to_socket_id(core_id);
671         char name[256];
672         size_t latency_buffer_mem_size = 0;
673
674         if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
675                 task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;
676
677         latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;
678
679         task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
680         PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);
681
682         sprintf(name, "latency.rx_%u.txt", core_id);
683         task->fp_rx = fopen(name, "w+");
684         PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);
685
686         sprintf(name, "latency.tx_%u.txt", core_id);
687         task->fp_tx = fopen(name, "w+");
688         PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);
689
690         task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
691         PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocated prev_tx_packet_index\n");
692 }
693
694 static void task_init_generator_count(struct task_lat *task)
695 {
696         uint8_t *generator_count = prox_sh_find_system("generator_count");
697
698         if (generator_count == NULL) {
699                 task->generator_count = 1;
700                 plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
701         } else
702                 task->generator_count = *generator_count;
703         plog_info("\tLatency using %u generators\n", task->generator_count);
704 }
705
706 static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
707 {
708         size_t eld_mem_size;
709
710         eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
711         task->eld = prox_zmalloc(eld_mem_size, socket_id);
712         PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
713 }
714
715 void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
716 {
717         task->limit = nsec_to_tsc(accuracy_limit_nsec);
718 }
719
720 static void lat_start(struct task_base *tbase)
721 {
722         struct task_lat *task = (struct task_lat *)tbase;
723
724         if (task->port) {
725                 // task->port->link_speed reports the link speed in Mbps e.g. 40k for a 40 Gbps NIC.
726                 // task->link_speed reports link speed in Bytes per sec.
727                 // It can be 0 if link is down, and must hence be updated in fast path.
728                 task->link_speed = task->port->link_speed * 125000L;
729                 if (task->link_speed)
730                         plog_info("\tPort %u: link speed is %ld Mbps\n",
731                                 (uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
732                 else
733                         plog_info("\tPort %u: link speed is %ld Mbps - link might be down\n",
734                                 (uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
735         }
736 }
737
738 static void init_task_lat(struct task_base *tbase, struct task_args *targ)
739 {
740         struct task_lat *task = (struct task_lat *)tbase;
741         const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
742
743         task->lat_pos = targ->lat_pos;
744         task->accur_pos = targ->accur_pos;
745         task->sig_pos = targ->sig_pos;
746         task->sig = targ->sig;
747
748         task->unique_id_pos = targ->packet_id_pos;
749         task->latency_buffer_size = targ->latency_buffer_size;
750
751         PROX_PANIC(task->lat_pos == 0, "Missing 'lat pos' parameter in config file\n");
752         uint16_t min_pkt_len = task->lat_pos + sizeof(uint32_t);
753         if (task->unique_id_pos && (
754                 min_pkt_len < task->unique_id_pos + sizeof(struct unique_id)))
755                 min_pkt_len = task->unique_id_pos + sizeof(struct unique_id);
756         if (task->accur_pos && (
757                 min_pkt_len < task->accur_pos + sizeof(uint32_t)))
758                 min_pkt_len = task->accur_pos + sizeof(uint32_t);
759         if (task->sig_pos && (
760                 min_pkt_len < task->sig_pos + sizeof(uint32_t)))
761                 min_pkt_len = task->sig_pos + sizeof(uint32_t);
762         task->min_pkt_len = min_pkt_len;
763
764         task_init_generator_count(task);
765
766         if (task->latency_buffer_size) {
767                 init_task_lat_latency_buffer(task, targ->lconf->id);
768         }
769
770         if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
771                 targ->bucket_size = DEFAULT_BUCKET_SIZE;
772         }
773
774         if (task->accur_pos) {
775                 task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count , socket_id);
776                 PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
777                 for (uint i = 0; i < task->generator_count; i++) {
778                         task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
779                         PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
780                 }
781                 if (task->unique_id_pos == 0) {
782                         /* When using accuracy feature, the accuracy from TX is written ACCURACY_BUFFER_SIZE packets later
783                         * We can only retrieve the good packet if a packet id is written to it.
784                         * Otherwise we will use the packet RECEIVED ACCURACY_BUFFER_SIZE packets ago which is OK if
785                         * packets are not re-ordered. If packets are re-ordered, then the matching between
786                         * the tx accuracy znd the latency is wrong.
787                         */
788                         plog_warn("\tWhen accuracy feature is used, a unique id should ideally also be used\n");
789                 }
790         }
791
792         task->lt[0].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
793         task->lt[1].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
794         if (task->unique_id_pos) {
795                 task_lat_init_eld(task, socket_id);
796                 task_lat_reset_eld(task);
797         }
798         task->lat_test = &task->lt[task->using_lt];
799
800         task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
801         task->rx_pkt_meta = prox_zmalloc(MAX_RX_PKT_ALL * sizeof(*task->rx_pkt_meta), socket_id);
802         PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");
803
804         task->link_speed = UINT64_MAX;
805         if (targ->nb_rxports) {
806                 // task->port structure is only used while starting handle_lat to get the link_speed.
807                 // link_speed can not be quiried at init as the port has not been initialized yet.
808                 struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
809                 task->port = port;
810         }
811 }
812
/* Task registration descriptor for the "lat" mode (latency measurement). */
static struct task_init task_init_lat = {
	.mode_str = "lat",
	.init = init_task_lat,
	.handle = handle_lat_bulk,
	.start = lat_start,
	.stop = lat_stop,
	/* Needs TSC on RX, receives everything (incl. zero-length bursts) and
	 * never drops packets itself — required for accurate latency stats. */
	.flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_RX_ALL | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
	.size = sizeof(struct task_lat)
};
822
/* Register the "lat" task mode at program load time (runs before main). */
__attribute__((constructor)) static void reg_task_lat(void)
{
	reg_task(&task_init_lat);
}