Merge "Initial support for DPDK 18.05"
[samplevnf.git] / VNFs / DPPD-PROX / handle_lat.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 //#define LAT_DEBUG
18
19 #include <rte_cycles.h>
20 #include <stdio.h>
21 #include <math.h>
22
23 #include "handle_gen.h"
24 #include "prox_malloc.h"
25 #include "mbuf_utils.h"
26 #include "handle_lat.h"
27 #include "log.h"
28 #include "task_init.h"
29 #include "task_base.h"
30 #include "stats.h"
31 #include "lconf.h"
32 #include "quit.h"
33 #include "eld.h"
34 #include "prox_shared.h"
35 #include "prox_port_cfg.h"
36
37 #define DEFAULT_BUCKET_SIZE     10
38 #define ACCURACY_BUFFER_SIZE    64
39
/* Record of a single received packet's latency data.  Written to the
 * latency buffer at runtime and dumped to file by lat_write_latency_to_file(). */
struct lat_info {
	uint32_t rx_packet_index;	/* Index of this packet in RX order */
	uint64_t tx_packet_index;	/* TX index; 64 bits so wrap of the on-wire 32-bit id can be fixed up later */
	uint32_t tx_err;	/* TX accuracy error (shifted right by LATENCY_ACCURACY) */
	uint32_t rx_err;	/* RX accuracy error (shifted right by LATENCY_ACCURACY) */
	uint64_t rx_time;	/* RX timestamp (shifted right by LATENCY_ACCURACY) */
	uint64_t tx_time;	/* TX timestamp (shifted right by LATENCY_ACCURACY) */
	uint16_t port_queue_id;	/* Generator id the packet came from */
#ifdef LAT_DEBUG
	uint16_t id_in_bulk;	/* Position of this packet within its RX bulk */
	uint16_t bulk_size;	/* Number of packets in the RX bulk */
	uint64_t begin;		/* task->begin at the time the packet was stored */
	uint64_t after;		/* tsc_rx.after of the RX call */
	uint64_t before;	/* tsc_rx.before of the RX call */
#endif
};
56
/* One slot of the per-generator circular buffer (ACCURACY_BUFFER_SIZE
 * entries) holding a packet whose TX accuracy has not yet arrived: the
 * generator sends the accuracy for packet N inside packet
 * N + ACCURACY_BUFFER_SIZE (see handle_lat_bulk). */
