Add support for detection of misordered packets per flow
[samplevnf.git] / VNFs / DPPD-PROX / handle_lat.c
/*
// Copyright (c) 2010-2019 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

//#define LAT_DEBUG

#include <rte_cycles.h>
#include <stdio.h>
#include <math.h>

#include "handle_gen.h"
#include "prox_malloc.h"
#include "mbuf_utils.h"
#include "handle_lat.h"
#include "log.h"
#include "task_init.h"
#include "task_base.h"
#include "stats.h"
#include "lconf.h"
#include "quit.h"
#include "eld.h"
#include "prox_shared.h"
#include "prox_port_cfg.h"

#define DEFAULT_BUCKET_SIZE     11
#define ACCURACY_BUFFER_SIZE    (2 * ACCURACY_WINDOW)

struct lat_info {
        uint32_t rx_packet_index;
        uint64_t tx_packet_index;
        uint32_t tx_err;
        uint32_t rx_err;
        uint64_t rx_time;
        uint64_t tx_time;
        uint16_t port_queue_id;
#ifdef LAT_DEBUG
        uint16_t id_in_bulk;
        uint16_t bulk_size;
        uint64_t begin;
        uint64_t after;
        uint64_t before;
#endif
};

struct delayed_latency_entry {
        uint32_t rx_packet_id;
        uint32_t tx_packet_id;
        uint32_t packet_id;
        uint8_t generator_id;
        uint64_t pkt_rx_time;
        uint64_t pkt_tx_time;   // Time written into packets by gen. Unit is TSC >> LATENCY_ACCURACY
        uint64_t rx_time_err;
};

static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
        struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
        if (delayed_latency_entry->packet_id == packet_id)
                return delayed_latency_entry;
        else
                return NULL;
}

static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
        struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
        delayed_latency_entry->packet_id = packet_id;
        return delayed_latency_entry;
}
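
/*
 * Worked example of the ring buffer above (illustrative comment only, not
 * part of the build). ACCURACY_BUFFER_SIZE is 2 * ACCURACY_WINDOW, so the
 * entry stored for packet N is still live when packet N + ACCURACY_WINDOW
 * arrives, and the packet_id check in delayed_latency_get() rejects stale
 * entries left over from an earlier wrap. With a buffer of size B:
 *
 *   delayed_latency_create(entries, 0, 5);     // slot 5 % B now holds id 5
 *   delayed_latency_get(entries, 0, 5);        // returns the entry: ids match
 *   delayed_latency_create(entries, 0, 5 + B); // same slot, new id
 *   delayed_latency_get(entries, 0, 5);        // returns NULL: id 5 evicted
 */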

struct rx_pkt_meta_data {
        uint8_t  *hdr;
        uint32_t pkt_tx_time;
        uint32_t bytes_after_in_bulk;
};

struct loss_buffer {
        uint32_t packet_id;
        uint32_t n;
};

struct flows {
        uint32_t packet_id;
};

struct task_lat {
        struct task_base base;
        uint64_t limit;
        uint64_t rx_packet_index;
        uint64_t last_pkts_tsc;
        struct delayed_latency_entry **delayed_latency_entries;
        struct lat_info *latency_buffer;
        uint32_t latency_buffer_idx;
        uint32_t latency_buffer_size;
        uint64_t begin;
        uint16_t lat_pos;
        uint16_t unique_id_pos;
        uint16_t accur_pos;
        uint16_t sig_pos;
        uint32_t sig;
        volatile uint16_t use_lt; /* which lt to use */
        volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements is used */
        struct lat_test lt[2];
        struct lat_test *lat_test;
        uint32_t generator_count;
        uint16_t min_pkt_len;
        struct early_loss_detect *eld;
        struct rx_pkt_meta_data *rx_pkt_meta;
        // The following fields are only used when starting or stopping, not during normal runtime
        uint64_t *prev_tx_packet_index;
        FILE *fp_loss;
        FILE *fp_rx;
        FILE *fp_tx;
        struct prox_port_cfg *port;
        uint64_t *bytes_to_tsc;
        uint64_t *previous_packet;
        uint32_t loss_buffer_size;
        struct loss_buffer *loss_buffer;
        uint32_t loss_id;
        uint32_t packet_id_in_flow_pos;
        int32_t flow_id_pos;
        uint32_t flow_count;
        struct flows *flows;
};
/* This function calculates the difference between rx_time and tx_time.
 * Both values are uint32_t (see handle_lat_bulk).
 * rx_time should be higher than tx_time, except around every UINT32_MAX
 * cycles, when rx_time overflows.
 * As the return value is also uint32_t, returning (rx_time - tx_time)
 * is also fine when it overflows.
 */
static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
{
        return rx_time - tx_time;
}
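
/*
 * Illustrative arithmetic (not part of the build): unsigned wraparound makes
 * the subtraction come out right even across an rx_time overflow. If
 * tx_time == UINT32_MAX - 9 and rx_time has wrapped around to 6, then
 * diff_time(6, UINT32_MAX - 9) == 16 (mod 2^32), the true elapsed time.
 */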

uint32_t task_lat_get_latency_bucket_size(struct task_lat *task)
{
        return task->lat_test->bucket_size;
}

struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
{
        if (task->use_lt == task->using_lt)
                return &task->lt[!task->using_lt];
        return NULL;
}

void task_lat_use_other_latency_meassurement(struct task_lat *task)
{
        task->use_lt = !task->using_lt;
}

static void task_lat_update_lat_test(struct task_lat *task)
{
        if (task->use_lt != task->using_lt) {
                task->using_lt = task->use_lt;
                task->lat_test = &task->lt[task->using_lt];
                task->lat_test->accuracy_limit_tsc = task->limit;
        }
}

/* qsort comparators must return 0 for equal keys to stay consistent */
static int compare_tx_time(const void *val1, const void *val2)
{
        const struct lat_info *ptr1 = val1;
        const struct lat_info *ptr2 = val2;

        if (ptr1->tx_time == ptr2->tx_time)
                return 0;
        return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
}

static int compare_tx_packet_index(const void *val1, const void *val2)
{
        const struct lat_info *ptr1 = val1;
        const struct lat_info *ptr2 = val2;

        if (ptr1->tx_packet_index == ptr2->tx_packet_index)
                return 0;
        return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
}

static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
{
        uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
        uint32_t small = UINT32_MAX >> 1;

        lat++;

        /* Buffer is sorted so far by RX time.
         * We might have packets being reordered by the SUT.
         *     => consider small differences as reordering and big ones as overflow of tx_packet_index.
         * Note that:
         *      - overflow only happens after receiving and storing 4 billion packets...
         *      - an absolute difference of less than 2 billion packets is not considered an overflow
         */
        for (uint32_t i = 1; i < count; i++) {
                tx_packet_index = lat->tx_packet_index;
                if (tx_packet_index > old_tx_packet_index) {
                        if (tx_packet_index - old_tx_packet_index < small) {
                                // The diff is small => increasing index count
                        } else {
                                // The diff is big => it is more likely that the previous packet was an overflow
                                n_overflow--;
                        }
                } else {
                        if (old_tx_packet_index - tx_packet_index < small) {
                                // The diff is small => packet reordering
                        } else {
                                // The diff is big => it is more likely that this is an overflow
                                n_overflow++;
                        }
                }
                lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
                old_tx_packet_index = tx_packet_index;
                lat++;
        }
}
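
/*
 * Worked example for the correction above (illustrative only). Suppose the
 * buffer, ordered by RX time, holds the 32-bit tx indices
 *     UINT32_MAX - 1, UINT32_MAX, 1, 0, 2
 * The jump from UINT32_MAX to 1 is a big backward difference, so n_overflow
 * becomes 1 and the remaining entries are promoted to 2^32 + 1, 2^32 + 0 and
 * 2^32 + 2. The small backward step from 1 to 0 is treated as a reordering
 * and gets no extra correction, so the resulting 64-bit indices sort
 * correctly.
 */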

static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
{
        uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
        uint32_t small = UINT32_MAX >> 1;
        lat++;

        /*
         * Same algorithm as above, but with time.
         * Note that:
         *      - overflow happens after 4 billion "cycles" (shifted by LATENCY_ACCURACY) = ~4 sec
         *      - an absolute difference of up to 2 billion (shifted) cycles (~2 sec) is not considered an overflow
         *              => the algorithm does not work if receiving less than 1 packet every 2 seconds
         */
        for (uint32_t i = 1; i < count; i++) {
                tx_time = lat->tx_time;
                if (tx_time > old_tx_time) {
                        if (tx_time - old_tx_time > small) {
                                n_overflow--;
                        }
                } else {
                        if (old_tx_time - tx_time > small) {
                                n_overflow++;
                        }
                }
                lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
                old_tx_time = tx_time;
                lat++;
        }
}

static void task_lat_count_remaining_lost_packets(struct task_lat *task)
{
        struct lat_test *lat_test = task->lat_test;

        for (uint32_t j = 0; j < task->generator_count; j++) {
                struct early_loss_detect *eld = &task->eld[j];

                lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
        }
}

static void task_lat_reset_eld(struct task_lat *task)
{
        for (uint32_t j = 0; j < task->generator_count; j++) {
                early_loss_detect_reset(&task->eld[j]);
        }
}

static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
{
        uint64_t min_tsc = UINT64_MAX;

        for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
                if (min_tsc > task->latency_buffer[i].tx_time)
                        min_tsc = task->latency_buffer[i].tx_time;
        }

        return min_tsc << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
{
        uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);

        return lat << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
{
        return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
}

static void lat_write_latency_to_file(struct task_lat *task)
{
        uint64_t min_tsc;
        uint64_t n_loss;

        min_tsc = lat_latency_buffer_get_min_tsc(task);

        // Dump all packet statistics
        fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
        fprintf(task->fp_rx, "rx index;queue;tx index;lat (nsec);rx time;tx time\n");
        for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
                struct lat_info *lat_info = &task->latency_buffer[i];
                uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
                uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
                uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

                fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
                        lat_info->rx_packet_index,
                        lat_info->port_queue_id,
                        lat_info->tx_packet_index,
                        tsc_to_nsec(lat_tsc),
                        tsc_to_nsec(rx_tsc - min_tsc),
                        tsc_to_nsec(tx_tsc - min_tsc));
        }

        // To detect dropped packets, we need to sort them based on TX
        if (task->unique_id_pos) {
                plogx_info("Adapting tx_packet_index\n");
                fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
                plogx_info("Sorting packets based on tx_packet_index\n");
                qsort(task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
                plogx_info("Sorted packets based on tx_packet_index\n");
        } else {
                plogx_info("Adapting tx_time\n");
                fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
                plogx_info("Sorting packets based on tx_time\n");
                qsort(task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
                plogx_info("Sorted packets based on tx_time\n");
        }

        // A packet is marked as dropped if 2 packets received from the same queue are not consecutive
        fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
        fprintf(task->fp_tx, "queue;tx index;rx index;lat (nsec);tx time;rx time;tx_err;rx_err\n");

        for (uint32_t i = 0; i < task->generator_count; i++)
                task->prev_tx_packet_index[i] = -1;

        for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
                struct lat_info *lat_info = &task->latency_buffer[i];
                uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
                uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
                uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
                uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
                uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

                /* Packet n + ACCURACY_WINDOW delivers the TX error for packet n,
                   hence the last ACCURACY_WINDOW packets do not have a TX error. */
                if (i + ACCURACY_WINDOW >= task->latency_buffer_idx) {
                        tx_err_tsc = 0;
                }

                if (lat_info->port_queue_id >= task->generator_count) {
                        plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
                                lat_info->port_queue_id, lat_info->tx_packet_index);
                        continue;
                }
                // Log dropped packets
                n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
                if (n_loss)
                        fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
                                lat_info->port_queue_id,
                                lat_info->tx_packet_index - n_loss, n_loss);
                // Log next packet
                fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
                        lat_info->port_queue_id,
                        lat_info->tx_packet_index,
                        lat_info->rx_packet_index,
                        tsc_to_nsec(lat_tsc),
                        tsc_to_nsec(tx_tsc - min_tsc),
                        tsc_to_nsec(rx_tsc - min_tsc),
                        tsc_to_nsec(tx_err_tsc),
                        tsc_to_nsec(rx_err_tsc));
#ifdef LAT_DEBUG
                fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
                        lat_info->id_in_bulk,
                        lat_info->bulk_size,
                        tsc_to_nsec(lat_info->begin - min_tsc),
                        tsc_to_nsec(lat_info->before - min_tsc),
                        tsc_to_nsec(lat_info->after - min_tsc));
#endif
                fprintf(task->fp_tx, "\n");
                task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
        }
        fflush(task->fp_rx);
        fflush(task->fp_tx);
        task->latency_buffer_idx = 0;
}

static void lat_stop(struct task_base *tbase)
{
        struct task_lat *task = (struct task_lat *)tbase;

        if (task->unique_id_pos) {
                task_lat_count_remaining_lost_packets(task);
                task_lat_reset_eld(task);
                memset(task->previous_packet, 0, sizeof(task->previous_packet[0]) * task->generator_count);
        }
        if (task->loss_id) {
                for (uint i = 0; i < task->loss_id; i++) {
                        fprintf(task->fp_loss, "packet %u: %u\n", task->loss_buffer[i].packet_id, task->loss_buffer[i].n);
                }
        }
        task->lat_test->lost_packets = 0;
        if (task->latency_buffer)
                lat_write_latency_to_file(task);
}

#ifdef LAT_DEBUG
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
        struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];

        lat_info->bulk_size = bulk_size;
        lat_info->id_in_bulk = id_in_bulk;
        lat_info->begin = task->begin;
        lat_info->before = task->base.aux->tsc_rx.before;
        lat_info->after = task->base.aux->tsc_rx.after;
}
#endif

static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
{
        struct lat_info *lat_info;

        /* If unique_id_pos is specified then latency is stored per
           packet being sent. Lost packets are detected at runtime, and
           the latency stored for those packets will be 0 */
        lat_info = &task->latency_buffer[task->latency_buffer_idx++];
        lat_info->rx_packet_index = rx_packet_index;
        lat_info->tx_packet_index = packet_id;
        lat_info->port_queue_id = generator_id;
        lat_info->rx_time = rx_time;
        lat_info->tx_time = tx_time;
        lat_info->rx_err = rx_err;
        lat_info->tx_err = tx_err;
}

static uint32_t task_lat_early_loss_detect(struct task_lat *task, uint32_t packet_id, uint8_t generator_id)
{
        struct early_loss_detect *eld = &task->eld[generator_id];
        return early_loss_detect_add(eld, packet_id);
}

static void lat_test_check_duplicate(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
{
        struct early_loss_detect *eld = &task->eld[generator_id];
        uint32_t old_queue_id, queue_pos;

        queue_pos = packet_id & PACKET_QUEUE_MASK;
        old_queue_id = eld->entries[queue_pos];
        if ((packet_id >> PACKET_QUEUE_BITS) == old_queue_id)
                lat_test->duplicate++;
}
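
/*
 * Illustrative note on the check above (the constants and the storage
 * convention come from eld.h/eld.c): the slot packet_id & PACKET_QUEUE_MASK
 * is expected to hold packet_id >> PACKET_QUEUE_BITS, written when that id
 * first went through early loss detection. If the same packet_id shows up
 * again before the slot is overwritten, the stored value still matches and
 * the packet is counted as a duplicate; a genuinely newer packet hashing to
 * the same slot carries a different upper part and does not match.
 */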

static uint64_t tsc_extrapolate_backward(struct task_lat *task, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
#ifdef NO_LAT_EXTRAPOLATION
        uint64_t tsc = tsc_from;
#else
        uint64_t tsc = tsc_from - task->bytes_to_tsc[bytes];
#endif
        if (likely(tsc > tsc_minimum))
                return tsc;
        else
                return tsc_minimum;
}
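
/*
 * Worked example (illustrative only): all packets of a burst are timestamped
 * once, at tsc_rx.after. A packet with 1500 bytes queued behind it in the
 * burst actually hit the wire about task->bytes_to_tsc[1500] cycles before
 * that timestamp, so its RX time is extrapolated backward by that amount.
 * The clamp against tsc_minimum (the previous burst's RX tsc) keeps the
 * estimate from drifting before packets that were already accounted for.
 */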

static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
{
        uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
        size_t bucket_count = sizeof(lat_test->buckets) / sizeof(lat_test->buckets[0]);

        bucket_id = bucket_id < bucket_count ? bucket_id : (bucket_count - 1);
        lat_test->buckets[bucket_id]++;
}
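
/*
 * Illustrative arithmetic: each bucket spans 2^bucket_size tsc. With the
 * DEFAULT_BUCKET_SIZE of 11, a latency of 10000 tsc lands in bucket
 * 10000 >> 11 == 4; anything past the last bucket is clamped into it.
 */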

static void lat_test_check_flow_ordering(struct task_lat *task, struct lat_test *lat_test, int32_t flow_id, uint32_t packet_id)
{
        if (packet_id < task->flows[flow_id].packet_id) {
                lat_test->mis_ordered++;
                lat_test->extent += task->flows[flow_id].packet_id - packet_id;
        }
        task->flows[flow_id].packet_id = packet_id;
}
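
/*
 * Worked example (illustrative only): if a flow delivers packet ids
 * 5, 6, 9, 7 then the arrival of 7 after 9 increments mis_ordered once and
 * adds 9 - 7 == 2 to extent, so extent measures how far back each late
 * packet was.
 */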

static void lat_test_check_ordering(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
{
        if (packet_id < task->previous_packet[generator_id]) {
                lat_test->mis_ordered++;
                lat_test->extent += task->previous_packet[generator_id] - packet_id;
        }
        task->previous_packet[generator_id] = packet_id;
}

static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
{
        lat_test->lost_packets += lost_packets;
}

static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
{
        if (error > lat_test->accuracy_limit_tsc)
                return;
        lat_test->tot_pkts++;

        lat_test->tot_lat += lat_tsc;
        lat_test->tot_lat_error += error;

        /* (a +- b)^2 = a^2 +- 2ab + b^2 */
        lat_test->var_lat += lat_tsc * lat_tsc;
        lat_test->var_lat_error += 2 * lat_tsc * error;
        lat_test->var_lat_error += error * error;

        if (lat_tsc > lat_test->max_lat) {
                lat_test->max_lat = lat_tsc;
                lat_test->max_lat_error = error;
        }
        if (lat_tsc < lat_test->min_lat) {
                lat_test->min_lat = lat_tsc;
                lat_test->min_lat_error = error;
        }

#ifdef LATENCY_HISTOGRAM
        lat_test_histogram_add(lat_test, lat_tsc);
#endif
}
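
/*
 * Illustrative note: the accumulators above are enough to recover the
 * variance later as E[x^2] - E[x]^2, e.g. (a sketch; the real computation
 * lives with the rest of the lat_test code):
 *
 *   avg = tot_lat / tot_pkts;
 *   var = var_lat / tot_pkts - avg * avg;
 *
 * var_lat_error tracks the worst-case contribution of the per-packet
 * measurement error to the sum of squares, per the identity in the comment.
 */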

static int task_lat_can_store_latency(struct task_lat *task)
{
        return task->latency_buffer_idx < task->latency_buffer_size;
}

static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
{
        uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;

        lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);

        if (task_lat_can_store_latency(task)) {
                task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
        }
}

static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
        struct task_lat *task = (struct task_lat *)tbase;
        static int max_flows_printed = 0;
        int rc;

        if (n_pkts == 0) {
                task->begin = tbase->aux->tsc_rx.before;
                return 0;
        }

        task_lat_update_lat_test(task);

        // Remember those packets with bad length or bad signature
        uint32_t non_dp_count = 0;
        uint64_t pkt_bad_len_sig = 0;
#define BIT64_SET(a64, bit)     a64 |=  (((uint64_t)1) << (bit & 63))
#define BIT64_CLR(a64, bit)     a64 &= ~(((uint64_t)1) << (bit & 63))
#define BIT64_TEST(a64, bit)    a64  &  (((uint64_t)1) << (bit & 63))

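        /* Note (descriptive; assumption: bursts never exceed 64 packets,
           i.e. MAX_PKT_BURST <= 64): a single 64-bit word can then track the
           whole burst, and the "& 63" in the macros above is a no-op rather
           than a source of aliasing between packet indices. */
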
        /* Go once through all received packets and read them. If a
           packet has just been modified by another core, the access
           latency will be partially amortized across the bulk size */
        for (uint16_t j = 0; j < n_pkts; ++j) {
                struct rte_mbuf *mbuf = mbufs[j];
                task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);

                // Remember those packets which are too short to hold the values that we expect
                if (unlikely(rte_pktmbuf_pkt_len(mbuf) < task->min_pkt_len)) {
                        BIT64_SET(pkt_bad_len_sig, j);
                        non_dp_count++;
                } else
                        BIT64_CLR(pkt_bad_len_sig, j);
        }

        if (task->sig_pos) {
                for (uint16_t j = 0; j < n_pkts; ++j) {
                        if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                                continue;
                        // Remember those packets with bad signature
                        if (likely(*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig))
                                task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
                        else {
                                BIT64_SET(pkt_bad_len_sig, j);
                                non_dp_count++;
                        }
                }
        } else {
                for (uint16_t j = 0; j < n_pkts; ++j) {
                        if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                                continue;
                        task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
                }
        }

        uint32_t bytes_total_in_bulk = 0;
        // Find RX time of first packet, for RX accuracy
        for (uint16_t j = 0; j < n_pkts; ++j) {
                uint16_t flipped = n_pkts - 1 - j;

                task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
                bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
        }

        const uint64_t rx_tsc = tbase->aux->tsc_rx.after;

        uint64_t rx_time_err;
        uint64_t pkt_rx_time64 = tsc_extrapolate_backward(task, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
        if (unlikely((task->begin >> LATENCY_ACCURACY) > pkt_rx_time64)) {
                // Extrapolation went back to BEFORE begin => packets were stuck in the NIC while we were not seeing them
                rx_time_err = pkt_rx_time64 - (task->last_pkts_tsc >> LATENCY_ACCURACY);
        } else {
                rx_time_err = pkt_rx_time64 - (task->begin >> LATENCY_ACCURACY);
        }

        TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, non_dp_count);
        for (uint16_t j = 0; j < n_pkts; ++j) {
                // Used to display % of packets within accuracy limit vs. total number of packets (used_col)
                task->lat_test->tot_all_pkts++;

                // Skip those packets with bad length or bad signature
                if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                        continue;

                struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
                uint8_t *hdr = rx_pkt_meta->hdr;

                uint32_t pkt_rx_time = tsc_extrapolate_backward(task, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
                uint32_t pkt_tx_time = rx_pkt_meta->pkt_tx_time;

                uint8_t generator_id;
                uint32_t packet_id;
                int32_t flow_id = -1;
                if (task->flow_id_pos) {
                        flow_id = *(int32_t *)(hdr + task->flow_id_pos);
                        if (unlikely(flow_id >= (int32_t)(task->flow_count))) {
                                flow_id = -1;
                                if (!max_flows_printed) {
                                        plog_info("Too many flows - increase flow count (only printed once)\n");
                                        max_flows_printed = 1;
                                }
                        }
                }
                if (task->packet_id_in_flow_pos && (flow_id != -1)) {
                        uint32_t packet_id_in_flow;
                        struct unique_id *unique_id = (struct unique_id *)(hdr + task->packet_id_in_flow_pos);
                        unique_id_get(unique_id, &generator_id, &packet_id_in_flow);
                        // Flows are tracked per generator: stride by flow_count, not generator_count
                        lat_test_check_flow_ordering(task, task->lat_test, generator_id * task->flow_count + flow_id, packet_id_in_flow);
                }
                if (task->unique_id_pos) {
                        struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
                        unique_id_get(unique_id, &generator_id, &packet_id);

                        if (unlikely(generator_id >= task->generator_count)) {
                                /* No need to remember unexpected packets at this stage
                                BIT64_SET(pkt_bad_len_sig, j);
                                */
                                // Skip unexpected packet
                                continue;
                        }
                        if (flow_id == -1) {
                                lat_test_check_ordering(task, task->lat_test, packet_id, generator_id);
                        }
                        lat_test_check_duplicate(task, task->lat_test, packet_id, generator_id);
                        uint32_t loss = task_lat_early_loss_detect(task, packet_id, generator_id);
                        if (loss) {
                                lat_test_add_lost(task->lat_test, loss);
                                if (task->loss_id < task->loss_buffer_size) {
                                        task->loss_buffer[task->loss_id].packet_id = packet_id;
                                        task->loss_buffer[task->loss_id++].n = loss;
                                }
                        }
                } else {
                        generator_id = 0;
                        packet_id = task->rx_packet_index;
                }

                /* If accuracy is enabled, latency is reported with a
                   delay of ACCURACY_WINDOW packets since the generator puts the
                   accuracy for packet N into packet N + ACCURACY_WINDOW. The delay
                   ensures that all reported latencies have both rx
                   and tx error. */
                if (task->accur_pos) {
                        uint32_t tx_time_err = *(uint32_t *)(hdr + task->accur_pos);

                        struct delayed_latency_entry *delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_WINDOW);

                        if (delayed_latency_entry) {
                                task_lat_store_lat(task,
                                                   delayed_latency_entry->rx_packet_id,
                                                   delayed_latency_entry->pkt_rx_time,
                                                   delayed_latency_entry->pkt_tx_time,
                                                   delayed_latency_entry->rx_time_err,
                                                   tx_time_err,
                                                   delayed_latency_entry->tx_packet_id,
                                                   delayed_latency_entry->generator_id);
                        }

                        delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
                        delayed_latency_entry->pkt_rx_time = pkt_rx_time;
                        delayed_latency_entry->pkt_tx_time = pkt_tx_time;
                        delayed_latency_entry->rx_time_err = rx_time_err;
                        delayed_latency_entry->rx_packet_id = task->rx_packet_index;
                        delayed_latency_entry->tx_packet_id = packet_id;
                        delayed_latency_entry->generator_id = generator_id;
                } else {
                        task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
                }

                // Bad/unexpected packets do not need to be indexed
                task->rx_packet_index++;
        }

        if (n_pkts < MAX_PKT_BURST)
                task->begin = tbase->aux->tsc_rx.before;
        task->last_pkts_tsc = tbase->aux->tsc_rx.after;

        rc = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
        // non_dp packets should not be drop-handled, as they are all, by definition, considered not handled
        // RX = DISCARDED + HANDLED + NON_DP + (TX - TX_NON_DP) + TX_FAIL
        TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, -non_dp_count);
        return rc;
}
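
/*
 * End-to-end sketch of the accuracy path above (illustrative only, with
 * W == ACCURACY_WINDOW): the generator writes the TX timestamp error of
 * packet N into packet N + W, so packet N + W completes packet N's record:
 *
 *   entry = delayed_latency_get(..., generator_id, packet_id - W);
 *   if (entry)      // packet N: both rx and tx error now known
 *           task_lat_store_lat(task, ..., entry->rx_time_err, tx_time_err, ...);
 *   delayed_latency_create(..., generator_id, packet_id);  // stash N + W
 *
 * Every reported latency thus carries both error bounds, at the cost of a
 * W-packet reporting delay.
 */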

static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
{
        const int socket_id = rte_lcore_to_socket_id(core_id);
        char name[256];
        size_t latency_buffer_mem_size = 0;

        if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
                task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;

        latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;

        task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
        PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);

        sprintf(name, "latency.rx_%u.txt", core_id);
        task->fp_rx = fopen(name, "w+");
        PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);

        sprintf(name, "latency.tx_%u.txt", core_id);
        task->fp_tx = fopen(name, "w+");
        PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);

        task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
        PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocate prev_tx_packet_index\n");
}

static void task_init_generator_count(struct task_lat *task)
{
        uint8_t *generator_count = prox_sh_find_system("generator_count");

        if (generator_count == NULL) {
                task->generator_count = 1;
                plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
        } else
                task->generator_count = *generator_count;
        plog_info("\t\tLatency using %u generators\n", task->generator_count);
}

static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
{
        size_t eld_mem_size;

        eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
        task->eld = prox_zmalloc(eld_mem_size, socket_id);
        PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
}

void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
{
        task->limit = nsec_to_tsc(accuracy_limit_nsec);
}

static void lat_start(struct task_base *tbase)
{
        (void)tbase; // Nothing to do at start; the hook is kept for symmetry with lat_stop()
}

static void init_task_lat(struct task_base *tbase, struct task_args *targ)
{
        struct task_lat *task = (struct task_lat *)tbase;
        const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

        task->lat_pos = targ->lat_pos;
        task->accur_pos = targ->accur_pos;
        task->sig_pos = targ->sig_pos;
        task->sig = targ->sig;
        task->packet_id_in_flow_pos = targ->packet_id_in_flow_pos;
        task->flow_id_pos = targ->flow_id_pos;

        task->unique_id_pos = targ->packet_id_pos;
        task->latency_buffer_size = targ->latency_buffer_size;

        PROX_PANIC(task->lat_pos == 0, "Missing 'lat pos' parameter in config file\n");
        uint16_t min_pkt_len = task->lat_pos + sizeof(uint32_t);
        if (task->unique_id_pos && (
                min_pkt_len < task->unique_id_pos + sizeof(struct unique_id)))
                min_pkt_len = task->unique_id_pos + sizeof(struct unique_id);
        if (task->accur_pos && (
                min_pkt_len < task->accur_pos + sizeof(uint32_t)))
                min_pkt_len = task->accur_pos + sizeof(uint32_t);
        if (task->sig_pos && (
                min_pkt_len < task->sig_pos + sizeof(uint32_t)))
                min_pkt_len = task->sig_pos + sizeof(uint32_t);
        task->min_pkt_len = min_pkt_len;
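
        /* Worked example with hypothetical offsets (and assuming
           sizeof(struct unique_id) <= 12): lat_pos == 42, unique_id_pos == 50,
           accur_pos == 58. The packet must hold a 4-byte timestamp at 42, a
           struct unique_id at 50 and a 4-byte accuracy field at 58, so
           min_pkt_len becomes 58 + sizeof(uint32_t) == 62; shorter packets
           are flagged as non-dataplane in handle_lat_bulk(). */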

        task_init_generator_count(task);

        if (task->latency_buffer_size) {
                init_task_lat_latency_buffer(task, targ->lconf->id);
        }

        char name[256];
        sprintf(name, "loss_%u.txt", targ->lconf->id);
        task->fp_loss = fopen(name, "w+");
        PROX_PANIC(task->fp_loss == NULL, "Failed to open %s\n", name);

        if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
                targ->bucket_size = DEFAULT_BUCKET_SIZE;
        }

        if (task->accur_pos) {
                task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count, socket_id);
                PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
                for (uint i = 0; i < task->generator_count; i++) {
                        task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
                        PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
                }
                if (task->unique_id_pos == 0) {
                        /* When using the accuracy feature, the accuracy from TX is written ACCURACY_WINDOW packets later.
                         * We can only retrieve the right packet if a packet id is written to it.
                         * Otherwise we will use the packet RECEIVED ACCURACY_WINDOW packets ago, which is OK if
                         * packets are not re-ordered. If packets are re-ordered, then the matching between
                         * the TX accuracy and the latency is wrong.
                         */
                        plog_warn("\tWhen the accuracy feature is used, a unique id should ideally also be used\n");
                }
        }

        task->lt[0].min_lat = -1;
        task->lt[1].min_lat = -1;
        task->lt[0].bucket_size = targ->bucket_size;
        task->lt[1].bucket_size = targ->bucket_size;
        if (task->unique_id_pos) {
                task_lat_init_eld(task, socket_id);
                task_lat_reset_eld(task);
                task->previous_packet = prox_zmalloc(sizeof(task->previous_packet[0]) * task->generator_count, socket_id);
                PROX_PANIC(task->previous_packet == NULL, "Failed to allocate array for storing previous packet\n");
        }
        task->lat_test = &task->lt[task->using_lt];

        task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
        task->rx_pkt_meta = prox_zmalloc(MAX_PKT_BURST * sizeof(*task->rx_pkt_meta), socket_id);
        PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");

        uint32_t max_frame_size = MAX_PKT_SIZE;
        uint64_t bytes_per_hz = UINT64_MAX;
        if (targ->nb_rxports) {
                struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
                max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE;

                // port->max_link_speed reports the maximum, non-negotiated link speed in Mbps, e.g. 40k for a 40 Gbps NIC.
                // It can be UINT32_MAX (virtual devices, or not supported by DPDK < 16.04)
                if (port->max_link_speed != UINT32_MAX) {
                        bytes_per_hz = port->max_link_speed * 125000L;
                        plog_info("\t\tPort %u: max link speed is %ld Mbps\n",
                                (uint8_t)(port - prox_port_cfg), 8 * bytes_per_hz / 1000000);
                }
        }
        task->loss_buffer_size = targ->loss_buffer_size;
        if (task->loss_buffer_size) {
                task->loss_buffer = prox_zmalloc(task->loss_buffer_size * sizeof(struct loss_buffer), rte_lcore_to_socket_id(targ->lconf->id));
                PROX_PANIC(task->loss_buffer == NULL,
                        "Failed to allocate %lu bytes (in huge pages) for loss_buffer\n", task->loss_buffer_size * sizeof(struct loss_buffer));
        }
        task->bytes_to_tsc = prox_zmalloc(max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST, rte_lcore_to_socket_id(targ->lconf->id));
        PROX_PANIC(task->bytes_to_tsc == NULL,
                "Failed to allocate %lu bytes (in huge pages) for bytes_to_tsc\n", max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST);

        // There are cases where the hz estimate might be slightly over-estimated,
        // which results in too much extrapolation.
        // Only account for 99% of the extrapolation to handle clocks with up to 1% error.
        for (unsigned int i = 0; i < max_frame_size * MAX_PKT_BURST; i++) {
                if (bytes_per_hz == UINT64_MAX)
                        task->bytes_to_tsc[i] = 0;
                else
                        task->bytes_to_tsc[i] = (rte_get_tsc_hz() * i * 0.99) / bytes_per_hz;
        }
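
        /* Illustrative arithmetic: on a 10 Gbps port, max_link_speed is
           10000, so bytes_per_hz = 10000 * 125000 = 1.25e9 bytes/sec. With
           a 2.5 GHz TSC, one wire byte costs 2.5e9 / 1.25e9 = 2 cycles, and
           after the 1% margin bytes_to_tsc[1500] is about
           1500 * 2 * 0.99 ~= 2970 cycles. */
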
        task->flow_count = targ->flow_count;
        PROX_PANIC(task->flow_id_pos && (task->flow_count == 0), "flow_count must be configured when flow_id_pos is set\n");
        if (task->flow_count) {
                task->flows = prox_zmalloc(task->flow_count * sizeof(struct flows) * task->generator_count, rte_lcore_to_socket_id(targ->lconf->id));
                PROX_PANIC(task->flows == NULL,
                        "Failed to allocate %lu bytes (in huge pages) for flows\n", task->flow_count * sizeof(struct flows) * task->generator_count);
        }
}

static struct task_init task_init_lat = {
        .mode_str = "lat",
        .init = init_task_lat,
        .handle = handle_lat_bulk,
        .start = lat_start,
        .stop = lat_stop,
        .flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
        .size = sizeof(struct task_lat)
};

__attribute__((constructor)) static void reg_task_lat(void)
{
        reg_task(&task_init_lat);
}