Code rewrite and Python3 support
[samplevnf.git] / VNFs / DPPD-PROX / handle_lat.c
1 /*
2 // Copyright (c) 2010-2019 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 //#define LAT_DEBUG
18
19 #include <rte_cycles.h>
20 #include <stdio.h>
21 #include <math.h>
22
23 #include "handle_gen.h"
24 #include "prox_malloc.h"
25 #include "mbuf_utils.h"
26 #include "handle_lat.h"
27 #include "log.h"
28 #include "task_init.h"
29 #include "task_base.h"
30 #include "stats.h"
31 #include "lconf.h"
32 #include "quit.h"
33 #include "eld.h"
34 #include "prox_shared.h"
35 #include "prox_port_cfg.h"
36
37 #define DEFAULT_BUCKET_SIZE     11
38 #define ACCURACY_BUFFER_SIZE    (2 * ACCURACY_WINDOW)
39
/* One record of the optional per-packet latency buffer.
 * Records are appended by task_lat_store_lat_buf() and dumped by
 * lat_write_latency_to_file(). Time-like fields are shifted left by
 * LATENCY_ACCURACY when they are converted to TSC (see lat_info_get_*_tsc()).
 */
40 struct lat_info {
41         uint32_t rx_packet_index; /* rx index handed in by the caller (narrowed to 32 bits) */
42         uint64_t tx_packet_index; /* packet id; widened by fix_latency_buffer_tx_packet_index() to undo 32-bit wrap */
43         uint32_t tx_err; /* tx error, shifted by LATENCY_ACCURACY when reported */
44         uint32_t rx_err; /* rx error, shifted by LATENCY_ACCURACY when reported */
45         uint64_t rx_time; /* rx time, shifted by LATENCY_ACCURACY when reported */
46         uint64_t tx_time; /* tx time read from the packet; widened by fix_latency_buffer_tx_time() */
47         uint16_t port_queue_id; /* holds the generator id given to task_lat_store_lat_buf() */
48 #ifdef LAT_DEBUG
49         uint16_t id_in_bulk; /* position of the packet inside its bulk */
50         uint16_t bulk_size; /* number of packets in that bulk */
51         uint64_t begin; /* copy of task->begin */
52         uint64_t after; /* copy of tsc_rx.after */
53         uint64_t before; /* copy of tsc_rx.before */
54 #endif
55 };
56
/* Measurement of one received packet, parked until the packet that carries
 * its TX error arrives (see the accur_pos handling in handle_lat_bulk()).
 */
57 struct delayed_latency_entry {
58         uint32_t rx_packet_id; /* task->rx_packet_index when the packet was received */
59         uint32_t tx_packet_id; /* packet id as used for reporting */
60         uint32_t packet_id; /* key: set by delayed_latency_create(), checked by delayed_latency_get() */
61         uint8_t generator_id; /* generator the packet was attributed to */
62         uint64_t pkt_rx_time; /* extrapolated RX time of the packet */
63         uint64_t pkt_tx_time;   // Time written into packets by gen. Unit is TSC >> LATENCY_ACCURACY
64         uint64_t rx_time_err; /* RX error computed for the bulk the packet arrived in */
65 };
66
67 static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
68 {
69         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
70         if (delayed_latency_entry->packet_id == packet_id)
71                 return delayed_latency_entry;
72         else
73                 return NULL;
74 }
75
76 static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
77 {
78         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
79         delayed_latency_entry->packet_id = packet_id;
80         return delayed_latency_entry;
81 }
82
/* Per-packet scratch data gathered by handle_lat_bulk() for the bulk being
 * processed.
 */
