/*
// Copyright (c) 2010-2019 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

//#define LAT_DEBUG

#include <rte_cycles.h>
#include <stdio.h>
#include <math.h>

#include "handle_gen.h"
#include "prox_malloc.h"
#include "mbuf_utils.h"
#include "handle_lat.h"
#include "log.h"
#include "task_init.h"
#include "task_base.h"
#include "stats.h"
#include "lconf.h"
#include "quit.h"
#include "eld.h"
#include "prox_shared.h"
#include "prox_port_cfg.h"

#define DEFAULT_BUCKET_SIZE     11
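/* The generator writes the TX accuracy for packet N into packet
 * N + ACCURACY_WINDOW, so up to ACCURACY_WINDOW delayed entries per
 * generator can be pending at any time; the buffer is twice that size,
 * presumably to leave headroom for reordered packets. */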
#define ACCURACY_BUFFER_SIZE    (2 * ACCURACY_WINDOW)

struct lat_info {
        uint32_t rx_packet_index;
        uint64_t tx_packet_index;
        uint32_t tx_err;
        uint32_t rx_err;
        uint64_t rx_time;
        uint64_t tx_time;
        uint16_t port_queue_id;
#ifdef LAT_DEBUG
        uint16_t id_in_bulk;
        uint16_t bulk_size;
        uint64_t begin;
        uint64_t after;
        uint64_t before;
#endif
};

struct delayed_latency_entry {
        uint32_t rx_packet_id;
        uint32_t tx_packet_id;
        uint32_t packet_id;
        uint8_t generator_id;
        uint64_t pkt_rx_time;
        uint64_t pkt_tx_time;   // Time written into packets by gen. Unit is TSC >> LATENCY_ACCURACY
        uint64_t rx_time_err;
};

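/* delayed_latency_entries is used as a per-generator circular buffer
 * indexed by packet_id modulo ACCURACY_BUFFER_SIZE. "get" only returns
 * the slot if it still holds the requested packet_id; a slot that was
 * overwritten by a more recent packet (or never filled, e.g. because
 * the matching packet was lost) yields NULL and the sample is simply
 * not reported. */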
static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
        struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
        if (delayed_latency_entry->packet_id == packet_id)
                return delayed_latency_entry;
        else
                return NULL;
}

static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
        struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
        delayed_latency_entry->packet_id = packet_id;
        return delayed_latency_entry;
}

struct rx_pkt_meta_data {
        uint8_t  *hdr;
        uint32_t pkt_tx_time;
        uint32_t bytes_after_in_bulk;
};

struct loss_buffer {
        uint32_t packet_id;
        uint32_t n;
};

struct task_lat {
        struct task_base base;
        uint64_t limit;
        uint64_t rx_packet_index;
        uint64_t last_pkts_tsc;
        struct delayed_latency_entry **delayed_latency_entries;
        struct lat_info *latency_buffer;
        uint32_t latency_buffer_idx;
        uint32_t latency_buffer_size;
        uint64_t begin;
        uint16_t lat_pos;
        uint16_t unique_id_pos;
        uint16_t accur_pos;
        uint16_t sig_pos;
        uint32_t sig;
        volatile uint16_t use_lt; /* which of the 2 measurements to use next */
        volatile uint16_t using_lt; /* 0 or 1, depending on which of the 2 measurements is being used */
        struct lat_test lt[2];
        struct lat_test *lat_test;
        uint32_t generator_count;
        uint16_t min_pkt_len;
        struct early_loss_detect *eld;
        struct rx_pkt_meta_data *rx_pkt_meta;
        // Following fields are only used when starting or stopping, not in general runtime
        uint64_t *prev_tx_packet_index;
        FILE *fp_loss;
        FILE *fp_rx;
        FILE *fp_tx;
        struct prox_port_cfg *port;
        uint64_t *bytes_to_tsc;
        uint64_t *previous_packet;
        uint32_t loss_buffer_size;
        struct loss_buffer *loss_buffer;
        uint32_t loss_id;
};

/* This function calculates the difference between rx_time and tx_time.
 * Both values are uint32_t (see handle_lat_bulk).
 * rx_time should be higher than tx_time, except once every UINT32_MAX
 * cycles, when rx_time overflows.
 * As the return value is also uint32_t, returning (rx_time - tx_time)
 * is also fine when it overflows.
 */
static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
{
        return rx_time - tx_time;
}

uint32_t task_lat_get_latency_bucket_size(struct task_lat *task)
{
        return task->lat_test->bucket_size;
}

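/* Latency measurements are double-buffered: the fast path fills
 * lt[using_lt] while another core reads the other buffer. The reader
 * flips use_lt to request a swap; once the fast path has acknowledged
 * it in task_lat_update_lat_test (use_lt == using_lt), the idle buffer
 * is safe to hand out. */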
struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
{
        if (task->use_lt == task->using_lt)
                return &task->lt[!task->using_lt];
        return NULL;
}

void task_lat_use_other_latency_meassurement(struct task_lat *task)
{
        task->use_lt = !task->using_lt;
}

static void task_lat_update_lat_test(struct task_lat *task)
{
        if (task->use_lt != task->using_lt) {
                task->using_lt = task->use_lt;
                task->lat_test = &task->lt[task->using_lt];
                task->lat_test->accuracy_limit_tsc = task->limit;
        }
}

static int compare_tx_time(const void *val1, const void *val2)
{
        const struct lat_info *ptr1 = val1;
        const struct lat_info *ptr2 = val2;

        return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
}

static int compare_tx_packet_index(const void *val1, const void *val2)
{
        const struct lat_info *ptr1 = val1;
        const struct lat_info *ptr2 = val2;

        return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
}

static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
{
        uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
        uint32_t small = UINT32_MAX >> 1;

        lat++;

        /* Buffer is sorted so far by RX time.
         * We might have packets being reordered by the SUT.
         *     => consider small differences as reordering and big ones as overflows of tx_packet_index.
         * Note that:
         *      - overflow only happens if receiving and storing 4 billion packets...
         *      - an absolute difference of less than 2 billion packets is not considered as an overflow
         */
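        /* Worked example, with small = 2^31:
         *   old_tx_packet_index = 0xFFFFFFF0, tx_packet_index = 0x00000010
         *   => tx_packet_index < old_tx_packet_index and the difference (~4 billion)
         *      is bigger than small, so it is counted as an overflow: n_overflow
         *      becomes 1 and the corrected 64-bit index is
         *      0x00000010 + 1 * 2^32 = 0x100000010.
         */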
        for (uint32_t i = 1; i < count; i++) {
                tx_packet_index = lat->tx_packet_index;
                if (tx_packet_index > old_tx_packet_index) {
                        if (tx_packet_index - old_tx_packet_index < small) {
                                // The diff is small => increasing index count
                        } else {
                                // The diff is big => it is more likely that the previous packet was an overflow
                                n_overflow--;
                        }
                } else {
                        if (old_tx_packet_index - tx_packet_index < small) {
                                // The diff is small => packet reorder
                        } else {
                                // The diff is big => it is more likely that this is an overflow
                                n_overflow++;
                        }
                }
                lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
                old_tx_packet_index = tx_packet_index;
                lat++;
        }
}

static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
{
        uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
        uint32_t small = UINT32_MAX >> 1;
        lat++;

        /*
         * Same algorithm as above, but with time.
         * Note that:
         *      - overflow happens after 4 billion "cycles" (shifted by LATENCY_ACCURACY) = ~4 sec
         *      - an absolute difference of up to 2 billion (shifted) cycles (~2 sec) is not considered as an overflow
         *              => the algorithm does not work if receiving less than 1 packet every 2 seconds
         */
        for (uint32_t i = 1; i < count; i++) {
                tx_time = lat->tx_time;
                if (tx_time > old_tx_time) {
                        if (tx_time - old_tx_time > small) {
                                n_overflow--;
                        }
                } else {
                        if (old_tx_time - tx_time > small) {
                                n_overflow++;
                        }
                }
                lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
                old_tx_time = tx_time;
                lat++;
        }
}

static void task_lat_count_remaining_lost_packets(struct task_lat *task)
{
        struct lat_test *lat_test = task->lat_test;

        for (uint32_t j = 0; j < task->generator_count; j++) {
                struct early_loss_detect *eld = &task->eld[j];

                lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
        }
}

static void task_lat_reset_eld(struct task_lat *task)
{
        for (uint32_t j = 0; j < task->generator_count; j++) {
                early_loss_detect_reset(&task->eld[j]);
        }
}

static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
{
        uint64_t min_tsc = UINT64_MAX;

        for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
                if (min_tsc > task->latency_buffer[i].tx_time)
                        min_tsc = task->latency_buffer[i].tx_time;
        }

        return min_tsc << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
{
        uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);

        return lat << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
}

static void lat_write_latency_to_file(struct task_lat *task)
{
        uint64_t min_tsc;
        uint64_t n_loss;

        min_tsc = lat_latency_buffer_get_min_tsc(task);

        // Dumping all packet statistics
        fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
        fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec); rx time; tx time\n");
        for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
                struct lat_info *lat_info = &task->latency_buffer[i];
                uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
                uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
                uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

                fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
                        lat_info->rx_packet_index,
                        lat_info->port_queue_id,
                        lat_info->tx_packet_index,
                        tsc_to_nsec(lat_tsc),
                        tsc_to_nsec(rx_tsc - min_tsc),
                        tsc_to_nsec(tx_tsc - min_tsc));
        }

        // To detect dropped packets, we need to sort them based on TX
        if (task->unique_id_pos) {
                plogx_info("Adapting tx_packet_index\n");
                fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
                plogx_info("Sorting packets based on tx_packet_index\n");
                qsort(task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
                plogx_info("Sorted packets based on tx_packet_index\n");
        } else {
                plogx_info("Adapting tx_time\n");
                fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
                plogx_info("Sorting packets based on tx_time\n");
                qsort(task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
                plogx_info("Sorted packets based on tx_time\n");
        }

        // A packet is marked as dropped if 2 packets received from the same queue are not consecutive
        fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
        fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");

        for (uint32_t i = 0; i < task->generator_count; i++)
                task->prev_tx_packet_index[i] = -1;

        for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
                struct lat_info *lat_info = &task->latency_buffer[i];
                uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
                uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
                uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
                uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
                uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

                /* Packet n + ACCURACY_WINDOW delivers the TX error for packet n,
                   hence the last ACCURACY_WINDOW packets do not have a TX error. */
                if (i + ACCURACY_WINDOW >= task->latency_buffer_idx) {
                        tx_err_tsc = 0;
                }

                if (lat_info->port_queue_id >= task->generator_count) {
                        plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
                                lat_info->port_queue_id, lat_info->tx_packet_index);
                        continue;
                }
                // Log dropped packet
                n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
                if (n_loss)
                        fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
                                lat_info->port_queue_id,
                                lat_info->tx_packet_index - n_loss, n_loss);
                // Log next packet
                fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
                        lat_info->port_queue_id,
                        lat_info->tx_packet_index,
                        lat_info->rx_packet_index,
                        tsc_to_nsec(lat_tsc),
                        tsc_to_nsec(tx_tsc - min_tsc),
                        tsc_to_nsec(rx_tsc - min_tsc),
                        tsc_to_nsec(tx_err_tsc),
                        tsc_to_nsec(rx_err_tsc));
#ifdef LAT_DEBUG
                fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
                        lat_info->id_in_bulk,
                        lat_info->bulk_size,
                        tsc_to_nsec(lat_info->begin - min_tsc),
                        tsc_to_nsec(lat_info->before - min_tsc),
                        tsc_to_nsec(lat_info->after - min_tsc));
#endif
                fprintf(task->fp_tx, "\n");
                task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
        }
        fflush(task->fp_rx);
        fflush(task->fp_tx);
        task->latency_buffer_idx = 0;
}

static void lat_stop(struct task_base *tbase)
{
        struct task_lat *task = (struct task_lat *)tbase;

        if (task->unique_id_pos) {
                task_lat_count_remaining_lost_packets(task);
                task_lat_reset_eld(task);
                memset(task->previous_packet, 0, sizeof(task->previous_packet[0]) * task->generator_count);
        }
        if (task->loss_id) {
                for (uint i = 0; i < task->loss_id; i++) {
                        fprintf(task->fp_loss, "packet %u: %u\n", task->loss_buffer[i].packet_id, task->loss_buffer[i].n);
                }
        }
        task->lat_test->lost_packets = 0;
        if (task->latency_buffer)
                lat_write_latency_to_file(task);
}

#ifdef LAT_DEBUG
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
        struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];

        lat_info->bulk_size = bulk_size;
        lat_info->id_in_bulk = id_in_bulk;
        lat_info->begin = task->begin;
        lat_info->before = task->base.aux->tsc_rx.before;
        lat_info->after = task->base.aux->tsc_rx.after;
}
#endif

static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
{
        struct lat_info *lat_info;

        /* If unique_id_pos is specified then latency is stored per
           packet being sent. Lost packets are detected at runtime, and
           the latency stored for those packets will be 0 */
        lat_info = &task->latency_buffer[task->latency_buffer_idx++];
        lat_info->rx_packet_index = rx_packet_index;
        lat_info->tx_packet_index = packet_id;
        lat_info->port_queue_id = generator_id;
        lat_info->rx_time = rx_time;
        lat_info->tx_time = tx_time;
        lat_info->rx_err = rx_err;
        lat_info->tx_err = tx_err;
}

static uint32_t task_lat_early_loss_detect(struct task_lat *task, uint32_t packet_id, uint8_t generator_id)
{
        struct early_loss_detect *eld = &task->eld[generator_id];
        return early_loss_detect_add(eld, packet_id);
}

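/* Duplicate detection peeks into the early-loss-detect table: if the
 * slot for this packet_id still holds the same sequence-window id, the
 * exact same packet was already received. This relies on the entries
 * layout (PACKET_QUEUE_MASK/PACKET_QUEUE_BITS) defined in eld.h. */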
static void lat_test_check_duplicate(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
{
        struct early_loss_detect *eld = &task->eld[generator_id];
        uint32_t old_queue_id, queue_pos;

        queue_pos = packet_id & PACKET_QUEUE_MASK;
        old_queue_id = eld->entries[queue_pos];
        if ((packet_id >> PACKET_QUEUE_BITS) == old_queue_id)
                lat_test->duplicate++;
}

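/* All packets of a bulk share a single RX timestamp, taken after the
 * last one was received. Estimate when an earlier packet actually
 * arrived by subtracting the time needed to receive the bytes that
 * followed it on the wire (precomputed in bytes_to_tsc), clamped so
 * the estimate never goes back before the previous bulk. */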
static uint64_t tsc_extrapolate_backward(struct task_lat *task, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
#ifdef NO_LAT_EXTRAPOLATION
        uint64_t tsc = tsc_from;
#else
        uint64_t tsc = tsc_from - task->bytes_to_tsc[bytes];
#endif
        if (likely(tsc > tsc_minimum))
                return tsc;
        else
                return tsc_minimum;
}

static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
{
        uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
        size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);

        bucket_id = bucket_id < bucket_count ? bucket_id : (bucket_count - 1);
        lat_test->buckets[bucket_id]++;
}

static void lat_test_check_ordering(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
{
        if (packet_id < task->previous_packet[generator_id]) {
                lat_test->mis_ordered++;
                lat_test->extent += task->previous_packet[generator_id] - packet_id;
        }
        task->previous_packet[generator_id] = packet_id;
}

static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
{
        lat_test->lost_packets += lost_packets;
}

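/* Accumulate first and second moments of the latency so that average
 * and variance can be derived later, together with worst-case error
 * bounds: tot_lat_error and var_lat_error track how much the
 * measurement error could have contributed to each sum. */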
static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
{
        if (error > lat_test->accuracy_limit_tsc)
                return;
        lat_test->tot_pkts++;

        lat_test->tot_lat += lat_tsc;
        lat_test->tot_lat_error += error;

        /* (a +- b)^2 = a^2 +- (2ab + b^2) */
        lat_test->var_lat += lat_tsc * lat_tsc;
        lat_test->var_lat_error += 2 * lat_tsc * error;
        lat_test->var_lat_error += error * error;

        if (lat_tsc > lat_test->max_lat) {
                lat_test->max_lat = lat_tsc;
                lat_test->max_lat_error = error;
        }
        if (lat_tsc < lat_test->min_lat) {
                lat_test->min_lat = lat_tsc;
                lat_test->min_lat_error = error;
        }

#ifdef LATENCY_HISTOGRAM
        lat_test_histogram_add(lat_test, lat_tsc);
#endif
}

static int task_lat_can_store_latency(struct task_lat *task)
{
        return task->latency_buffer_idx < task->latency_buffer_size;
}

static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
{
        uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;

        lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);

        if (task_lat_can_store_latency(task)) {
                task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
        }
}

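/* Fast-path handler. For each bulk of packets: (1) mark packets that
 * are too short or carry a bad signature, (2) read the TX timestamp
 * written by the generator, (3) extrapolate per-packet RX times
 * backward from the single end-of-burst TSC, (4) run loss, reorder
 * and duplicate detection per generator, and (5) either record the
 * latency directly or, when the accuracy feature is enabled, buffer
 * it until the TX error arrives ACCURACY_WINDOW packets later. */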
static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
        struct task_lat *task = (struct task_lat *)tbase;
        int rc;

        if (n_pkts == 0) {
                task->begin = tbase->aux->tsc_rx.before;
                return 0;
        }

        task_lat_update_lat_test(task);

        // Remember those packets with bad length or bad signature
        uint32_t non_dp_count = 0;
        uint64_t pkt_bad_len_sig = 0;
#define BIT64_SET(a64, bit)     a64 |=  (((uint64_t)1) << (bit & 63))
#define BIT64_CLR(a64, bit)     a64 &= ~(((uint64_t)1) << (bit & 63))
#define BIT64_TEST(a64, bit)    a64  &  (((uint64_t)1) << (bit & 63))

        /* Go once through all received packets and read them. If a
           packet has just been modified by another core, the cost of
           the cache miss will be partially amortized through the bulk size */
        for (uint16_t j = 0; j < n_pkts; ++j) {
                struct rte_mbuf *mbuf = mbufs[j];
                task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);

                // Remember those packets which are too short to hold the values that we expect
                if (unlikely(rte_pktmbuf_pkt_len(mbuf) < task->min_pkt_len)) {
                        BIT64_SET(pkt_bad_len_sig, j);
                        non_dp_count++;
                } else
                        BIT64_CLR(pkt_bad_len_sig, j);
        }

        if (task->sig_pos) {
                for (uint16_t j = 0; j < n_pkts; ++j) {
                        if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                                continue;
                        // Remember those packets with bad signature
                        if (likely(*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig))
                                task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
                        else {
                                BIT64_SET(pkt_bad_len_sig, j);
                                non_dp_count++;
                        }
                }
        } else {
                for (uint16_t j = 0; j < n_pkts; ++j) {
                        if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                                continue;
                        task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
                }
        }

        uint32_t bytes_total_in_bulk = 0;
        // Compute, for each packet, the number of wire bytes received after it
        // in this bulk; this is later used to extrapolate per-packet RX times
        // (and hence the RX accuracy) from the single end-of-burst timestamp
        for (uint16_t j = 0; j < n_pkts; ++j) {
                uint16_t flipped = n_pkts - 1 - j;

                task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
                bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
        }

        const uint64_t rx_tsc = tbase->aux->tsc_rx.after;

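        /* rx_time_err estimates the uncertainty on the RX timestamp: the
           packets may have arrived at any point since we last looked, i.e.
           since task->begin (or since the previous bulk when the extrapolation
           lands before begin, meaning packets sat in the NIC unseen). */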
        uint64_t rx_time_err;
        uint64_t pkt_rx_time64 = tsc_extrapolate_backward(task, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
        if (unlikely((task->begin >> LATENCY_ACCURACY) > pkt_rx_time64)) {
                // Extrapolation went back to BEFORE begin => packets were stuck in the NIC but we were not seeing them
                rx_time_err = pkt_rx_time64 - (task->last_pkts_tsc >> LATENCY_ACCURACY);
        } else {
                rx_time_err = pkt_rx_time64 - (task->begin >> LATENCY_ACCURACY);
        }

        TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, non_dp_count);
        for (uint16_t j = 0; j < n_pkts; ++j) {
                // Used to display % of packets within accuracy limit vs. total number of packets (used_col)
                task->lat_test->tot_all_pkts++;

                // Skip those packets with bad length or bad signature
                if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                        continue;

                struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
                uint8_t *hdr = rx_pkt_meta->hdr;

                uint32_t pkt_rx_time = tsc_extrapolate_backward(task, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
                uint32_t pkt_tx_time = rx_pkt_meta->pkt_tx_time;

                uint8_t generator_id;
                uint32_t packet_id;
                if (task->unique_id_pos) {
                        struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
                        unique_id_get(unique_id, &generator_id, &packet_id);

                        if (unlikely(generator_id >= task->generator_count)) {
                                /* No need to remember unexpected packet at this stage
                                BIT64_SET(pkt_bad_len_sig, j);
                                */
                                // Skip unexpected packet
                                continue;
                        }
                        lat_test_check_ordering(task, task->lat_test, packet_id, generator_id);
                        lat_test_check_duplicate(task, task->lat_test, packet_id, generator_id);
                        uint32_t loss = task_lat_early_loss_detect(task, packet_id, generator_id);
                        if (loss) {
                                lat_test_add_lost(task->lat_test, loss);
                                if (task->loss_id < task->loss_buffer_size) {
                                        task->loss_buffer[task->loss_id].packet_id = packet_id;
                                        task->loss_buffer[task->loss_id++].n = loss;
                                }
                        }
                } else {
                        generator_id = 0;
                        packet_id = task->rx_packet_index;
                }

                /* If accuracy is enabled, latency is reported with a
                   delay of ACCURACY_WINDOW packets since the generator puts the
                   accuracy for packet N into packet N + ACCURACY_WINDOW. The delay
                   ensures that all reported latencies have both rx
                   and tx error. */
                if (task->accur_pos) {
                        uint32_t tx_time_err = *(uint32_t *)(hdr + task->accur_pos);

                        struct delayed_latency_entry *delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_WINDOW);

                        if (delayed_latency_entry) {
                                task_lat_store_lat(task,
                                                   delayed_latency_entry->rx_packet_id,
                                                   delayed_latency_entry->pkt_rx_time,
                                                   delayed_latency_entry->pkt_tx_time,
                                                   delayed_latency_entry->rx_time_err,
                                                   tx_time_err,
                                                   delayed_latency_entry->tx_packet_id,
                                                   delayed_latency_entry->generator_id);
                        }

                        delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
                        delayed_latency_entry->pkt_rx_time = pkt_rx_time;
                        delayed_latency_entry->pkt_tx_time = pkt_tx_time;
                        delayed_latency_entry->rx_time_err = rx_time_err;
                        delayed_latency_entry->rx_packet_id = task->rx_packet_index;
                        delayed_latency_entry->tx_packet_id = packet_id;
                        delayed_latency_entry->generator_id = generator_id;
                } else {
                        task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
                }

                // Bad/unexpected packets do not need to be indexed
                task->rx_packet_index++;
        }

        if (n_pkts < MAX_PKT_BURST)
                task->begin = tbase->aux->tsc_rx.before;
        task->last_pkts_tsc = tbase->aux->tsc_rx.after;

        rc = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
        // non_dp_count should not be drop-handled, as those packets are all by definition considered as not handled
        // RX = DISCARDED + HANDLED + NON_DP + (TX - TX_NON_DP) + TX_FAIL
        TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, -non_dp_count);
        return rc;
}

static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
{
        const int socket_id = rte_lcore_to_socket_id(core_id);
        char name[256];
        size_t latency_buffer_mem_size = 0;

        if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
                task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;

        latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;

        task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
        PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);

        sprintf(name, "latency.rx_%u.txt", core_id);
        task->fp_rx = fopen(name, "w+");
        PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);

        sprintf(name, "latency.tx_%u.txt", core_id);
        task->fp_tx = fopen(name, "w+");
        PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);

        task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
        PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocate prev_tx_packet_index\n");
}

static void task_init_generator_count(struct task_lat *task)
{
        uint8_t *generator_count = prox_sh_find_system("generator_count");

        if (generator_count == NULL) {
                task->generator_count = 1;
                plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
        } else
                task->generator_count = *generator_count;
        plog_info("\t\tLatency using %u generators\n", task->generator_count);
}

static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
{
        size_t eld_mem_size;

        eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
        task->eld = prox_zmalloc(eld_mem_size, socket_id);
        PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
}

void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
{
        task->limit = nsec_to_tsc(accuracy_limit_nsec);
}

static void lat_start(struct task_base *tbase)
{
        struct task_lat *task = (struct task_lat *)tbase;

}

static void init_task_lat(struct task_base *tbase, struct task_args *targ)
{
        struct task_lat *task = (struct task_lat *)tbase;
        const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

        task->lat_pos = targ->lat_pos;
        task->accur_pos = targ->accur_pos;
        task->sig_pos = targ->sig_pos;
        task->sig = targ->sig;

        task->unique_id_pos = targ->packet_id_pos;
        task->latency_buffer_size = targ->latency_buffer_size;

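        /* min_pkt_len is the largest offset + field size among the fields we
           may have to read (latency timestamp, unique id, accuracy, signature);
           shorter packets are flagged as non-dataplane in handle_lat_bulk. */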
        PROX_PANIC(task->lat_pos == 0, "Missing 'lat pos' parameter in config file\n");
        uint16_t min_pkt_len = task->lat_pos + sizeof(uint32_t);
        if (task->unique_id_pos && (
                min_pkt_len < task->unique_id_pos + sizeof(struct unique_id)))
                min_pkt_len = task->unique_id_pos + sizeof(struct unique_id);
        if (task->accur_pos && (
                min_pkt_len < task->accur_pos + sizeof(uint32_t)))
                min_pkt_len = task->accur_pos + sizeof(uint32_t);
        if (task->sig_pos && (
                min_pkt_len < task->sig_pos + sizeof(uint32_t)))
                min_pkt_len = task->sig_pos + sizeof(uint32_t);
        task->min_pkt_len = min_pkt_len;

        task_init_generator_count(task);

        if (task->latency_buffer_size) {
                init_task_lat_latency_buffer(task, targ->lconf->id);
        }

        char name[256];
        sprintf(name, "loss_%u.txt", targ->lconf->id);
        task->fp_loss = fopen(name, "w+");
        PROX_PANIC(task->fp_loss == NULL, "Failed to open %s\n", name);

        if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
                targ->bucket_size = DEFAULT_BUCKET_SIZE;
        }

        if (task->accur_pos) {
                task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count, socket_id);
                PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
                for (uint i = 0; i < task->generator_count; i++) {
                        task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
                        PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
                }
                if (task->unique_id_pos == 0) {
                        /* When using the accuracy feature, the accuracy from TX is written ACCURACY_WINDOW packets later.
                        * We can only retrieve the right packet if a packet id is written to it.
                        * Otherwise we will use the packet RECEIVED ACCURACY_WINDOW packets ago, which is OK if
                        * packets are not re-ordered. If packets are re-ordered, then the matching between
                        * the TX accuracy and the latency is wrong.
                        */
                        plog_warn("\tWhen accuracy feature is used, a unique id should ideally also be used\n");
                }
        }

        task->lt[0].min_lat = -1;
        task->lt[1].min_lat = -1;
        task->lt[0].bucket_size = targ->bucket_size;
        task->lt[1].bucket_size = targ->bucket_size;
        if (task->unique_id_pos) {
                task_lat_init_eld(task, socket_id);
                task_lat_reset_eld(task);
                task->previous_packet = prox_zmalloc(sizeof(task->previous_packet[0]) * task->generator_count, socket_id);
                PROX_PANIC(task->previous_packet == NULL, "Failed to allocate array for storing previous packet\n");
        }
        task->lat_test = &task->lt[task->using_lt];

        task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
        task->rx_pkt_meta = prox_zmalloc(MAX_PKT_BURST * sizeof(*task->rx_pkt_meta), socket_id);
        PROX_PANIC(task->rx_pkt_meta == NULL, "Unable to allocate memory to store RX packet meta data\n");

        uint32_t max_frame_size = MAX_PKT_SIZE;
        uint64_t bytes_per_hz = UINT64_MAX;
        if (targ->nb_rxports) {
                struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
                max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE;

                // port->max_link_speed reports the maximum, non-negotiated link speed in Mbps, e.g. 40k for a 40 Gbps NIC.
                // It can be UINT32_MAX (virtual devices or not supported by DPDK < 16.04)
                if (port->max_link_speed != UINT32_MAX) {
                        bytes_per_hz = port->max_link_speed * 125000L;
                        plog_info("\t\tPort %u: max link speed is %ld Mbps\n",
                                (uint8_t)(port - prox_port_cfg), 8 * bytes_per_hz / 1000000);
                }
        }
        task->loss_buffer_size = targ->loss_buffer_size;
        task->loss_buffer = prox_zmalloc(task->loss_buffer_size * sizeof(struct loss_buffer), rte_lcore_to_socket_id(targ->lconf->id));
        PROX_PANIC(task->loss_buffer == NULL,
                "Failed to allocate %lu bytes (in huge pages) for loss_buffer\n", task->loss_buffer_size * sizeof(struct loss_buffer));
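
        // bytes_to_tsc[i] holds the number of TSC cycles needed to receive i
        // bytes at line rate; it is indexed up to a full burst of maximum-size
        // frames since the extrapolation can span the whole bulk.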
        task->bytes_to_tsc = prox_zmalloc(max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST, rte_lcore_to_socket_id(targ->lconf->id));
        PROX_PANIC(task->bytes_to_tsc == NULL,
                "Failed to allocate %lu bytes (in huge pages) for bytes_to_tsc\n", max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST);

        // There are cases where the hz estimate might be slightly over-estimated,
        // which results in too much extrapolation.
        // Only account for 99% of the extrapolation to handle clocks with up to 1% error
        for (unsigned int i = 0; i < max_frame_size * MAX_PKT_BURST; i++) {
                if (bytes_per_hz == UINT64_MAX)
                        task->bytes_to_tsc[i] = 0;
                else
                        task->bytes_to_tsc[i] = (rte_get_tsc_hz() * i * 0.99) / bytes_per_hz;
        }
}

static struct task_init task_init_lat = {
        .mode_str = "lat",
        .init = init_task_lat,
        .handle = handle_lat_bulk,
        .start = lat_start,
        .stop = lat_stop,
        .flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
        .size = sizeof(struct task_lat)
};

__attribute__((constructor)) static void reg_task_lat(void)
{
        reg_task(&task_init_lat);
}