struct delayed_latency_entry {
	uint32_t rx_packet_id;	/* task->rx_packet_index when this packet was received */
	uint32_t tx_packet_id;	/* Packet id extracted from the packet */
	uint32_t packet_id;	/* Slot key; compared on lookup to detect stale/overwritten slots */
	uint8_t generator_id;	/* Generator that sent the packet */
	uint64_t pkt_rx_time;	/* Extrapolated RX time (shifted right by LATENCY_ACCURACY) */
	uint64_t pkt_tx_time;	/* TX timestamp read from the packet */
	uint64_t rx_time_err;	/* RX time uncertainty computed for the bulk */
};
66
67 static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
68 {
69         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
70         if (delayed_latency_entry->packet_id == packet_id)
71                 return delayed_latency_entry;
72         else
73                 return NULL;
74 }
75
76 static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
77 {
78         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
79         delayed_latency_entry->packet_id = packet_id;
80         return delayed_latency_entry;
81 }
82
/* Per-packet scratch data filled in for each RX bulk in handle_lat_bulk(). */
struct rx_pkt_meta_data {
	uint8_t  *hdr;			/* Start of packet data (rte_pktmbuf_mtod) */
	uint32_t pkt_tx_time;		/* 32-bit TX timestamp read from the packet at lat_pos */
	uint32_t bytes_after_in_bulk;	/* Wire bytes of all packets after this one in the bulk */
};
88
/* Per-task state of the latency measurement task. */
struct task_lat {
	struct task_base base;
	uint64_t limit;		/* Accuracy limit in tsc; see task_lat_set_accuracy_limit() */
	uint64_t rx_packet_index;	/* Running index of accepted RX packets */
	uint64_t last_pkts_tsc;	/* tsc_rx.after of the previous bulk; floor for backward extrapolation */
	struct delayed_latency_entry **delayed_latency_entries;	/* [generator][ACCURACY_BUFFER_SIZE] circular buffers */
	struct lat_info *latency_buffer;	/* Per-packet records, dumped at stop */
	uint32_t latency_buffer_idx;	/* Next free slot in latency_buffer */
	uint32_t latency_buffer_size;	/* Capacity of latency_buffer */
	uint64_t begin;		/* tsc_rx.before of the previous RX call */
	uint16_t lat_pos;	/* Offset of the 32-bit TX timestamp in the packet */
	uint16_t unique_id_pos;	/* Offset of the unique id in the packet (0 = disabled) */
	uint16_t accur_pos;	/* Offset of the TX accuracy field (0 = disabled) */
	uint16_t sig_pos;	/* Offset of the signature (0 = disabled) */
	uint32_t sig;		/* Expected signature value at sig_pos */
	volatile uint16_t use_lt; /* which lt to use, */
	volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements are used */
	struct lat_test lt[2];	/* Double-buffered measurements, swapped via use_lt/using_lt */
	struct lat_test *lat_test;	/* Shortcut to &lt[using_lt] */
	uint32_t generator_count;	/* Number of traffic generators feeding this task */
	uint16_t min_pkt_len;	/* Shortest packet that can hold all expected fields */
	struct early_loss_detect *eld;	/* One loss detector per generator */
	struct rx_pkt_meta_data *rx_pkt_meta;	/* Per-bulk packet metadata */
	uint64_t link_speed;	/* Link speed in Bytes/sec; 0 while the link is down */
	// Following fields are only used when starting or stopping, not in general runtime
	uint64_t *prev_tx_packet_index;	/* Last TX index seen per generator (loss detection in file dump) */
	FILE *fp_rx;		/* latency.rx_<core>.txt dump file */
	FILE *fp_tx;		/* latency.tx_<core>.txt dump file */
	struct prox_port_cfg *port;	/* RX port config, used to read link speed */
};
/* Return (rx_time - tx_time) in 32-bit arithmetic.
 * Both timestamps are uint32_t (see handle_lat_bulk); rx_time is normally
 * the larger one, except once every UINT32_MAX cycles when it has wrapped.
 * Because unsigned subtraction wraps the same way, the uint32_t result
 * remains correct across the overflow. */
static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
{
	uint32_t delta = rx_time - tx_time;

	return delta;
}
130
131 struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
132 {
133         if (task->use_lt == task->using_lt)
134                 return &task->lt[!task->using_lt];
135         return NULL;
136 }
137
138 void task_lat_use_other_latency_meassurement(struct task_lat *task)
139 {
140         task->use_lt = !task->using_lt;
141 }
142
143 static void task_lat_update_lat_test(struct task_lat *task)
144 {
145         if (task->use_lt != task->using_lt) {
146                 task->using_lt = task->use_lt;
147                 task->lat_test = &task->lt[task->using_lt];
148                 task->lat_test->accuracy_limit_tsc = task->limit;
149         }
150 }
151
152 static int compare_tx_time(const void *val1, const void *val2)
153 {
154         const struct lat_info *ptr1 = val1;
155         const struct lat_info *ptr2 = val2;
156
157         return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
158 }
159
160 static int compare_tx_packet_index(const void *val1, const void *val2)
161 {
162         const struct lat_info *ptr1 = val1;
163         const struct lat_info *ptr2 = val2;
164
165         return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
166 }
167
/* Rewrite the 32-bit on-wire tx_packet_index values (buffer currently
 * sorted by RX time) into 64-bit values by detecting wrap-arounds of the
 * 32-bit counter, so the buffer can afterwards be sorted by TX index. */
