Fix mbuf corruption when handling icmp messages.
[samplevnf.git] / VNFs / DPPD-PROX / handle_lat.c
/*
// Copyright (c) 2010-2019 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

//#define LAT_DEBUG

#include <rte_cycles.h>
#include <stdio.h>
#include <math.h>

#include "handle_gen.h"
#include "prox_malloc.h"
#include "mbuf_utils.h"
#include "handle_lat.h"
#include "log.h"
#include "task_init.h"
#include "task_base.h"
#include "stats.h"
#include "lconf.h"
#include "quit.h"
#include "eld.h"
#include "prox_shared.h"
#include "prox_port_cfg.h"

#define DEFAULT_BUCKET_SIZE     11
#define ACCURACY_BUFFER_SIZE    (2 * ACCURACY_WINDOW)

struct lat_info {
        uint32_t rx_packet_index;
        uint64_t tx_packet_index;
        uint32_t tx_err;
        uint32_t rx_err;
        uint64_t rx_time;
        uint64_t tx_time;
        uint16_t port_queue_id;
#ifdef LAT_DEBUG
        uint16_t id_in_bulk;
        uint16_t bulk_size;
        uint64_t begin;
        uint64_t after;
        uint64_t before;
#endif
};

struct delayed_latency_entry {
        uint32_t rx_packet_id;
        uint32_t tx_packet_id;
        uint32_t packet_id;
        uint8_t generator_id;
        uint64_t pkt_rx_time;
        uint64_t pkt_tx_time;   // Time written into packets by gen. Unit is TSC >> LATENCY_ACCURACY
        uint64_t rx_time_err;
};

static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
        struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
        if (delayed_latency_entry->packet_id == packet_id)
                return delayed_latency_entry;
        else
                return NULL;
}

static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
        struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
        delayed_latency_entry->packet_id = packet_id;
        return delayed_latency_entry;
}
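
/* A minimal usage sketch (not part of the original code): the two helpers
 * above implement a per-generator ring buffer of ACCURACY_BUFFER_SIZE slots
 * keyed by packet_id; a lookup only succeeds until another packet_id that
 * maps to the same slot overwrites it. LAT_EXAMPLE is a hypothetical guard,
 * kept undefined so this sketch is never built into PROX. */
#ifdef LAT_EXAMPLE
static void delayed_latency_ring_sketch(struct delayed_latency_entry **entries)
{
        // Claim slot 42 % ACCURACY_BUFFER_SIZE for packet 42 of generator 0
        delayed_latency_create(entries, 0, 42);
        // Still present: the lookup returns the entry written above
        struct delayed_latency_entry *hit = delayed_latency_get(entries, 0, 42);
        // A later packet mapping to the same slot overwrites it...
        delayed_latency_create(entries, 0, 42 + ACCURACY_BUFFER_SIZE);
        // ...after which the stale lookup returns NULL
        struct delayed_latency_entry *miss = delayed_latency_get(entries, 0, 42);
        (void)hit; (void)miss;
}
#endif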

struct rx_pkt_meta_data {
        uint8_t  *hdr;
        uint32_t pkt_tx_time;
        uint32_t bytes_after_in_bulk;
};

struct task_lat {
        struct task_base base;
        uint64_t limit;
        uint64_t rx_packet_index;
        uint64_t last_pkts_tsc;
        struct delayed_latency_entry **delayed_latency_entries;
        struct lat_info *latency_buffer;
        uint32_t latency_buffer_idx;
        uint32_t latency_buffer_size;
        uint64_t begin;
        uint16_t lat_pos;
        uint16_t unique_id_pos;
        uint16_t accur_pos;
        uint16_t sig_pos;
        uint32_t sig;
        volatile uint16_t use_lt; /* which lt to use */
        volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements is used */
        struct lat_test lt[2];
        struct lat_test *lat_test;
        uint32_t generator_count;
        uint16_t min_pkt_len;
        struct early_loss_detect *eld;
        struct rx_pkt_meta_data *rx_pkt_meta;
        // The following fields are only used when starting or stopping, not at general runtime
        uint64_t *prev_tx_packet_index;
        FILE *fp_rx;
        FILE *fp_tx;
        struct prox_port_cfg *port;
        uint64_t *bytes_to_tsc;
        uint64_t *previous_packet;
};
/* This function calculates the difference between rx_time and tx_time.
 * Both values are uint32_t (see handle_lat_bulk).
 * rx_time should be higher than tx_time, except once every UINT32_MAX
 * cycles, when rx_time overflows.
 * As the return value is also uint32_t, returning (rx_time - tx_time)
 * is also fine when it overflows.
 */
static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
{
        return rx_time - tx_time;
}
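
/* Worked example (illustrative, not from the original source): uint32_t
 * subtraction is modulo 2^32, so the result stays correct across a single
 * wrap of rx_time:
 *
 *      diff_time(0x00000005, 0xFFFFFFFE) == 7
 *
 * rx_time wrapped past zero after the packet was stamped, yet the 7-"cycle"
 * latency is recovered exactly.
 */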

uint32_t task_lat_get_latency_bucket_size(struct task_lat *task)
{
        return task->lat_test->bucket_size;
}

struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
{
        if (task->use_lt == task->using_lt)
                return &task->lt[!task->using_lt];
        return NULL;
}

void task_lat_use_other_latency_meassurement(struct task_lat *task)
{
        task->use_lt = !task->using_lt;
}

static void task_lat_update_lat_test(struct task_lat *task)
{
        if (task->use_lt != task->using_lt) {
                task->using_lt = task->use_lt;
                task->lat_test = &task->lt[task->using_lt];
                task->lat_test->accuracy_limit_tsc = task->limit;
        }
}

static int compare_tx_time(const void *val1, const void *val2)
{
        const struct lat_info *ptr1 = val1;
        const struct lat_info *ptr2 = val2;

        return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
}

static int compare_tx_packet_index(const void *val1, const void *val2)
{
        const struct lat_info *ptr1 = val1;
        const struct lat_info *ptr2 = val2;

        return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
}

static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
{
        uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
        uint32_t small = UINT32_MAX >> 1;

        lat++;

        /* The buffer is sorted by RX time so far, and the SUT might have
         * reordered packets.
         *     => consider small differences as reordering and big ones as overflow of tx_packet_index.
         * Note that:
         *      - overflow only happens after receiving and storing 4 billion packets...
         *      - an absolute difference of less than 2 billion packets is not considered an overflow
         */
        for (uint32_t i = 1; i < count; i++) {
                tx_packet_index = lat->tx_packet_index;
                if (tx_packet_index > old_tx_packet_index) {
                        if (tx_packet_index - old_tx_packet_index < small) {
                                // The diff is small => increasing index count
                        } else {
                                // The diff is big => it is more likely that the previous packet was an overflow
                                n_overflow--;
                        }
                } else {
                        if (old_tx_packet_index - tx_packet_index < small) {
                                // The diff is small => packet reordering
                        } else {
                                // The diff is big => it is more likely that this is an overflow
                                n_overflow++;
                        }
                }
                lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
                old_tx_packet_index = tx_packet_index;
                lat++;
        }
}
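
/* Worked example (illustrative): with tx_packet_index values arriving in RX
 * order as 0xFFFFFFFE, 0xFFFFFFFF, 0x00000001, 0x00000000:
 *      - 0xFFFFFFFE -> 0xFFFFFFFF: small increase => nothing to do;
 *      - 0xFFFFFFFF -> 0x00000001: big decrease   => n_overflow becomes 1,
 *        the entry is rebased to 0x100000001;
 *      - 0x00000001 -> 0x00000000: small decrease => treated as plain
 *        reordering, rebased to 0x100000000 with the same n_overflow.
 */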

static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
{
        uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
        uint32_t small = UINT32_MAX >> 1;
        lat++;

        /*
         * Same algorithm as above, but with time.
         * Note that:
         *      - overflow happens after 4 billion "cycles" (shifted by LATENCY_ACCURACY) = ~4 sec
         *      - an absolute difference of up to 2 billion (shifted) cycles (~2 sec) is not considered an overflow
         *              => the algorithm does not work if receiving less than 1 packet every 2 seconds
         */
        for (uint32_t i = 1; i < count; i++) {
                tx_time = lat->tx_time;
                if (tx_time > old_tx_time) {
                        if (tx_time - old_tx_time > small) {
                                n_overflow--;
                        }
                } else {
                        if (old_tx_time - tx_time > small) {
                                n_overflow++;
                        }
                }
                lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
                old_tx_time = tx_time;
                lat++;
        }
}

static void task_lat_count_remaining_lost_packets(struct task_lat *task)
{
        struct lat_test *lat_test = task->lat_test;

        for (uint32_t j = 0; j < task->generator_count; j++) {
                struct early_loss_detect *eld = &task->eld[j];

                lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
        }
}

static void task_lat_reset_eld(struct task_lat *task)
{
        for (uint32_t j = 0; j < task->generator_count; j++) {
                early_loss_detect_reset(&task->eld[j]);
        }
}

static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
{
        uint64_t min_tsc = UINT64_MAX;

        for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
                if (min_tsc > task->latency_buffer[i].tx_time)
                        min_tsc = task->latency_buffer[i].tx_time;
        }

        return min_tsc << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
{
        uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);

        return lat << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
}

static void lat_write_latency_to_file(struct task_lat *task)
{
        uint64_t min_tsc;
        uint64_t n_loss;

        min_tsc = lat_latency_buffer_get_min_tsc(task);

        // Dump all packet statistics
        fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
        fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec); rx time; tx time\n");
        for (uint32_t i = 0; i < task->latency_buffer_idx ; i++) {
                struct lat_info *lat_info = &task->latency_buffer[i];
                uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
                uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
                uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

                fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
                        lat_info->rx_packet_index,
                        lat_info->port_queue_id,
                        lat_info->tx_packet_index,
                        tsc_to_nsec(lat_tsc),
                        tsc_to_nsec(rx_tsc - min_tsc),
                        tsc_to_nsec(tx_tsc - min_tsc));
        }

        // To detect dropped packets, we need to sort them based on TX
        if (task->unique_id_pos) {
                plogx_info("Adapting tx_packet_index\n");
                fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
                plogx_info("Sorting packets based on tx_packet_index\n");
                qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
                plogx_info("Sorted packets based on packet_index\n");
        } else {
                plogx_info("Adapting tx_time\n");
                fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
                plogx_info("Sorting packets based on tx_time\n");
                qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
                plogx_info("Sorted packets based on packet_time\n");
        }

        // A packet is marked as dropped if 2 packets received from the same queue are not consecutive
        fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
        fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");

        for (uint32_t i = 0; i < task->generator_count;i++)
                task->prev_tx_packet_index[i] = -1;

        for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
                struct lat_info *lat_info = &task->latency_buffer[i];
                uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
                uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
                uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
                uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
                uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

                /* Packet n + ACCURACY_WINDOW delivers the TX error for packet n,
                   hence the last ACCURACY_WINDOW packets do not have a TX error. */
                if (i + ACCURACY_WINDOW >= task->latency_buffer_idx) {
                        tx_err_tsc = 0;
                }

                if (lat_info->port_queue_id >= task->generator_count) {
                        plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
                                lat_info->port_queue_id, lat_info->tx_packet_index);
                        continue;
                }
                // Log dropped packet
                n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
                if (n_loss)
                        fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
                                lat_info->port_queue_id,
                                lat_info->tx_packet_index - n_loss, n_loss);
                // Log next packet
                fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
                        lat_info->port_queue_id,
                        lat_info->tx_packet_index,
                        lat_info->rx_packet_index,
                        tsc_to_nsec(lat_tsc),
                        tsc_to_nsec(tx_tsc - min_tsc),
                        tsc_to_nsec(rx_tsc - min_tsc),
                        tsc_to_nsec(tx_err_tsc),
                        tsc_to_nsec(rx_err_tsc));
#ifdef LAT_DEBUG
                fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
                        lat_info->id_in_bulk,
                        lat_info->bulk_size,
                        tsc_to_nsec(lat_info->begin - min_tsc),
                        tsc_to_nsec(lat_info->before - min_tsc),
                        tsc_to_nsec(lat_info->after - min_tsc));
#endif
                fprintf(task->fp_tx, "\n");
                task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
        }
        fflush(task->fp_rx);
        fflush(task->fp_tx);
        task->latency_buffer_idx = 0;
}

static void lat_stop(struct task_base *tbase)
{
        struct task_lat *task = (struct task_lat *)tbase;

        if (task->unique_id_pos) {
                task_lat_count_remaining_lost_packets(task);
                task_lat_reset_eld(task);
        }
        if (task->latency_buffer)
                lat_write_latency_to_file(task);
}

#ifdef LAT_DEBUG
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
        struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];

        lat_info->bulk_size = bulk_size;
        lat_info->id_in_bulk = id_in_bulk;
        lat_info->begin = task->begin;
        lat_info->before = task->base.aux->tsc_rx.before;
        lat_info->after = task->base.aux->tsc_rx.after;
}
#endif

static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
{
        struct lat_info *lat_info;

        /* If unique_id_pos is specified, latency is stored per packet
           being sent. Lost packets are detected at runtime, and the
           latency stored for those packets will be 0 */
        lat_info = &task->latency_buffer[task->latency_buffer_idx++];
        lat_info->rx_packet_index = rx_packet_index;
        lat_info->tx_packet_index = packet_id;
        lat_info->port_queue_id = generator_id;
        lat_info->rx_time = rx_time;
        lat_info->tx_time = tx_time;
        lat_info->rx_err = rx_err;
        lat_info->tx_err = tx_err;
}

static uint32_t task_lat_early_loss_detect(struct task_lat *task, uint32_t packet_id, uint8_t generator_id)
{
        struct early_loss_detect *eld = &task->eld[generator_id];
        return early_loss_detect_add(eld, packet_id);
}

static void lat_test_check_duplicate(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
{
        struct early_loss_detect *eld = &task->eld[generator_id];
        uint32_t old_queue_id, queue_pos;

        queue_pos = packet_id & PACKET_QUEUE_MASK;
        old_queue_id = eld->entries[queue_pos];
        if ((packet_id >> PACKET_QUEUE_BITS) == old_queue_id)
                lat_test->duplicate++;
}
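
/* Illustrative note (an assumption about the eld.h internals, which are not
 * shown in this file): eld->entries[] appears to keep, per queue slot, the
 * upper bits of the last packet_id seen in that slot.  If packet_id 0x12345
 * arrives twice before its slot is recycled, the second arrival finds
 * entries[0x12345 & PACKET_QUEUE_MASK] already equal to
 * 0x12345 >> PACKET_QUEUE_BITS and counts a duplicate.  Presumably only
 * early_loss_detect_add() updates the table; this function merely reads it.
 */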

static uint64_t tsc_extrapolate_backward(struct task_lat *task, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
#ifdef NO_LAT_EXTRAPOLATION
        uint64_t tsc = tsc_from;
#else
        uint64_t tsc = tsc_from - task->bytes_to_tsc[bytes];
#endif
        if (likely(tsc > tsc_minimum))
                return tsc;
        else
                return tsc_minimum;
}
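
/* Illustrative example (not from the original source): all packets of a bulk
 * share one RX timestamp (tsc_rx.after).  To estimate when packet j really
 * hit the wire, the transmission time of the bytes received after it is
 * subtracted, clamped to tsc_minimum so the estimate never moves before the
 * end of the previous bulk.  This mirrors the call in handle_lat_bulk()
 * below:
 *
 *      pkt_rx = tsc_extrapolate_backward(task, rx_tsc,
 *                      rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc);
 */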

static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
{
        uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
        size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);

        bucket_id = bucket_id < bucket_count ? bucket_id : (bucket_count - 1);
        lat_test->buckets[bucket_id]++;
}
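
/* Example (illustrative): with bucket_size = DEFAULT_BUCKET_SIZE (11), a
 * latency of 10000 tsc lands in bucket 10000 >> 11 = 4; any latency past the
 * last bucket is accumulated in the final bucket, so the histogram never
 * overflows.
 */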

static void lat_test_check_ordering(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
{
        if (packet_id < task->previous_packet[generator_id]) {
                lat_test->mis_ordered++;
                lat_test->extent += task->previous_packet[generator_id] - packet_id;
        }
        task->previous_packet[generator_id] = packet_id;
}

static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
{
        lat_test->lost_packets += lost_packets;
}

static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
{
        if (error > lat_test->accuracy_limit_tsc)
                return;
        lat_test->tot_pkts++;

        lat_test->tot_lat += lat_tsc;
        lat_test->tot_lat_error += error;

        /* (a +- b)^2 = a^2 +- (2ab + b^2) */
        lat_test->var_lat += lat_tsc * lat_tsc;
        lat_test->var_lat_error += 2 * lat_tsc * error;
        lat_test->var_lat_error += error * error;

        if (lat_tsc > lat_test->max_lat) {
                lat_test->max_lat = lat_tsc;
                lat_test->max_lat_error = error;
        }
        if (lat_tsc < lat_test->min_lat) {
                lat_test->min_lat = lat_tsc;
                lat_test->min_lat_error = error;
        }

#ifdef LATENCY_HISTOGRAM
        lat_test_histogram_add(lat_test, lat_tsc);
#endif
}
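
/* Sketch of how these accumulators can be consumed (an assumption: the
 * actual reporting code lives elsewhere in PROX):
 *
 *      avg = tot_lat / tot_pkts;
 *      var = var_lat / tot_pkts - avg * avg;
 *
 * var_lat_error bounds the worst-case contribution of the per-packet
 * accuracy error to var_lat, following the (a +- b)^2 identity above.
 */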

static int task_lat_can_store_latency(struct task_lat *task)
{
        return task->latency_buffer_idx < task->latency_buffer_size;
}

static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
{
        uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;

        lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);

        if (task_lat_can_store_latency(task)) {
                task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
        }
}

static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
        struct task_lat *task = (struct task_lat *)tbase;
        int rc;

        if (n_pkts == 0) {
                task->begin = tbase->aux->tsc_rx.before;
                return 0;
        }

        task_lat_update_lat_test(task);

        // Remember those packets with bad length or bad signature
        uint32_t non_dp_count = 0;
        uint64_t pkt_bad_len_sig = 0;
#define BIT64_SET(a64, bit)     a64 |=  (((uint64_t)1) << (bit & 63))
#define BIT64_CLR(a64, bit)     a64 &= ~(((uint64_t)1) << (bit & 63))
#define BIT64_TEST(a64, bit)    a64  &  (((uint64_t)1) << (bit & 63))

        /* Go once through all received packets and read them.  If a
           packet has just been modified by another core, the cost of the
           resulting cache miss is partially amortized through the bulk size */
        for (uint16_t j = 0; j < n_pkts; ++j) {
                struct rte_mbuf *mbuf = mbufs[j];
                task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);

                // Remember those packets which are too short to hold the values that we expect
                if (unlikely(rte_pktmbuf_pkt_len(mbuf) < task->min_pkt_len)) {
                        BIT64_SET(pkt_bad_len_sig, j);
                        non_dp_count++;
                } else
                        BIT64_CLR(pkt_bad_len_sig, j);
        }

        if (task->sig_pos) {
                for (uint16_t j = 0; j < n_pkts; ++j) {
                        if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                                continue;
                        // Remember those packets with bad signature
                        if (likely(*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig))
                                task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
                        else {
                                BIT64_SET(pkt_bad_len_sig, j);
                                non_dp_count++;
                        }
                }
        } else {
                for (uint16_t j = 0; j < n_pkts; ++j) {
                        if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                                continue;
                        task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
                }
        }

        uint32_t bytes_total_in_bulk = 0;
        // Find the RX time of the first packet, for RX accuracy
        for (uint16_t j = 0; j < n_pkts; ++j) {
                uint16_t flipped = n_pkts - 1 - j;

                task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
                bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
        }

        const uint64_t rx_tsc = tbase->aux->tsc_rx.after;

        uint64_t rx_time_err;
        uint64_t pkt_rx_time64 = tsc_extrapolate_backward(task, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
        if (unlikely((task->begin >> LATENCY_ACCURACY) > pkt_rx_time64)) {
                // Extrapolation went back to BEFORE begin => packets were stuck in the NIC but we were not seeing them
                rx_time_err = pkt_rx_time64 - (task->last_pkts_tsc >> LATENCY_ACCURACY);
        } else {
                rx_time_err = pkt_rx_time64 - (task->begin >> LATENCY_ACCURACY);
        }

        TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, non_dp_count);
        for (uint16_t j = 0; j < n_pkts; ++j) {
                // Used to display % of packets within accuracy limit vs. total number of packets (used_col)
                task->lat_test->tot_all_pkts++;

                // Skip those packets with bad length or bad signature
                if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                        continue;

                struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
                uint8_t *hdr = rx_pkt_meta->hdr;

                uint32_t pkt_rx_time = tsc_extrapolate_backward(task, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
                uint32_t pkt_tx_time = rx_pkt_meta->pkt_tx_time;

                uint8_t generator_id;
                uint32_t packet_id;
                if (task->unique_id_pos) {
                        struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
                        unique_id_get(unique_id, &generator_id, &packet_id);

                        if (unlikely(generator_id >= task->generator_count)) {
                                /* No need to remember unexpected packet at this stage
                                BIT64_SET(pkt_bad_len_sig, j);
                                */
                                // Skip unexpected packet
                                continue;
                        }
                        lat_test_check_ordering(task, task->lat_test, packet_id, generator_id);
                        lat_test_check_duplicate(task, task->lat_test, packet_id, generator_id);
                        lat_test_add_lost(task->lat_test, task_lat_early_loss_detect(task, packet_id, generator_id));
                } else {
                        generator_id = 0;
                        packet_id = task->rx_packet_index;
                }

                /* If accuracy is enabled, latency is reported with a
                   delay of ACCURACY_WINDOW packets since the generator puts the
                   accuracy for packet N into packet N + ACCURACY_WINDOW. The delay
                   ensures that all reported latencies have both rx
                   and tx error. */
                if (task->accur_pos) {
                        uint32_t tx_time_err = *(uint32_t *)(hdr + task->accur_pos);

                        struct delayed_latency_entry *delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_WINDOW);

                        if (delayed_latency_entry) {
                                task_lat_store_lat(task,
                                                   delayed_latency_entry->rx_packet_id,
                                                   delayed_latency_entry->pkt_rx_time,
                                                   delayed_latency_entry->pkt_tx_time,
                                                   delayed_latency_entry->rx_time_err,
                                                   tx_time_err,
                                                   delayed_latency_entry->tx_packet_id,
                                                   delayed_latency_entry->generator_id);
                        }

                        delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
                        delayed_latency_entry->pkt_rx_time = pkt_rx_time;
                        delayed_latency_entry->pkt_tx_time = pkt_tx_time;
                        delayed_latency_entry->rx_time_err = rx_time_err;
                        delayed_latency_entry->rx_packet_id = task->rx_packet_index;
                        delayed_latency_entry->tx_packet_id = packet_id;
                        delayed_latency_entry->generator_id = generator_id;
                } else {
                        task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
                }

                // Bad/unexpected packets do not need to be indexed
                task->rx_packet_index++;
        }

        if (n_pkts < MAX_PKT_BURST)
                task->begin = tbase->aux->tsc_rx.before;
        task->last_pkts_tsc = tbase->aux->tsc_rx.after;

        rc = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
        // non_dp_count should not be drop-handled, as those packets are all, by definition, considered not handled
        // RX = DISCARDED + HANDLED + NON_DP + (TX - TX_NON_DP) + TX_FAIL
        TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, -non_dp_count);
        return rc;
}

static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
{
        const int socket_id = rte_lcore_to_socket_id(core_id);
        char name[256];
        size_t latency_buffer_mem_size = 0;

        if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
                task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;

        latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;

        task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
        PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);

        sprintf(name, "latency.rx_%u.txt", core_id);
        task->fp_rx = fopen(name, "w+");
        PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);

        sprintf(name, "latency.tx_%u.txt", core_id);
        task->fp_tx = fopen(name, "w+");
        PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);

        task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
        PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocate prev_tx_packet_index\n");
}

static void task_init_generator_count(struct task_lat *task)
{
        uint8_t *generator_count = prox_sh_find_system("generator_count");

        if (generator_count == NULL) {
                task->generator_count = 1;
                plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
        } else
                task->generator_count = *generator_count;
        plog_info("\t\tLatency using %u generators\n", task->generator_count);
}

static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
{
        size_t eld_mem_size;

        eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
        task->eld = prox_zmalloc(eld_mem_size, socket_id);
        PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
}

void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
{
        task->limit = nsec_to_tsc(accuracy_limit_nsec);
}

static void lat_start(struct task_base *tbase)
{
        struct task_lat *task = (struct task_lat *)tbase;

}

static void init_task_lat(struct task_base *tbase, struct task_args *targ)
{
        struct task_lat *task = (struct task_lat *)tbase;
        const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

        task->lat_pos = targ->lat_pos;
        task->accur_pos = targ->accur_pos;
        task->sig_pos = targ->sig_pos;
        task->sig = targ->sig;

        task->unique_id_pos = targ->packet_id_pos;
        task->latency_buffer_size = targ->latency_buffer_size;

        PROX_PANIC(task->lat_pos == 0, "Missing 'lat pos' parameter in config file\n");
        uint16_t min_pkt_len = task->lat_pos + sizeof(uint32_t);
        if (task->unique_id_pos && (
                min_pkt_len < task->unique_id_pos + sizeof(struct unique_id)))
                min_pkt_len = task->unique_id_pos + sizeof(struct unique_id);
        if (task->accur_pos && (
                min_pkt_len < task->accur_pos + sizeof(uint32_t)))
                min_pkt_len = task->accur_pos + sizeof(uint32_t);
        if (task->sig_pos && (
                min_pkt_len < task->sig_pos + sizeof(uint32_t)))
                min_pkt_len = task->sig_pos + sizeof(uint32_t);
        task->min_pkt_len = min_pkt_len;

        task_init_generator_count(task);

        if (task->latency_buffer_size) {
                init_task_lat_latency_buffer(task, targ->lconf->id);
        }

        if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
                targ->bucket_size = DEFAULT_BUCKET_SIZE;
        }

        if (task->accur_pos) {
                task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count , socket_id);
                PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
                for (uint i = 0; i < task->generator_count; i++) {
                        task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
                        PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
                }
                if (task->unique_id_pos == 0) {
                        /* When using the accuracy feature, the accuracy from TX is written ACCURACY_WINDOW packets later.
                        * We can only retrieve the right packet if a packet id is written to it.
                        * Otherwise we will use the packet RECEIVED ACCURACY_WINDOW packets ago, which is OK if
                        * packets are not re-ordered. If packets are re-ordered, then the matching between
                        * the TX accuracy and the latency is wrong.
                        */
                        plog_warn("\tWhen accuracy feature is used, a unique id should ideally also be used\n");
                }
        }

        task->lt[0].min_lat = -1;
        task->lt[1].min_lat = -1;
        task->lt[0].bucket_size = targ->bucket_size;
        task->lt[1].bucket_size = targ->bucket_size;
        if (task->unique_id_pos) {
                task_lat_init_eld(task, socket_id);
                task_lat_reset_eld(task);
                task->previous_packet = prox_zmalloc(sizeof(task->previous_packet[0]) * task->generator_count , socket_id);
                PROX_PANIC(task->previous_packet == NULL, "Failed to allocate array for storing previous packet\n");
        }
        task->lat_test = &task->lt[task->using_lt];

        task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
        task->rx_pkt_meta = prox_zmalloc(MAX_PKT_BURST * sizeof(*task->rx_pkt_meta), socket_id);
        PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");

        uint32_t max_frame_size = MAX_PKT_SIZE;
        uint64_t bytes_per_hz = UINT64_MAX;
        if (targ->nb_rxports) {
                struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
                max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE;

                // port->max_link_speed reports the maximum, non-negotiated link speed in Mbps, e.g. 40k for a 40 Gbps NIC.
                // It can be UINT32_MAX (virtual devices or not supported by DPDK < 16.04)
                if (port->max_link_speed != UINT32_MAX) {
                        bytes_per_hz = port->max_link_speed * 125000L;
                        plog_info("\t\tPort %u: max link speed is %ld Mbps\n",
                                (uint8_t)(port - prox_port_cfg), 8 * bytes_per_hz / 1000000);
                }
        }
        task->bytes_to_tsc = prox_zmalloc(max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST, rte_lcore_to_socket_id(targ->lconf->id));
        PROX_PANIC(task->bytes_to_tsc == NULL,
                "Failed to allocate %u bytes (in huge pages) for bytes_to_tsc\n", max_frame_size);

        // There are cases where the hz estimate might be slightly over-estimated,
        // which results in too much extrapolation.
        // Only account for 99% of the extrapolation to handle clocks with up to 1% error.
        for (unsigned int i = 0; i < max_frame_size * MAX_PKT_BURST ; i++) {
                if (bytes_per_hz == UINT64_MAX)
                        task->bytes_to_tsc[i] = 0;
                else
                        task->bytes_to_tsc[i] = (rte_get_tsc_hz() * i * 0.99) / bytes_per_hz;
        }
}
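
/* Worked example (illustrative): on a 10 Gbps port, bytes_per_hz =
 * 10000 * 125000 = 1.25e9 bytes/s.  With a 2.0 GHz TSC, an 84-byte wire
 * frame (a 64-byte minimal Ethernet frame plus preamble and inter-frame gap)
 * costs about 2.0e9 * 84 * 0.99 / 1.25e9 ~= 133 cycles, so
 * bytes_to_tsc[84] ~= 133 and tsc_extrapolate_backward() walks the RX
 * timestamp back by roughly that amount per trailing minimal frame.
 */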

static struct task_init task_init_lat = {
        .mode_str = "lat",
        .init = init_task_lat,
        .handle = handle_lat_bulk,
        .start = lat_start,
        .stop = lat_stop,
        .flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
        .size = sizeof(struct task_lat)
};

__attribute__((constructor)) static void reg_task_lat(void)
{
        reg_task(&task_init_lat);
}