Fix reorder count after reset
[samplevnf.git] / VNFs / DPPD-PROX / handle_lat.c
/*
// Copyright (c) 2010-2019 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

//#define LAT_DEBUG

#include <rte_cycles.h>
#include <stdio.h>
#include <math.h>

#include "handle_gen.h"
#include "prox_malloc.h"
#include "mbuf_utils.h"
#include "handle_lat.h"
#include "log.h"
#include "task_init.h"
#include "task_base.h"
#include "stats.h"
#include "lconf.h"
#include "quit.h"
#include "eld.h"
#include "prox_shared.h"
#include "prox_port_cfg.h"

#define DEFAULT_BUCKET_SIZE	11
#define ACCURACY_BUFFER_SIZE	(2 * ACCURACY_WINDOW)
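/* The delayed-latency rings below are indexed with packet_id % ACCURACY_BUFFER_SIZE.
 * The generator writes the TX accuracy of packet N into packet N + ACCURACY_WINDOW,
 * so a ring must keep at least ACCURACY_WINDOW entries in flight; twice that size
 * leaves headroom for re-ordered packets. */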

struct lat_info {
	uint32_t rx_packet_index;
	uint64_t tx_packet_index;
	uint32_t tx_err;
	uint32_t rx_err;
	uint64_t rx_time;
	uint64_t tx_time;
	uint16_t port_queue_id;
#ifdef LAT_DEBUG
	uint16_t id_in_bulk;
	uint16_t bulk_size;
	uint64_t begin;
	uint64_t after;
	uint64_t before;
#endif
};

struct delayed_latency_entry {
	uint32_t rx_packet_id;
	uint32_t tx_packet_id;
	uint32_t packet_id;
	uint8_t generator_id;
	uint64_t pkt_rx_time;
	uint64_t pkt_tx_time;	// Time written into packets by gen. Unit is TSC >> LATENCY_ACCURACY
	uint64_t rx_time_err;
};

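/* Each generator gets a ring of ACCURACY_BUFFER_SIZE delayed-latency slots.
 * A slot is only valid for the packet_id stored in it: delayed_latency_get()
 * returns NULL when the slot has since been overwritten by a more recent
 * packet, so a stale measurement is dropped rather than paired with the
 * wrong packet. */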
static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
	struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
	if (delayed_latency_entry->packet_id == packet_id)
		return delayed_latency_entry;
	else
		return NULL;
}

static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
	struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
	delayed_latency_entry->packet_id = packet_id;
	return delayed_latency_entry;
}

struct rx_pkt_meta_data {
	uint8_t  *hdr;
	uint32_t pkt_tx_time;
	uint32_t bytes_after_in_bulk;
};

struct task_lat {
	struct task_base base;
	uint64_t limit;
	uint64_t rx_packet_index;
	uint64_t last_pkts_tsc;
	struct delayed_latency_entry **delayed_latency_entries;
	struct lat_info *latency_buffer;
	uint32_t latency_buffer_idx;
	uint32_t latency_buffer_size;
	uint64_t begin;
	uint16_t lat_pos;
	uint16_t unique_id_pos;
	uint16_t accur_pos;
	uint16_t sig_pos;
	uint32_t sig;
	volatile uint16_t use_lt; /* which lt to use, as requested by the reader core */
	volatile uint16_t using_lt; /* 0 or 1, depending on which of the 2 measurements is being used */
	struct lat_test lt[2];
	struct lat_test *lat_test;
	uint32_t generator_count;
	uint16_t min_pkt_len;
	struct early_loss_detect *eld;
	struct rx_pkt_meta_data *rx_pkt_meta;
	// The following fields are only used when starting or stopping, not at general runtime
	uint64_t *prev_tx_packet_index;
	FILE *fp_rx;
	FILE *fp_tx;
	struct prox_port_cfg *port;
	uint64_t *bytes_to_tsc;
	uint64_t *previous_packet;
};

/* This function calculates the difference between rx_time and tx_time.
 * Both values are uint32_t (see handle_lat_bulk).
 * rx_time should be higher than tx_time... except every UINT32_MAX
 * cycles, when rx_time overflows.
 * As the return value is also uint32_t, returning (rx_time - tx_time)
 * is also fine when it overflows: e.g. rx_time = 2 and
 * tx_time = UINT32_MAX - 1 correctly yield a difference of 4.
 */
static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
{
	return rx_time - tx_time;
}

uint32_t task_lat_get_latency_bucket_size(struct task_lat *task)
{
	return task->lat_test->bucket_size;
}

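/* Latency results are double-buffered: the data path fills lt[using_lt] while
 * a reader core examines the other buffer. The reader fetches the idle buffer
 * with task_lat_get_latency_meassurement(), then requests a swap through
 * task_lat_use_other_latency_meassurement(); the data path applies the swap
 * in task_lat_update_lat_test() on the next bulk. */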
struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
{
	if (task->use_lt == task->using_lt)
		return &task->lt[!task->using_lt];
	return NULL;
}

void task_lat_use_other_latency_meassurement(struct task_lat *task)
{
	task->use_lt = !task->using_lt;
}

static void task_lat_update_lat_test(struct task_lat *task)
{
	if (task->use_lt != task->using_lt) {
		task->using_lt = task->use_lt;
		task->lat_test = &task->lt[task->using_lt];
		task->lat_test->accuracy_limit_tsc = task->limit;
	}
}

static int compare_tx_time(const void *val1, const void *val2)
{
	const struct lat_info *ptr1 = val1;
	const struct lat_info *ptr2 = val2;

	return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
}

static int compare_tx_packet_index(const void *val1, const void *val2)
{
	const struct lat_info *ptr1 = val1;
	const struct lat_info *ptr2 = val2;

	return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
}

static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
{
	uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
	uint32_t small = UINT32_MAX >> 1;

	lat++;

	/* Buffer is sorted so far by RX time.
	 * We might have packets being reordered by the SUT.
	 *     => consider small differences as re-ordering and big ones as overflow of tx_packet_index.
	 * Note that:
	 *      - overflow only happens if receiving and storing 4 billion packets...
	 *      - an absolute difference of less than 2 billion packets is not considered an overflow
	 */
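	/* Example: consecutive indexes 0xFFFFFFF0 then 0x00000004 show a big
	 * backward jump => counted as an overflow of the 32-bit index; indexes
	 * 1000 then 998 show a small backward jump => counted as re-ordering. */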
	for (uint32_t i = 1; i < count; i++) {
		tx_packet_index = lat->tx_packet_index;
		if (tx_packet_index > old_tx_packet_index) {
			if (tx_packet_index - old_tx_packet_index < small) {
				// The diff is small => increasing index count
			} else {
				// The diff is big => it is more likely that the previous packet was an overflow
				n_overflow--;
			}
		} else {
			if (old_tx_packet_index - tx_packet_index < small) {
				// The diff is small => packet re-ordering
			} else {
				// The diff is big => it is more likely that this is an overflow
				n_overflow++;
			}
		}
		lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
		old_tx_packet_index = tx_packet_index;
		lat++;
	}
}

static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
{
	uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
	uint32_t small = UINT32_MAX >> 1;
	lat++;

	/*
	 * Same algorithm as above, but with time.
	 * Note that:
	 *      - overflow happens after 4 billion "cycles" (shifted by LATENCY_ACCURACY) = ~4 sec
	 *      - an absolute difference of up to 2 billion (shifted) cycles (~2 sec) is not considered an overflow
	 *              => the algorithm does not work if receiving less than 1 packet every 2 seconds
	 */
	for (uint32_t i = 1; i < count; i++) {
		tx_time = lat->tx_time;
		if (tx_time > old_tx_time) {
			if (tx_time - old_tx_time > small) {
				n_overflow--;
			}
		} else {
			if (old_tx_time - tx_time > small) {
				n_overflow++;
			}
		}
		lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
		old_tx_time = tx_time;
		lat++;
	}
}

static void task_lat_count_remaining_lost_packets(struct task_lat *task)
{
	struct lat_test *lat_test = task->lat_test;

	for (uint32_t j = 0; j < task->generator_count; j++) {
		struct early_loss_detect *eld = &task->eld[j];

		lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
	}
}

static void task_lat_reset_eld(struct task_lat *task)
{
	for (uint32_t j = 0; j < task->generator_count; j++) {
		early_loss_detect_reset(&task->eld[j]);
	}
}

static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
{
	uint64_t min_tsc = UINT64_MAX;

	for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
		if (min_tsc > task->latency_buffer[i].tx_time)
			min_tsc = task->latency_buffer[i].tx_time;
	}

	return min_tsc << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
{
	uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);

	return lat << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
}

static void lat_write_latency_to_file(struct task_lat *task)
{
	uint64_t min_tsc;
	uint64_t n_loss;

	min_tsc = lat_latency_buffer_get_min_tsc(task);

	// Dumping all packet statistics
	fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
	fprintf(task->fp_rx, "rx index;queue;tx index;lat (nsec);rx time;tx time\n");
	for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
		struct lat_info *lat_info = &task->latency_buffer[i];
		uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
		uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
		uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

		fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
			lat_info->rx_packet_index,
			lat_info->port_queue_id,
			lat_info->tx_packet_index,
			tsc_to_nsec(lat_tsc),
			tsc_to_nsec(rx_tsc - min_tsc),
			tsc_to_nsec(tx_tsc - min_tsc));
	}

	// To detect dropped packets, we need to sort them based on TX
	if (task->unique_id_pos) {
		plogx_info("Adapting tx_packet_index\n");
		fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
		plogx_info("Sorting packets based on tx_packet_index\n");
		qsort(task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
		plogx_info("Sorted packets based on tx_packet_index\n");
	} else {
		plogx_info("Adapting tx_time\n");
		fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
		plogx_info("Sorting packets based on tx_time\n");
		qsort(task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
		plogx_info("Sorted packets based on tx_time\n");
	}

	// A packet is marked as dropped if 2 packets received from the same queue are not consecutive
	fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
	fprintf(task->fp_tx, "queue;tx index;rx index;lat (nsec);tx time;rx time;tx_err;rx_err\n");

	for (uint32_t i = 0; i < task->generator_count; i++)
		task->prev_tx_packet_index[i] = -1;

	for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
		struct lat_info *lat_info = &task->latency_buffer[i];
		uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
		uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
		uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
		uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
		uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

		/* Packet n + ACCURACY_WINDOW delivers the TX error for packet n,
		   hence the last ACCURACY_WINDOW packets do not have a TX error. */
		if (i + ACCURACY_WINDOW >= task->latency_buffer_idx) {
			tx_err_tsc = 0;
		}

		if (lat_info->port_queue_id >= task->generator_count) {
			plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
				lat_info->port_queue_id, lat_info->tx_packet_index);
			continue;
		}
		// Log dropped packets
		n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
		if (n_loss)
			fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
				lat_info->port_queue_id,
				lat_info->tx_packet_index - n_loss, n_loss);
		// Log the next packet
		fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
			lat_info->port_queue_id,
			lat_info->tx_packet_index,
			lat_info->rx_packet_index,
			tsc_to_nsec(lat_tsc),
			tsc_to_nsec(tx_tsc - min_tsc),
			tsc_to_nsec(rx_tsc - min_tsc),
			tsc_to_nsec(tx_err_tsc),
			tsc_to_nsec(rx_err_tsc));
#ifdef LAT_DEBUG
		fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
			lat_info->id_in_bulk,
			lat_info->bulk_size,
			tsc_to_nsec(lat_info->begin - min_tsc),
			tsc_to_nsec(lat_info->before - min_tsc),
			tsc_to_nsec(lat_info->after - min_tsc));
#endif
		fprintf(task->fp_tx, "\n");
		task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
	}
	fflush(task->fp_rx);
	fflush(task->fp_tx);
	task->latency_buffer_idx = 0;
}

static void lat_stop(struct task_base *tbase)
{
	struct task_lat *task = (struct task_lat *)tbase;

	if (task->unique_id_pos) {
		task_lat_count_remaining_lost_packets(task);
		task_lat_reset_eld(task);
		memset(task->previous_packet, 0, sizeof(task->previous_packet[0]) * task->generator_count);
	}
	if (task->latency_buffer)
		lat_write_latency_to_file(task);
}

#ifdef LAT_DEBUG
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
	struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];

	lat_info->bulk_size = bulk_size;
	lat_info->id_in_bulk = id_in_bulk;
	lat_info->begin = task->begin;
	lat_info->before = task->base.aux->tsc_rx.before;
	lat_info->after = task->base.aux->tsc_rx.after;
}
#endif

static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
{
	struct lat_info *lat_info;

	/* If unique_id_pos is specified then latency is stored per
	   packet being sent. Lost packets are detected at runtime, and
	   the latency stored for those packets will be 0 */
	lat_info = &task->latency_buffer[task->latency_buffer_idx++];
	lat_info->rx_packet_index = rx_packet_index;
	lat_info->tx_packet_index = packet_id;
	lat_info->port_queue_id = generator_id;
	lat_info->rx_time = rx_time;
	lat_info->tx_time = tx_time;
	lat_info->rx_err = rx_err;
	lat_info->tx_err = tx_err;
}

static uint32_t task_lat_early_loss_detect(struct task_lat *task, uint32_t packet_id, uint8_t generator_id)
{
	struct early_loss_detect *eld = &task->eld[generator_id];
	return early_loss_detect_add(eld, packet_id);
}

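/* Duplicate detection piggybacks on the early-loss-detect ring: if the slot
 * for this packet_id still holds the same (packet_id >> PACKET_QUEUE_BITS)
 * value, the same id was already received and is counted as a duplicate. */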
static void lat_test_check_duplicate(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
{
	struct early_loss_detect *eld = &task->eld[generator_id];
	uint32_t old_queue_id, queue_pos;

	queue_pos = packet_id & PACKET_QUEUE_MASK;
	old_queue_id = eld->entries[queue_pos];
	if ((packet_id >> PACKET_QUEUE_BITS) == old_queue_id)
		lat_test->duplicate++;
}

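/* The RX timestamp is taken once per bulk, after the last packet was received.
 * Estimate when an individual packet arrived by walking back from that
 * timestamp by the wire time of all bytes received after it; tsc_minimum
 * (the end of the previous bulk) bounds the extrapolation. */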
static uint64_t tsc_extrapolate_backward(struct task_lat *task, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
#ifdef NO_LAT_EXTRAPOLATION
	uint64_t tsc = tsc_from;
#else
	uint64_t tsc = tsc_from - task->bytes_to_tsc[bytes];
#endif
	if (likely(tsc > tsc_minimum))
		return tsc;
	else
		return tsc_minimum;
}

static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
{
	uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
	size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);

	bucket_id = bucket_id < bucket_count ? bucket_id : (bucket_count - 1);
	lat_test->buckets[bucket_id]++;
}

static void lat_test_check_ordering(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
{
	if (packet_id < task->previous_packet[generator_id]) {
		lat_test->mis_ordered++;
		lat_test->extent += task->previous_packet[generator_id] - packet_id;
	}
	task->previous_packet[generator_id] = packet_id;
}

static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
{
	lat_test->lost_packets += lost_packets;
}

static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
{
	if (error > lat_test->accuracy_limit_tsc)
		return;
	lat_test->tot_pkts++;

	lat_test->tot_lat += lat_tsc;
	lat_test->tot_lat_error += error;

	/* (a +- b)^2 = a^2 +- (2ab + b^2) */
	lat_test->var_lat += lat_tsc * lat_tsc;
	lat_test->var_lat_error += 2 * lat_tsc * error;
	lat_test->var_lat_error += error * error;

	if (lat_tsc > lat_test->max_lat) {
		lat_test->max_lat = lat_tsc;
		lat_test->max_lat_error = error;
	}
	if (lat_tsc < lat_test->min_lat) {
		lat_test->min_lat = lat_tsc;
		lat_test->min_lat_error = error;
	}

#ifdef LATENCY_HISTOGRAM
	lat_test_histogram_add(lat_test, lat_tsc);
#endif
}

static int task_lat_can_store_latency(struct task_lat *task)
{
	return task->latency_buffer_idx < task->latency_buffer_size;
}

static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
{
	uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;

	lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);

	if (task_lat_can_store_latency(task)) {
		task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
	}
}

static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_lat *task = (struct task_lat *)tbase;
	int rc;

	if (n_pkts == 0) {
		task->begin = tbase->aux->tsc_rx.before;
		return 0;
	}

	task_lat_update_lat_test(task);

	// Remember those packets with bad length or bad signature
	uint32_t non_dp_count = 0;
	uint64_t pkt_bad_len_sig = 0;
#define BIT64_SET(a64, bit)	((a64) |=  (((uint64_t)1) << ((bit) & 63)))
#define BIT64_CLR(a64, bit)	((a64) &= ~(((uint64_t)1) << ((bit) & 63)))
#define BIT64_TEST(a64, bit)	((a64) &   (((uint64_t)1) << ((bit) & 63)))
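/* A single uint64_t bitmask tracks the whole bulk, which assumes bulks of at
 * most 64 packets (MAX_PKT_BURST); the "& 63" keeps the shift amount in range. */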

	/* Go once through all received packets and read them. If a
	   packet has just been modified by another core, the latency cost
	   will be partially amortized through the bulk size */
	for (uint16_t j = 0; j < n_pkts; ++j) {
		struct rte_mbuf *mbuf = mbufs[j];
		task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);

		// Remember those packets which are too short to hold the values that we expect
		if (unlikely(rte_pktmbuf_pkt_len(mbuf) < task->min_pkt_len)) {
			BIT64_SET(pkt_bad_len_sig, j);
			non_dp_count++;
		} else
			BIT64_CLR(pkt_bad_len_sig, j);
	}

	if (task->sig_pos) {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
				continue;
			// Remember those packets with bad signature
			if (likely(*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig))
				task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
			else {
				BIT64_SET(pkt_bad_len_sig, j);
				non_dp_count++;
			}
		}
	} else {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
				continue;
			task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
		}
	}

	uint32_t bytes_total_in_bulk = 0;
	// Find RX time of first packet, for RX accuracy
	for (uint16_t j = 0; j < n_pkts; ++j) {
		uint16_t flipped = n_pkts - 1 - j;

		task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
		bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
	}

	const uint64_t rx_tsc = tbase->aux->tsc_rx.after;

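	/* rx_time_err bounds the uncertainty on the RX timestamp: the first
	 * packet may have arrived anywhere between the previous poll and its
	 * extrapolated arrival time. */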
	uint64_t rx_time_err;
	uint64_t pkt_rx_time64 = tsc_extrapolate_backward(task, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
	if (unlikely((task->begin >> LATENCY_ACCURACY) > pkt_rx_time64)) {
		// Extrapolation went back to BEFORE begin => packets were stuck in the NIC but we were not seeing them
		rx_time_err = pkt_rx_time64 - (task->last_pkts_tsc >> LATENCY_ACCURACY);
	} else {
		rx_time_err = pkt_rx_time64 - (task->begin >> LATENCY_ACCURACY);
	}

	TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, non_dp_count);
	for (uint16_t j = 0; j < n_pkts; ++j) {
		// Used to display % of packets within accuracy limit vs. total number of packets (used_col)
		task->lat_test->tot_all_pkts++;

		// Skip those packets with bad length or bad signature
		if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
			continue;

		struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
		uint8_t *hdr = rx_pkt_meta->hdr;

		uint32_t pkt_rx_time = tsc_extrapolate_backward(task, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
		uint32_t pkt_tx_time = rx_pkt_meta->pkt_tx_time;

		uint8_t generator_id;
		uint32_t packet_id;
		if (task->unique_id_pos) {
			struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
			unique_id_get(unique_id, &generator_id, &packet_id);

			if (unlikely(generator_id >= task->generator_count)) {
				/* No need to remember unexpected packet at this stage
				BIT64_SET(pkt_bad_len_sig, j);
				*/
				// Skip unexpected packet
				continue;
			}
			lat_test_check_ordering(task, task->lat_test, packet_id, generator_id);
			lat_test_check_duplicate(task, task->lat_test, packet_id, generator_id);
			lat_test_add_lost(task->lat_test, task_lat_early_loss_detect(task, packet_id, generator_id));
		} else {
			generator_id = 0;
			packet_id = task->rx_packet_index;
		}

		/* If accuracy is enabled, latency is reported with a
		   delay of ACCURACY_WINDOW packets since the generator puts the
		   accuracy for packet N into packet N + ACCURACY_WINDOW. The delay
		   ensures that all reported latencies have both rx
		   and tx error. */
		if (task->accur_pos) {
			uint32_t tx_time_err = *(uint32_t *)(hdr + task->accur_pos);

			struct delayed_latency_entry *delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_WINDOW);

			if (delayed_latency_entry) {
				task_lat_store_lat(task,
						   delayed_latency_entry->rx_packet_id,
						   delayed_latency_entry->pkt_rx_time,
						   delayed_latency_entry->pkt_tx_time,
						   delayed_latency_entry->rx_time_err,
						   tx_time_err,
						   delayed_latency_entry->tx_packet_id,
						   delayed_latency_entry->generator_id);
			}

			delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
			delayed_latency_entry->pkt_rx_time = pkt_rx_time;
			delayed_latency_entry->pkt_tx_time = pkt_tx_time;
			delayed_latency_entry->rx_time_err = rx_time_err;
			delayed_latency_entry->rx_packet_id = task->rx_packet_index;
			delayed_latency_entry->tx_packet_id = packet_id;
			delayed_latency_entry->generator_id = generator_id;
		} else {
			task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
		}

		// Bad/unexpected packets do not need to be indexed
		task->rx_packet_index++;
	}

	if (n_pkts < MAX_PKT_BURST)
		task->begin = tbase->aux->tsc_rx.before;
	task->last_pkts_tsc = tbase->aux->tsc_rx.after;

	rc = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
	// non_dp_count packets should not be drop-handled, as they are all by definition considered not handled
	// RX = DISCARDED + HANDLED + NON_DP + (TX - TX_NON_DP) + TX_FAIL
	TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, -non_dp_count);
	return rc;
}

static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
{
	const int socket_id = rte_lcore_to_socket_id(core_id);
	char name[256];
	size_t latency_buffer_mem_size = 0;

	if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
		task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;

	latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;

	task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
	PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);

	sprintf(name, "latency.rx_%u.txt", core_id);
	task->fp_rx = fopen(name, "w+");
	PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);

	sprintf(name, "latency.tx_%u.txt", core_id);
	task->fp_tx = fopen(name, "w+");
	PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);

	task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
	PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocate prev_tx_packet_index\n");
}

static void task_init_generator_count(struct task_lat *task)
{
	uint8_t *generator_count = prox_sh_find_system("generator_count");

	if (generator_count == NULL) {
		task->generator_count = 1;
		plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
	} else
		task->generator_count = *generator_count;
	plog_info("\t\tLatency using %u generators\n", task->generator_count);
}

static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
{
	size_t eld_mem_size;

	eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
	task->eld = prox_zmalloc(eld_mem_size, socket_id);
	PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
}

void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
{
	task->limit = nsec_to_tsc(accuracy_limit_nsec);
}

static void lat_start(struct task_base *tbase)
{
	(void)tbase;	// Nothing to do at start; kept as the task_init start callback
}

static void init_task_lat(struct task_base *tbase, struct task_args *targ)
{
	struct task_lat *task = (struct task_lat *)tbase;
	const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

	task->lat_pos = targ->lat_pos;
	task->accur_pos = targ->accur_pos;
	task->sig_pos = targ->sig_pos;
	task->sig = targ->sig;

	task->unique_id_pos = targ->packet_id_pos;
	task->latency_buffer_size = targ->latency_buffer_size;

	PROX_PANIC(task->lat_pos == 0, "Missing 'lat pos' parameter in config file\n");
	uint16_t min_pkt_len = task->lat_pos + sizeof(uint32_t);
	if (task->unique_id_pos && (
		min_pkt_len < task->unique_id_pos + sizeof(struct unique_id)))
		min_pkt_len = task->unique_id_pos + sizeof(struct unique_id);
	if (task->accur_pos && (
		min_pkt_len < task->accur_pos + sizeof(uint32_t)))
		min_pkt_len = task->accur_pos + sizeof(uint32_t);
	if (task->sig_pos && (
		min_pkt_len < task->sig_pos + sizeof(uint32_t)))
		min_pkt_len = task->sig_pos + sizeof(uint32_t);
	task->min_pkt_len = min_pkt_len;

	task_init_generator_count(task);

	if (task->latency_buffer_size) {
		init_task_lat_latency_buffer(task, targ->lconf->id);
	}

	if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
		targ->bucket_size = DEFAULT_BUCKET_SIZE;
	}

	if (task->accur_pos) {
		task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count, socket_id);
		PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
		for (uint i = 0; i < task->generator_count; i++) {
			task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
			PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
		}
		if (task->unique_id_pos == 0) {
			/* When using the accuracy feature, the accuracy from TX is written ACCURACY_WINDOW packets later.
			 * We can only retrieve the right packet if a packet id is written to it.
			 * Otherwise we will use the packet RECEIVED ACCURACY_WINDOW packets ago, which is OK if
			 * packets are not re-ordered. If packets are re-ordered, then the matching between
			 * the TX accuracy and the latency is wrong.
			 */
			plog_warn("\tWhen the accuracy feature is used, a unique id should ideally also be used\n");
		}
	}

	task->lt[0].min_lat = -1;
	task->lt[1].min_lat = -1;
	task->lt[0].bucket_size = targ->bucket_size;
	task->lt[1].bucket_size = targ->bucket_size;
	if (task->unique_id_pos) {
		task_lat_init_eld(task, socket_id);
		task_lat_reset_eld(task);
		task->previous_packet = prox_zmalloc(sizeof(task->previous_packet[0]) * task->generator_count, socket_id);
		PROX_PANIC(task->previous_packet == NULL, "Failed to allocate array for storing previous packet\n");
	}
	task->lat_test = &task->lt[task->using_lt];

	task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
	task->rx_pkt_meta = prox_zmalloc(MAX_PKT_BURST * sizeof(*task->rx_pkt_meta), socket_id);
	PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data\n");

	uint32_t max_frame_size = MAX_PKT_SIZE;
	uint64_t bytes_per_hz = UINT64_MAX;
	if (targ->nb_rxports) {
		struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
		max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE;

		// port->max_link_speed reports the maximum, non-negotiated link speed in Mbps, e.g. 40k for a 40 Gbps NIC.
		// It can be UINT32_MAX (virtual devices, or not supported by DPDK < 16.04)
		if (port->max_link_speed != UINT32_MAX) {
			bytes_per_hz = port->max_link_speed * 125000L;
			plog_info("\t\tPort %u: max link speed is %ld Mbps\n",
				(uint8_t)(port - prox_port_cfg), 8 * bytes_per_hz / 1000000);
		}
	}
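
	/* Precompute a bytes -> TSC ticks lookup table so that the per-packet
	 * extrapolation in tsc_extrapolate_backward() is a single array access
	 * instead of a multiplication and division on the fast path. */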
	task->bytes_to_tsc = prox_zmalloc(max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST, rte_lcore_to_socket_id(targ->lconf->id));
	PROX_PANIC(task->bytes_to_tsc == NULL,
		"Failed to allocate %zu bytes (in huge pages) for bytes_to_tsc\n", max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST);

	// There are cases where the hz estimate might be slightly over-estimated,
	// which results in too much extrapolation.
	// Only account for 99% of the extrapolation to handle clocks with up to 1% error.
	for (unsigned int i = 0; i < max_frame_size * MAX_PKT_BURST; i++) {
		if (bytes_per_hz == UINT64_MAX)
			task->bytes_to_tsc[i] = 0;
		else
			task->bytes_to_tsc[i] = (rte_get_tsc_hz() * i * 0.99) / bytes_per_hz;
	}
}

static struct task_init task_init_lat = {
	.mode_str = "lat",
	.init = init_task_lat,
	.handle = handle_lat_bulk,
	.start = lat_start,
	.stop = lat_stop,
	.flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
	.size = sizeof(struct task_lat)
};

__attribute__((constructor)) static void reg_task_lat(void)
{
	reg_task(&task_init_lat);
}