static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
{
	// n_overflow is deliberately unsigned: it may transiently "go negative"
	// (wrap), and the 64-bit correction below relies on modular arithmetic.
	uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
	uint32_t small = UINT32_MAX >> 1;

	lat++;

	/* Buffer is sorted so far by RX time.
	 * We might have packets being reordered by SUT.
	 *     => consider small differences as re-order and big ones as overflow of tx_packet_index.
	 * Note that:
	 *      - overflow only happens if receiving and storing 4 billions packets...
	 *      - a absolute difference of less than 2 billion packets is not considered as an overflow
	 */
	for (uint32_t i = 1; i < count; i++) {
		tx_packet_index = lat->tx_packet_index;
		if (tx_packet_index > old_tx_packet_index) {
			if (tx_packet_index - old_tx_packet_index < small) {
				// The diff is small => increasing index count
			} else {
				// The diff is big => it is more likely that the previous packet was overflow
				n_overflow--;
			}
		} else {
			if (old_tx_packet_index - tx_packet_index < small) {
				// The diff is small => packet reorder
			} else {
				// The diff is big => it is more likely that this is an overflow
				n_overflow++;
			}
		}
		// Apply the accumulated number of wrap-arounds to this entry
		lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
		old_tx_packet_index = tx_packet_index;
		lat++;
	}
}
204
/* Same wrap-around fix-up as fix_latency_buffer_tx_packet_index, applied
 * to the 32-bit tx_time values before sorting the buffer by TX time. */
static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
{
	uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
	uint32_t small = UINT32_MAX >> 1;
	lat++;

	/*
	 * Same algorithm as above, but with time.
	 * Note that:
	 *      - overflow happens after 4 billions "cycles" (shifted by LATENCY_ACCURACY) = ~4sec
	 *      - a absolute difference up to 2 billion (shifted) cycles (~=2sec) is not considered as an overflow
	 *              => algorithm does not work if receiving less than 1 packet every 2 seconds
	 */
	for (uint32_t i = 1; i < count; i++) {
		tx_time = lat->tx_time;
		if (tx_time > old_tx_time) {
			// Big positive jump => the previous value was probably past a wrap
			if (tx_time - old_tx_time > small) {
				n_overflow--;
			}
		} else {
			// Big negative jump => the counter wrapped at this entry
			if (old_tx_time - tx_time > small) {
				n_overflow++;
			}
		}
		// Apply the accumulated number of wrap-arounds to this entry
		lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
		old_tx_time = tx_time;
		lat++;
	}
}
234
235 static void task_lat_count_remaining_lost_packets(struct task_lat *task)
236 {
237         struct lat_test *lat_test = task->lat_test;
238
239         for (uint32_t j = 0; j < task->generator_count; j++) {
240                 struct early_loss_detect *eld = &task->eld[j];
241
242                 lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
243         }
244 }
245
246 static void task_lat_reset_eld(struct task_lat *task)
247 {
248         for (uint32_t j = 0; j < task->generator_count; j++) {
249                 early_loss_detect_reset(&task->eld[j]);
250         }
251 }
252
253 static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
254 {
255         uint64_t min_tsc = UINT64_MAX;
256
257         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
258                 if (min_tsc > task->latency_buffer[i].tx_time)
259                         min_tsc = task->latency_buffer[i].tx_time;
260         }
261
262         return min_tsc << LATENCY_ACCURACY;
263 }
264
265 static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
266 {
267         uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);
268
269         return lat << LATENCY_ACCURACY;
270 }
271
/* TX accuracy error of an entry, rescaled to full tsc units. */
static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
}
276
/* RX accuracy error of an entry, rescaled to full tsc units. */
static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
}
281
/* RX timestamp of an entry, rescaled to full tsc units. */
static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
}
286
/* TX timestamp of an entry, rescaled to full tsc units. */
static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
}
291
/* Dump the recorded latency buffer to the per-core files: first ordered
 * by RX time (fp_rx), then re-sorted by TX index (or TX time when no
 * unique id is available) to detect and log dropped packets (fp_tx).
 * Resets the buffer index when done so recording can restart. */