83 struct rx_pkt_meta_data {
84         uint8_t  *hdr; /* start of the packet data (rte_pktmbuf_mtod) */
85         uint32_t pkt_tx_time; /* 32-bit TX time read from the packet at lat_pos */
86         uint32_t bytes_after_in_bulk; /* sum of mbuf_wire_size() of the packets that follow this one in the bulk */
87 };
88
/* State of one latency-measurement task. */
89 struct task_lat {
90         struct task_base base;
91         uint64_t limit; /* accuracy limit in TSC, set by task_lat_set_accuracy_limit() */
92         uint64_t rx_packet_index; /* incremented for every packet accepted by handle_lat_bulk() */
93         uint64_t last_pkts_tsc; /* tsc_rx.after of the previous non-empty bulk */
94         struct delayed_latency_entry **delayed_latency_entries; /* one slot array per generator, see delayed_latency_get() */
95         struct lat_info *latency_buffer; /* optional per-packet records */
96         uint32_t latency_buffer_idx; /* number of records currently stored */
97         uint32_t latency_buffer_size; /* capacity of latency_buffer, in records */
98         uint64_t begin; /* tsc_rx.before, refreshed on empty bulks and on bulks smaller than MAX_PKT_BURST */
99         uint16_t lat_pos; /* byte offset of the TX time in the packet (mandatory, must be non-zero) */
100         uint16_t unique_id_pos; /* byte offset of the unique id; 0 disables its use */
101         uint16_t accur_pos; /* byte offset of the TX error; 0 disables its use */
102         uint16_t sig_pos; /* byte offset of the signature; 0 disables the check */
103         uint32_t sig; /* signature value expected at sig_pos */
104         volatile uint16_t use_lt; /* which lt to use, */
105         volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements are used */
106         struct lat_test lt[2];
107         struct lat_test *lat_test; /* points at lt[using_lt] once task_lat_update_lat_test() has switched */
108         uint32_t generator_count; /* number of generators (size of eld and prev_tx_packet_index) */
109         uint16_t min_pkt_len; /* packets shorter than this are counted as non-dataplane and skipped */
110         struct early_loss_detect *eld; /* one loss detector per generator */
111         struct rx_pkt_meta_data *rx_pkt_meta; /* scratch entries, one per packet of the current bulk */
112         // Following fields are only used when starting or stopping, not in general runtime
113         uint64_t *prev_tx_packet_index; /* per-generator last TX index, used by lat_write_latency_to_file() */
114         FILE *fp_rx; /* dump ordered by RX time */
115         FILE *fp_tx; /* dump ordered by TX index or TX time */
116         struct prox_port_cfg *port;
117         uint64_t *bytes_to_tsc; /* indexed by a byte count in tsc_extrapolate_backward() */
118 };
/* Elapsed time between tx_time and rx_time.
 * Both inputs are 32-bit timestamps (see handle_lat_bulk). rx_time is
 * normally the larger one, except right after rx_time has wrapped around
 * UINT32_MAX. Because the subtraction is done on unsigned 32-bit values and
 * the result is 32-bit as well, the wrapped case still yields the right
 * difference.
 */