static void lat_write_latency_to_file(struct task_lat *task)
{
	uint64_t min_tsc;
	uint64_t n_loss;

	// All timestamps in the dump are printed relative to the earliest TX time
	min_tsc = lat_latency_buffer_get_min_tsc(task);

	// Dumping all packet statistics
	fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
	fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec);tx time;\n");
	for (uint32_t i = 0; i < task->latency_buffer_idx ; i++) {
		struct lat_info *lat_info = &task->latency_buffer[i];
		uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
		uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
		uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

		fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
			lat_info->rx_packet_index,
			lat_info->port_queue_id,
			lat_info->tx_packet_index,
			tsc_to_nsec(lat_tsc),
			tsc_to_nsec(rx_tsc - min_tsc),
			tsc_to_nsec(tx_tsc - min_tsc));
	}

	// To detect dropped packets, we need to sort them based on TX
	if (task->unique_id_pos) {
		plogx_info("Adapting tx_packet_index\n");
		fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
		plogx_info("Sorting packets based on tx_packet_index\n");
		qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
		plogx_info("Sorted packets based on packet_index\n");
	} else {
		plogx_info("Adapting tx_time\n");
		fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
		plogx_info("Sorting packets based on tx_time\n");
		qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
		plogx_info("Sorted packets based on packet_time\n");
	}

	// A packet is marked as dropped if 2 packets received from the same queue are not consecutive
	fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
	fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");

	// Start every queue at -1 so the first packet (index 0) reports no loss
	for (uint32_t i = 0; i < task->generator_count;i++)
		task->prev_tx_packet_index[i] = -1;

	for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
		struct lat_info *lat_info = &task->latency_buffer[i];
		uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
		uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
		uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
		uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
		uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

		/* Packet n + ACCURACY_BUFFER_SIZE delivers the TX error for packet n,
		   hence the last ACCURACY_BUFFER_SIZE packets do no have TX error. */
		if (i + ACCURACY_BUFFER_SIZE >= task->latency_buffer_idx) {
			tx_err_tsc = 0;
		}

		if (lat_info->port_queue_id >= task->generator_count) {
			plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
				lat_info->port_queue_id, lat_info->tx_packet_index);
			continue;
		}
		// Log dropped packet
		n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
		if (n_loss)
			fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
				lat_info->port_queue_id,
				lat_info->tx_packet_index - n_loss, n_loss);
		// Log next packet
		fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
			lat_info->port_queue_id,
			lat_info->tx_packet_index,
			lat_info->rx_packet_index,
			tsc_to_nsec(lat_tsc),
			tsc_to_nsec(tx_tsc - min_tsc),
			tsc_to_nsec(rx_tsc - min_tsc),
			tsc_to_nsec(tx_err_tsc),
			tsc_to_nsec(rx_err_tsc));
#ifdef LAT_DEBUG
		fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
			lat_info->id_in_bulk,
			lat_info->bulk_size,
			tsc_to_nsec(lat_info->begin - min_tsc),
			tsc_to_nsec(lat_info->before - min_tsc),
			tsc_to_nsec(lat_info->after - min_tsc));