static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
{
	const uint32_t elapsed = rx_time - tx_time;

	return elapsed;
}
130
131 uint32_t task_lat_get_latency_bucket_size(struct task_lat *task)
132 {
133         return task->lat_test->bucket_size;
134 }
135
136 struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
137 {
138         if (task->use_lt == task->using_lt)
139                 return &task->lt[!task->using_lt];
140         return NULL;
141 }
142
143 void task_lat_use_other_latency_meassurement(struct task_lat *task)
144 {
145         task->use_lt = !task->using_lt;
146 }
147
148 static void task_lat_update_lat_test(struct task_lat *task)
149 {
150         if (task->use_lt != task->using_lt) {
151                 task->using_lt = task->use_lt;
152                 task->lat_test = &task->lt[task->using_lt];
153                 task->lat_test->accuracy_limit_tsc = task->limit;
154         }
155 }
156
157 static int compare_tx_time(const void *val1, const void *val2)
158 {
159         const struct lat_info *ptr1 = val1;
160         const struct lat_info *ptr2 = val2;
161
162         return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
163 }
164
165 static int compare_tx_packet_index(const void *val1, const void *val2)
166 {
167         const struct lat_info *ptr1 = val1;
168         const struct lat_info *ptr2 = val2;
169
170         return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
171 }
172
173 static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
174 {
175         uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
176         uint32_t small = UINT32_MAX >> 1;
177
178         lat++;
179
180         /* Buffer is sorted so far by RX time.
181          * We might have packets being reordered by SUT.
182          *     => consider small differences as re-order and big ones as overflow of tx_packet_index.
183          * Note that:
184          *      - overflow only happens if receiving and storing 4 billions packets...
185          *      - a absolute difference of less than 2 billion packets is not considered as an overflow
186          */
187         for (uint32_t i = 1; i < count; i++) {
188                 tx_packet_index = lat->tx_packet_index;
189                 if (tx_packet_index > old_tx_packet_index) {
190                         if (tx_packet_index - old_tx_packet_index < small) {
191                                 // The diff is small => increasing index count
192                         } else {
193                                 // The diff is big => it is more likely that the previous packet was overflow
194                                 n_overflow--;
195                         }
196                 } else {
197                         if (old_tx_packet_index - tx_packet_index < small) {
198                                 // The diff is small => packet reorder
199                         } else {
200                                 // The diff is big => it is more likely that this is an overflow
201                                 n_overflow++;
202                         }
203                 }
204                 lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
205                 old_tx_packet_index = tx_packet_index;
206                 lat++;
207         }
208 }
209
210 static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
211 {
212         uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
213         uint32_t small = UINT32_MAX >> 1;
214         lat++;
215
216         /*
217          * Same algorithm as above, but with time.
218          * Note that:
219          *      - overflow happens after 4 billions "cycles" (shifted by LATENCY_ACCURACY) = ~4sec
220          *      - a absolute difference up to 2 billion (shifted) cycles (~=2sec) is not considered as an overflow
221          *              => algorithm does not work if receiving less than 1 packet every 2 seconds
222          */
223         for (uint32_t i = 1; i < count; i++) {
224                 tx_time = lat->tx_time;
225                 if (tx_time > old_tx_time) {
226                         if (tx_time - old_tx_time > small) {
227                                 n_overflow--;
228                         }
229                 } else {
230                         if (old_tx_time - tx_time > small) {
231                                 n_overflow++;
232                         }
233                 }
234                 lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
235                 old_tx_time = tx_time;
236                 lat++;
237         }
238 }
239
240 static void task_lat_count_remaining_lost_packets(struct task_lat *task)
241 {
242         struct lat_test *lat_test = task->lat_test;
243
244         for (uint32_t j = 0; j < task->generator_count; j++) {
245                 struct early_loss_detect *eld = &task->eld[j];
246
247                 lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
248         }
249 }
250
251 static void task_lat_reset_eld(struct task_lat *task)
252 {
253         for (uint32_t j = 0; j < task->generator_count; j++) {
254                 early_loss_detect_reset(&task->eld[j]);
255         }
256 }
257
258 static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
259 {
260         uint64_t min_tsc = UINT64_MAX;
261
262         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
263                 if (min_tsc > task->latency_buffer[i].tx_time)
264                         min_tsc = task->latency_buffer[i].tx_time;
265         }
266
267         return min_tsc << LATENCY_ACCURACY;
268 }
269
270 static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
271 {
272         uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);
273
274         return lat << LATENCY_ACCURACY;
275 }
276
277 static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
278 {
279         return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
280 }
281
282 static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
283 {
284         return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
285 }
286
287 static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
288 {
289         return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
290 }
291
292 static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
293 {
294         return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
295 }
296
297 static void lat_write_latency_to_file(struct task_lat *task)
298 {
299         uint64_t min_tsc;
300         uint64_t n_loss;
301
302         min_tsc = lat_latency_buffer_get_min_tsc(task);
303
304         // Dumping all packet statistics
305         fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
306         fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec);tx time;\n");
307         for (uint32_t i = 0; i < task->latency_buffer_idx ; i++) {
308                 struct lat_info *lat_info = &task->latency_buffer[i];
309                 uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
310                 uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
311                 uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
312
313                 fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
314                         lat_info->rx_packet_index,
315                         lat_info->port_queue_id,
316                         lat_info->tx_packet_index,
317                         tsc_to_nsec(lat_tsc),
318                         tsc_to_nsec(rx_tsc - min_tsc),
319                         tsc_to_nsec(tx_tsc - min_tsc));
320         }
321
322         // To detect dropped packets, we need to sort them based on TX
323         if (task->unique_id_pos) {
324                 plogx_info("Adapting tx_packet_index\n");
325                 fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
326                 plogx_info("Sorting packets based on tx_packet_index\n");
327                 qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
328                 plogx_info("Sorted packets based on packet_index\n");
329         } else {
330                 plogx_info("Adapting tx_time\n");
331                 fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
332                 plogx_info("Sorting packets based on tx_time\n");
333                 qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
334                 plogx_info("Sorted packets based on packet_time\n");
335         }
336
337         // A packet is marked as dropped if 2 packets received from the same queue are not consecutive
338         fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
339         fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");
340
341         for (uint32_t i = 0; i < task->generator_count;i++)
342                 task->prev_tx_packet_index[i] = -1;
343
344         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
345                 struct lat_info *lat_info = &task->latency_buffer[i];
346                 uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
347                 uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
348                 uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
349                 uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
350                 uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
351
352                 /* Packet n + ACCURACY_WINDOW delivers the TX error for packet n,
353                    hence the last ACCURACY_WINDOW packets do no have TX error. */
354                 if (i + ACCURACY_WINDOW >= task->latency_buffer_idx) {
355                         tx_err_tsc = 0;
356                 }
357
358                 if (lat_info->port_queue_id >= task->generator_count) {
359                         plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
360                                 lat_info->port_queue_id, lat_info->tx_packet_index);
361                         continue;
362                 }
363                 // Log dropped packet
364                 n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
365                 if (n_loss)
366                         fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
367                                 lat_info->port_queue_id,
368                                 lat_info->tx_packet_index - n_loss, n_loss);
369                 // Log next packet
370                 fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
371                         lat_info->port_queue_id,
372                         lat_info->tx_packet_index,
373                         lat_info->rx_packet_index,
374                         tsc_to_nsec(lat_tsc),
375                         tsc_to_nsec(tx_tsc - min_tsc),
376                         tsc_to_nsec(rx_tsc - min_tsc),
377                         tsc_to_nsec(tx_err_tsc),
378                         tsc_to_nsec(rx_err_tsc));
379 #ifdef LAT_DEBUG
380                 fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
381                         lat_info->id_in_bulk,
382                         lat_info->bulk_size,
383                         tsc_to_nsec(lat_info->begin - min_tsc),
384                         tsc_to_nsec(lat_info->before - min_tsc),
385                         tsc_to_nsec(lat_info->after - min_tsc));
386 #endif
387                 fprintf(task->fp_tx, "\n");
388                 task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
389         }
390         fflush(task->fp_rx);
391         fflush(task->fp_tx);
392         task->latency_buffer_idx = 0;
393 }
394
395 static void lat_stop(struct task_base *tbase)
396 {
397         struct task_lat *task = (struct task_lat *)tbase;
398
399         if (task->unique_id_pos) {
400                 task_lat_count_remaining_lost_packets(task);
401                 task_lat_reset_eld(task);
402         }
403         if (task->latency_buffer)
404                 lat_write_latency_to_file(task);
405 }
406
#ifdef LAT_DEBUG
/* Fill in the debug-only fields of the latency-buffer record at
 * rx_packet_index: position in the bulk, bulk size and the task's
 * begin/before/after timestamps.
 */
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
	struct lat_info *record = task->latency_buffer + rx_packet_index;

	record->bulk_size = bulk_size;
	record->id_in_bulk = id_in_bulk;
	record->begin = task->begin;
	record->before = task->base.aux->tsc_rx.before;
	record->after = task->base.aux->tsc_rx.after;
}
#endif
419
420 static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
421 {
422         struct lat_info *lat_info;
423
424         /* If unique_id_pos is specified then latency is stored per
425            packet being sent. Lost packets are detected runtime, and
426            latency stored for those packets will be 0 */
427         lat_info = &task->latency_buffer[task->latency_buffer_idx++];
428         lat_info->rx_packet_index = rx_packet_index;
429         lat_info->tx_packet_index = packet_id;
430         lat_info->port_queue_id = generator_id;
431         lat_info->rx_time = rx_time;
432         lat_info->tx_time = tx_time;
433         lat_info->rx_err = rx_err;
434         lat_info->tx_err = tx_err;
435 }
436
437 static uint32_t task_lat_early_loss_detect(struct task_lat *task, uint32_t packet_id, uint8_t generator_id)
438 {
439         struct early_loss_detect *eld = &task->eld[generator_id];
440         return early_loss_detect_add(eld, packet_id);
441 }
442
443 static uint64_t tsc_extrapolate_backward(struct task_lat *task, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
444 {
445 #ifdef NO_LAT_EXTRAPOLATION
446         uint64_t tsc = tsc_from;
447 #else
448         uint64_t tsc = tsc_from - task->bytes_to_tsc[bytes];
449 #endif
450         if (likely(tsc > tsc_minimum))
451                 return tsc;
452         else
453                 return tsc_minimum;
454 }
455
456 static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
457 {
458         uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
459         size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);
460
461         bucket_id = bucket_id < bucket_count? bucket_id : (bucket_count - 1);
462         lat_test->buckets[bucket_id]++;
463 }
464
465 static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
466 {
467         lat_test->lost_packets += lost_packets;
468 }
469
470 static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
471 {
472         if (error > lat_test->accuracy_limit_tsc)
473                 return;
474         lat_test->tot_pkts++;
475
476         lat_test->tot_lat += lat_tsc;
477         lat_test->tot_lat_error += error;
478
479         /* (a +- b)^2 = a^2 +- (2ab + b^2) */
480         lat_test->var_lat += lat_tsc * lat_tsc;
481         lat_test->var_lat_error += 2 * lat_tsc * error;
482         lat_test->var_lat_error += error * error;
483
484         if (lat_tsc > lat_test->max_lat) {
485                 lat_test->max_lat = lat_tsc;
486                 lat_test->max_lat_error = error;
487         }
488         if (lat_tsc < lat_test->min_lat) {
489                 lat_test->min_lat = lat_tsc;
490                 lat_test->min_lat_error = error;
491         }
492
493 #ifdef LATENCY_HISTOGRAM
494         lat_test_histogram_add(lat_test, lat_tsc);
495 #endif
496 }
497
498 static int task_lat_can_store_latency(struct task_lat *task)
499 {
500         return task->latency_buffer_idx < task->latency_buffer_size;
501 }
502
503 static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
504 {
505         uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;
506
507         lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);
508
509         if (task_lat_can_store_latency(task)) {
510                 task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
511         }
512 }
513
514 static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
515 {
516         struct task_lat *task = (struct task_lat *)tbase;
517         int rc;
518
519         if (n_pkts == 0) {
520                 task->begin = tbase->aux->tsc_rx.before;
521                 return 0;
522         }
523
524         task_lat_update_lat_test(task);
525
526         // Remember those packets with bad length or bad signature
527         uint32_t non_dp_count = 0;
528         uint64_t pkt_bad_len_sig = 0;
529 #define BIT64_SET(a64, bit)     a64 |=  (((uint64_t)1) << (bit & 63))
530 #define BIT64_CLR(a64, bit)     a64 &= ~(((uint64_t)1) << (bit & 63))
531 #define BIT64_TEST(a64, bit)    a64  &  (((uint64_t)1) << (bit & 63))
532
533         /* Go once through all received packets and read them.  If
534            packet has just been modified by another core, the cost of
535            latency will be partialy amortized though the bulk size */
536         for (uint16_t j = 0; j < n_pkts; ++j) {
537                 struct rte_mbuf *mbuf = mbufs[j];
538                 task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);
539
540                 // Remember those packets which are too short to hold the values that we expect
541                 if (unlikely(rte_pktmbuf_pkt_len(mbuf) < task->min_pkt_len)) {
542                         BIT64_SET(pkt_bad_len_sig, j);
543                         non_dp_count++;
544                 } else
545                         BIT64_CLR(pkt_bad_len_sig, j);
546         }
547
548         if (task->sig_pos) {
549                 for (uint16_t j = 0; j < n_pkts; ++j) {
550                         if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
551                                 continue;
552                         // Remember those packets with bad signature
553                         if (likely(*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig))
554                                 task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
555                         else {
556                                 BIT64_SET(pkt_bad_len_sig, j);
557                                 non_dp_count++;
558                         }
559                 }
560         } else {
561                 for (uint16_t j = 0; j < n_pkts; ++j) {
562                         if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
563                                 continue;
564                         task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
565                 }
566         }
567
568         uint32_t bytes_total_in_bulk = 0;
569         // Find RX time of first packet, for RX accuracy
570         for (uint16_t j = 0; j < n_pkts; ++j) {
571                 uint16_t flipped = n_pkts - 1 - j;
572
573                 task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
574                 bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
575         }
576
577         const uint64_t rx_tsc = tbase->aux->tsc_rx.after;
578
579         uint64_t rx_time_err;
580         uint64_t pkt_rx_time64 = tsc_extrapolate_backward(task, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
581         if (unlikely((task->begin >> LATENCY_ACCURACY) > pkt_rx_time64)) {
582                 // Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
583                 rx_time_err = pkt_rx_time64 - (task->last_pkts_tsc >> LATENCY_ACCURACY);
584         } else {
585                 rx_time_err = pkt_rx_time64 - (task->begin >> LATENCY_ACCURACY);
586         }
587
588         TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, non_dp_count);
589         for (uint16_t j = 0; j < n_pkts; ++j) {
590                 // Used to display % of packets within accuracy limit vs. total number of packets (used_col)
591                 task->lat_test->tot_all_pkts++;
592
593                 // Skip those packets with bad length or bad signature
594                 if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
595                         continue;
596
597                 struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
598                 uint8_t *hdr = rx_pkt_meta->hdr;
599
600                 uint32_t pkt_rx_time = tsc_extrapolate_backward(task, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
601                 uint32_t pkt_tx_time = rx_pkt_meta->pkt_tx_time;
602
603                 uint8_t generator_id;
604                 uint32_t packet_id;
605                 if (task->unique_id_pos) {
606                         struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
607                         unique_id_get(unique_id, &generator_id, &packet_id);
608
609                         if (unlikely(generator_id >= task->generator_count)) {
610                                 /* No need to remember unexpected packet at this stage
611                                 BIT64_SET(pkt_bad_len_sig, j);
612                                 */
613                                 // Skip unexpected packet
614                                 continue;
615                         }
616
617                         lat_test_add_lost(task->lat_test, task_lat_early_loss_detect(task, packet_id, generator_id));
618                 } else {
619                         generator_id = 0;
620                         packet_id = task->rx_packet_index;
621                 }
622
623                 /* If accuracy is enabled, latency is reported with a
624                    delay of ACCURACY_WINDOW packets since the generator puts the
625                    accuracy for packet N into packet N + ACCURACY_WINDOW. The delay
626                    ensures that all reported latencies have both rx
627                    and tx error. */
628                 if (task->accur_pos) {
629                         uint32_t tx_time_err = *(uint32_t *)(hdr + task->accur_pos);
630
631                         struct delayed_latency_entry *delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_WINDOW);
632
633                         if (delayed_latency_entry) {
634                                 task_lat_store_lat(task,
635                                                    delayed_latency_entry->rx_packet_id,
636                                                    delayed_latency_entry->pkt_rx_time,
637                                                    delayed_latency_entry->pkt_tx_time,
638                                                    delayed_latency_entry->rx_time_err,
639                                                    tx_time_err,
640                                                    delayed_latency_entry->tx_packet_id,
641                                                    delayed_latency_entry->generator_id);
642                         }
643
644                         delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
645                         delayed_latency_entry->pkt_rx_time = pkt_rx_time;
646                         delayed_latency_entry->pkt_tx_time = pkt_tx_time;
647                         delayed_latency_entry->rx_time_err = rx_time_err;
648                         delayed_latency_entry->rx_packet_id = task->rx_packet_index;
649                         delayed_latency_entry->tx_packet_id = packet_id;
650                         delayed_latency_entry->generator_id = generator_id;
651                 } else {
652                         task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
653                 }
654
655                 // Bad/unexpected packets do not need to be indexed
656                 task->rx_packet_index++;
657         }
658
659         if (n_pkts < MAX_PKT_BURST)
660                 task->begin = tbase->aux->tsc_rx.before;
661         task->last_pkts_tsc = tbase->aux->tsc_rx.after;
662
663         rc = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
664         // non_dp_count should not be drop-handled, as there are all by definition considered as not handled
665         // RX = DISCARDED + HANDLED + NON_DP + (TX - TX_NON_DP) + TX_FAIL
666         TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, -non_dp_count);
667         return rc;
668 }
669
670 static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
671 {
672         const int socket_id = rte_lcore_to_socket_id(core_id);
673         char name[256];
674         size_t latency_buffer_mem_size = 0;
675
676         if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
677                 task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;
678
679         latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;
680
681         task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
682         PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);
683
684         sprintf(name, "latency.rx_%u.txt", core_id);
685         task->fp_rx = fopen(name, "w+");
686         PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);
687
688         sprintf(name, "latency.tx_%u.txt", core_id);
689         task->fp_tx = fopen(name, "w+");
690         PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);
691
692         task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
693         PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocated prev_tx_packet_index\n");
694 }
695
696 static void task_init_generator_count(struct task_lat *task)
697 {
698         uint8_t *generator_count = prox_sh_find_system("generator_count");
699
700         if (generator_count == NULL) {
701                 task->generator_count = 1;
702                 plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
703         } else
704                 task->generator_count = *generator_count;
705         plog_info("\tLatency using %u generators\n", task->generator_count);
706 }
707
708 static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
709 {
710         size_t eld_mem_size;
711
712         eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
713         task->eld = prox_zmalloc(eld_mem_size, socket_id);
714         PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
715 }
716
717 void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
718 {
719         task->limit = nsec_to_tsc(accuracy_limit_nsec);
720 }
721
/*
 * Start hook of the "lat" task (installed as .start in task_init_lat).
 *
 * There is nothing to do when the task starts, so the body is intentionally
 * empty. The parameter is referenced only to keep unused-parameter warnings
 * quiet.
 */
static void lat_start(struct task_base *tbase)
{
        (void)tbase;
}
727
728 static void init_task_lat(struct task_base *tbase, struct task_args *targ)
729 {
730         struct task_lat *task = (struct task_lat *)tbase;
731         const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
732
733         task->lat_pos = targ->lat_pos;
734         task->accur_pos = targ->accur_pos;
735         task->sig_pos = targ->sig_pos;
736         task->sig = targ->sig;
737
738         task->unique_id_pos = targ->packet_id_pos;
739         task->latency_buffer_size = targ->latency_buffer_size;
740
741         PROX_PANIC(task->lat_pos == 0, "Missing 'lat pos' parameter in config file\n");
742         uint16_t min_pkt_len = task->lat_pos + sizeof(uint32_t);
743         if (task->unique_id_pos && (
744                 min_pkt_len < task->unique_id_pos + sizeof(struct unique_id)))
745                 min_pkt_len = task->unique_id_pos + sizeof(struct unique_id);
746         if (task->accur_pos && (
747                 min_pkt_len < task->accur_pos + sizeof(uint32_t)))
748                 min_pkt_len = task->accur_pos + sizeof(uint32_t);
749         if (task->sig_pos && (
750                 min_pkt_len < task->sig_pos + sizeof(uint32_t)))
751                 min_pkt_len = task->sig_pos + sizeof(uint32_t);
752         task->min_pkt_len = min_pkt_len;
753
754         task_init_generator_count(task);
755
756         if (task->latency_buffer_size) {
757                 init_task_lat_latency_buffer(task, targ->lconf->id);
758         }
759
760         if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
761                 targ->bucket_size = DEFAULT_BUCKET_SIZE;
762         }
763
764         if (task->accur_pos) {
765                 task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count , socket_id);
766                 PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
767                 for (uint i = 0; i < task->generator_count; i++) {
768                         task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
769                         PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
770                 }
771                 if (task->unique_id_pos == 0) {
772                         /* When using accuracy feature, the accuracy from TX is written ACCURACY_WINDOW packets later
773                         * We can only retrieve the good packet if a packet id is written to it.
774                         * Otherwise we will use the packet RECEIVED ACCURACY_WINDOW packets ago which is OK if
775                         * packets are not re-ordered. If packets are re-ordered, then the matching between
776                         * the TX accuracy and the latency is wrong.
777                         */
778                         plog_warn("\tWhen accuracy feature is used, a unique id should ideally also be used\n");
779                 }
780         }
781
782         task->lt[0].min_lat = -1;
783         task->lt[1].min_lat = -1;
784         task->lt[0].bucket_size = targ->bucket_size;
785         task->lt[1].bucket_size = targ->bucket_size;
786         if (task->unique_id_pos) {
787                 task_lat_init_eld(task, socket_id);
788                 task_lat_reset_eld(task);
789         }
790         task->lat_test = &task->lt[task->using_lt];
791
792         task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
793         task->rx_pkt_meta = prox_zmalloc(MAX_PKT_BURST * sizeof(*task->rx_pkt_meta), socket_id);
794         PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");
795
796         uint32_t max_frame_size = MAX_PKT_SIZE;
797         uint64_t bytes_per_hz = UINT64_MAX;
798         if (targ->nb_rxports) {
799                 struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
800                 max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE;
801
802                 // port->max_link_speed reports the maximum, non negotiated ink speed in Mbps e.g. 40k for a 40 Gbps NIC.
803                 // It can be UINT32_MAX (virtual devices or not supported by DPDK < 16.04)
804                 if (port->max_link_speed != UINT32_MAX) {
805                         bytes_per_hz = port->max_link_speed * 125000L;
806                         plog_info("\tPort %u: max link speed is %ld Mbps\n",
807                                 (uint8_t)(port - prox_port_cfg), 8 * bytes_per_hz / 1000000);
808                 }
809         }
810         task->bytes_to_tsc = prox_zmalloc(max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST, rte_lcore_to_socket_id(targ->lconf->id));
811         PROX_PANIC(task->bytes_to_tsc == NULL,
812                 "Failed to allocate %u bytes (in huge pages) for bytes_to_tsc\n", max_frame_size);
813
814         // There are cases where hz estimate might be slighly over-estimated
815         // This results in too much extrapolation
816         // Only account for 99% of extrapolation to handle cases with up to 1% error clocks
817         for (unsigned int i = 0; i < max_frame_size * MAX_PKT_BURST ; i++) {
818                 if (bytes_per_hz == UINT64_MAX)
819                         task->bytes_to_tsc[i] = 0;
820                 else
821                         task->bytes_to_tsc[i] = (rte_get_tsc_hz() * i * 0.99) / bytes_per_hz;
822         }
823 }
824
825 static struct task_init task_init_lat = {
826         .mode_str = "lat",
827         .init = init_task_lat,
828         .handle = handle_lat_bulk,
829         .start = lat_start,
830         .stop = lat_stop,
831         .flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
832         .size = sizeof(struct task_lat)
833 };
834
835 __attribute__((constructor)) static void reg_task_lat(void)
836 {
837         reg_task(&task_init_lat);
838 }