#endif
		fprintf(task->fp_tx, "\n");
		task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
	}
	fflush(task->fp_rx);
	fflush(task->fp_tx);
	// Buffer fully dumped; restart recording from the beginning
	task->latency_buffer_idx = 0;
}
389
390 static void lat_stop(struct task_base *tbase)
391 {
392         struct task_lat *task = (struct task_lat *)tbase;
393
394         if (task->unique_id_pos) {
395                 task_lat_count_remaining_lost_packets(task);
396                 task_lat_reset_eld(task);
397         }
398         if (task->latency_buffer)
399                 lat_write_latency_to_file(task);
400 }
401
#ifdef LAT_DEBUG
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
	/* Record per-bulk debug info (bulk size/position and RX tsc snapshots)
	 * in the latency entry for this packet. */
	struct lat_info *dbg = &task->latency_buffer[rx_packet_index];

	dbg->id_in_bulk = id_in_bulk;
	dbg->bulk_size = bulk_size;
	dbg->begin = task->begin;
	dbg->before = task->base.aux->tsc_rx.before;
	dbg->after = task->base.aux->tsc_rx.after;
}
#endif
414
415 static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
416 {
417         struct lat_info *lat_info;
418
419         /* If unique_id_pos is specified then latency is stored per
420            packet being sent. Lost packets are detected runtime, and
421            latency stored for those packets will be 0 */
422         lat_info = &task->latency_buffer[task->latency_buffer_idx++];
423         lat_info->rx_packet_index = rx_packet_index;
424         lat_info->tx_packet_index = packet_id;
425         lat_info->port_queue_id = generator_id;
426         lat_info->rx_time = rx_time;
427         lat_info->tx_time = tx_time;
428         lat_info->rx_err = rx_err;
429         lat_info->tx_err = tx_err;
430 }
431
432 static uint32_t task_lat_early_loss_detect(struct task_lat *task, uint32_t packet_id, uint8_t generator_id)
433 {
434         struct early_loss_detect *eld = &task->eld[generator_id];
435         return early_loss_detect_add(eld, packet_id);
436 }
437
static uint64_t tsc_extrapolate_backward(uint64_t link_speed, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
	/* Estimate the tsc at which a packet 'bytes' earlier on the wire was
	 * received, given the link speed in Bytes/sec; never go below
	 * tsc_minimum. */
	const uint64_t wire_tsc = (rte_get_tsc_hz() * bytes) / link_speed;
	const uint64_t tsc = tsc_from - wire_tsc;

	return (tsc > tsc_minimum) ? tsc : tsc_minimum;
}
446
447 static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
448 {
449         uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
450         size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);
451
452         bucket_id = bucket_id < bucket_count? bucket_id : bucket_count;
453         lat_test->buckets[bucket_id]++;
454 }
455
456 static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
457 {
458         lat_test->lost_packets += lost_packets;
459 }
460
461 static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
462 {
463         if (error > lat_test->accuracy_limit_tsc)
464                 return;
465         lat_test->tot_pkts++;
466
467         lat_test->tot_lat += lat_tsc;
468         lat_test->tot_lat_error += error;
469
470         /* (a +- b)^2 = a^2 +- (2ab + b^2) */
471         lat_test->var_lat += lat_tsc * lat_tsc;
472         lat_test->var_lat_error += 2 * lat_tsc * error;
473         lat_test->var_lat_error += error * error;
474
475         if (lat_tsc > lat_test->max_lat) {
476                 lat_test->max_lat = lat_tsc;
477                 lat_test->max_lat_error = error;
478         }
479         if (lat_tsc < lat_test->min_lat) {
480                 lat_test->min_lat = lat_tsc;
481                 lat_test->min_lat_error = error;
482         }
483
484 #ifdef LATENCY_HISTOGRAM
485         lat_test_histogram_add(lat_test, lat_tsc);
486 #endif
487 }
488
489 static int task_lat_can_store_latency(struct task_lat *task)
490 {
491         return task->latency_buffer_idx < task->latency_buffer_size;
492 }
493
494 static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
495 {
496         uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;
497
498         lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);
499
500         if (task_lat_can_store_latency(task)) {
501                 task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
502         }
503 }
504
/* Fast-path handler for a bulk of received packets: reads the TX
 * timestamp (and optional signature / unique id / accuracy fields) from
 * each packet, extrapolates each packet's RX time backwards from the
 * bulk RX tsc, updates loss detection and latency statistics, then
 * forwards the packets unchanged. */
static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_lat *task = (struct task_lat *)tbase;

	// If link is down, link_speed is 0
	if (unlikely(task->link_speed == 0)) {
		if (task->port && task->port->link_speed != 0) {
			task->link_speed = task->port->link_speed * 125000L;
			plog_info("\tPort %u: link speed is %ld Mbps\n",
				(uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
		} else if (n_pkts) {
			// Link still down: forward without measuring (extrapolation needs link speed)
			return task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
		} else {
			return 0;
		}
	}

	if (n_pkts == 0) {
		task->begin = tbase->aux->tsc_rx.before;
		return 0;
	}

	// Pick up a pending measurement-buffer swap, if any
	task_lat_update_lat_test(task);

	// Remember those packets with bad length or bad signature
	uint64_t pkt_bad_len_sig[(MAX_RX_PKT_ALL + 63) / 64];
#define BIT64_SET(a64, bit)	a64[bit / 64] |=  (((uint64_t)1) << (bit & 63))
#define BIT64_CLR(a64, bit)	a64[bit / 64] &= ~(((uint64_t)1) << (bit & 63))
#define BIT64_TEST(a64, bit)	a64[bit / 64]  &  (((uint64_t)1) << (bit & 63))

	/* Go once through all received packets and read them.  If
	   packet has just been modified by another core, the cost of
	   latency will be partialy amortized though the bulk size */
	for (uint16_t j = 0; j < n_pkts; ++j) {
		struct rte_mbuf *mbuf = mbufs[j];
		task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);

		// Remember those packets which are too short to hold the values that we expect
		if (unlikely(rte_pktmbuf_pkt_len(mbuf) < task->min_pkt_len))
			BIT64_SET(pkt_bad_len_sig, j);
		else
			BIT64_CLR(pkt_bad_len_sig, j);
	}

	// Read the embedded TX timestamp; when a signature is configured,
	// only from packets whose signature matches
	if (task->sig_pos) {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
				continue;
			// Remember those packets with bad signature
			if (likely(*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig))
				task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
			else
				BIT64_SET(pkt_bad_len_sig, j);
		}
	} else {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
				continue;
			task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
		}
	}

	// Walk the bulk backwards, accumulating wire bytes behind each packet
	uint32_t bytes_total_in_bulk = 0;
	// Find RX time of first packet, for RX accuracy
	for (uint16_t j = 0; j < n_pkts; ++j) {
		uint16_t flipped = n_pkts - 1 - j;

		task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
		bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
	}

	const uint64_t rx_tsc = tbase->aux->tsc_rx.after;

	uint64_t rx_time_err;
	// Extrapolated RX time of the first packet in the bulk
	uint64_t pkt_rx_time64 = tsc_extrapolate_backward(task->link_speed, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
	if (unlikely((task->begin >> LATENCY_ACCURACY) > pkt_rx_time64)) {
		// Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
		rx_time_err = pkt_rx_time64 - (task->last_pkts_tsc >> LATENCY_ACCURACY);
	} else {
		rx_time_err = pkt_rx_time64 - (task->begin >> LATENCY_ACCURACY);
	}

	for (uint16_t j = 0; j < n_pkts; ++j) {
		// Used to display % of packets within accuracy limit vs. total number of packets (used_col)
		task->lat_test->tot_all_pkts++;

		// Skip those packets with bad length or bad signature
		if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
			continue;

		struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
		uint8_t *hdr = rx_pkt_meta->hdr;

		uint32_t pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
		uint32_t pkt_tx_time = rx_pkt_meta->pkt_tx_time;

		uint8_t generator_id;
		uint32_t packet_id;
		if (task->unique_id_pos) {
			struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
			unique_id_get(unique_id, &generator_id, &packet_id);

			if (unlikely(generator_id >= task->generator_count)) {
				/* No need to remember unexpected packet at this stage
				BIT64_SET(pkt_bad_len_sig, j);
				*/
				// Skip unexpected packet
				continue;
			}

			lat_test_add_lost(task->lat_test, task_lat_early_loss_detect(task, packet_id, generator_id));
		} else {
			generator_id = 0;
			packet_id = task->rx_packet_index;
		}

		/* If accuracy is enabled, latency is reported with a
		   delay of ACCURACY_BUFFER_SIZE packets since the generator puts the
		   accuracy for packet N into packet N + ACCURACY_BUFFER_SIZE. The delay
		   ensures that all reported latencies have both rx
		   and tx error. */
		if (task->accur_pos) {
			uint32_t tx_time_err = *(uint32_t *)(hdr + task->accur_pos);

			// This packet carries the TX error of packet (packet_id - ACCURACY_BUFFER_SIZE)
			struct delayed_latency_entry *delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_BUFFER_SIZE);

			if (delayed_latency_entry) {
				task_lat_store_lat(task,
						   delayed_latency_entry->rx_packet_id,
						   delayed_latency_entry->pkt_rx_time,
						   delayed_latency_entry->pkt_tx_time,
						   delayed_latency_entry->rx_time_err,
						   tx_time_err,
						   delayed_latency_entry->tx_packet_id,
						   delayed_latency_entry->generator_id);
			}

			// Stash the current packet until its own TX error arrives
			delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
			delayed_latency_entry->pkt_rx_time = pkt_rx_time;
			delayed_latency_entry->pkt_tx_time = pkt_tx_time;
			delayed_latency_entry->rx_time_err = rx_time_err;
			delayed_latency_entry->rx_packet_id = task->rx_packet_index;
			delayed_latency_entry->tx_packet_id = packet_id;
			delayed_latency_entry->generator_id = generator_id;
		} else {
			task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
		}

		// Bad/unexpected packets do not need to be indexed
		task->rx_packet_index++;
	}

	task->begin = tbase->aux->tsc_rx.before;
	task->last_pkts_tsc = tbase->aux->tsc_rx.after;

	return task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
}
662
663 static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
664 {
665         const int socket_id = rte_lcore_to_socket_id(core_id);
666         char name[256];
667         size_t latency_buffer_mem_size = 0;
668
669         if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
670                 task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;
671
672         latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;
673
674         task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
675         PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);
676
677         sprintf(name, "latency.rx_%u.txt", core_id);
678         task->fp_rx = fopen(name, "w+");
679         PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);
680
681         sprintf(name, "latency.tx_%u.txt", core_id);
682         task->fp_tx = fopen(name, "w+");
683         PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);
684
685         task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
686         PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocated prev_tx_packet_index\n");
687 }
688
689 static void task_init_generator_count(struct task_lat *task)
690 {
691         uint8_t *generator_count = prox_sh_find_system("generator_count");
692
693         if (generator_count == NULL) {
694                 task->generator_count = 1;
695                 plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
696         } else
697                 task->generator_count = *generator_count;
698         plog_info("\tLatency using %u generators\n", task->generator_count);
699 }
700
701 static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
702 {
703         size_t eld_mem_size;
704
705         eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
706         task->eld = prox_zmalloc(eld_mem_size, socket_id);
707         PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
708 }
709
710 void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
711 {
712         task->limit = nsec_to_tsc(accuracy_limit_nsec);
713 }
714
715 static void lat_start(struct task_base *tbase)
716 {
717         struct task_lat *task = (struct task_lat *)tbase;
718
719         if (task->port) {
720                 // task->port->link_speed reports the link speed in Mbps e.g. 40k for a 40 Gbps NIC.
721                 // task->link_speed reports link speed in Bytes per sec.
722                 // It can be 0 if link is down, and must hence be updated in fast path.
723                 task->link_speed = task->port->link_speed * 125000L;
724                 if (task->link_speed)
725                         plog_info("\tPort %u: link speed is %ld Mbps\n",
726                                 (uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
727                 else
728                         plog_info("\tPort %u: link speed is %ld Mbps - link might be down\n",
729                                 (uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
730         }
731 }
732
733 static void init_task_lat(struct task_base *tbase, struct task_args *targ)
734 {
735         struct task_lat *task = (struct task_lat *)tbase;
736         const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
737
738         task->lat_pos = targ->lat_pos;
739         task->accur_pos = targ->accur_pos;
740         task->sig_pos = targ->sig_pos;
741         task->sig = targ->sig;
742
743         task->unique_id_pos = targ->packet_id_pos;
744         task->latency_buffer_size = targ->latency_buffer_size;
745
746         PROX_PANIC(task->lat_pos == 0, "Missing 'lat pos' parameter in config file\n");
747         uint16_t min_pkt_len = task->lat_pos + sizeof(uint32_t);
748         if (task->unique_id_pos && (
749                 min_pkt_len < task->unique_id_pos + sizeof(struct unique_id)))
750                 min_pkt_len = task->unique_id_pos + sizeof(struct unique_id);
751         if (task->accur_pos && (
752                 min_pkt_len < task->accur_pos + sizeof(uint32_t)))
753                 min_pkt_len = task->accur_pos + sizeof(uint32_t);
754         if (task->sig_pos && (
755                 min_pkt_len < task->sig_pos + sizeof(uint32_t)))
756                 min_pkt_len = task->sig_pos + sizeof(uint32_t);
757         task->min_pkt_len = min_pkt_len;
758
759         task_init_generator_count(task);
760
761         if (task->latency_buffer_size) {
762                 init_task_lat_latency_buffer(task, targ->lconf->id);
763         }
764
765         if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
766                 targ->bucket_size = DEFAULT_BUCKET_SIZE;
767         }
768
769         if (task->accur_pos) {
770                 task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count , socket_id);
771                 PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
772                 for (uint i = 0; i < task->generator_count; i++) {
773                         task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
774                         PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
775                 }
776                 if (task->unique_id_pos == 0) {
777                         /* When using accuracy feature, the accuracy from TX is written ACCURACY_BUFFER_SIZE packets later
778                         * We can only retrieve the good packet if a packet id is written to it.
779                         * Otherwise we will use the packet RECEIVED ACCURACY_BUFFER_SIZE packets ago which is OK if
780                         * packets are not re-ordered. If packets are re-ordered, then the matching between
781                         * the tx accuracy znd the latency is wrong.
782                         */
783                         plog_warn("\tWhen accuracy feature is used, a unique id should ideally also be used\n");
784                 }
785         }
786
787         task->lt[0].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
788         task->lt[1].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
789         if (task->unique_id_pos) {
790                 task_lat_init_eld(task, socket_id);
791                 task_lat_reset_eld(task);
792         }
793         task->lat_test = &task->lt[task->using_lt];
794
795         task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
796         task->rx_pkt_meta = prox_zmalloc(MAX_RX_PKT_ALL * sizeof(*task->rx_pkt_meta), socket_id);
797         PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");
798
799         task->link_speed = UINT64_MAX;
800         if (targ->nb_rxports) {
801                 // task->port structure is only used while starting handle_lat to get the link_speed.
802                 // link_speed can not be quiried at init as the port has not been initialized yet.
803                 struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
804                 task->port = port;
805         }
806 }
807
/* Registration descriptor for the "lat" task mode.
 * TSC_RX: timestamp packets at RX; RX_ALL: receive even when paused;
 * ZERO_RX: call handle even with an empty bulk; NEVER_DISCARDS: the
 * task forwards every packet it receives. */
static struct task_init task_init_lat = {
	.mode_str = "lat",
	.init = init_task_lat,
	.handle = handle_lat_bulk,
	.start = lat_start,
	.stop = lat_stop,
	.flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_RX_ALL | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
	.size = sizeof(struct task_lat)
};
817
/* Runs before main() (GCC/Clang constructor attribute) to register the
 * "lat" task mode with the PROX task framework. */
__attribute__((constructor)) static void reg_task_lat(void)
{
	reg_task(&task_init_lat);
}