Use link speed from device capability instead of negotiated speed
[samplevnf.git] / VNFs / DPPD-PROX / handle_lat.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 //#define LAT_DEBUG
18
19 #include <rte_cycles.h>
20 #include <stdio.h>
21 #include <math.h>
22
23 #include "handle_gen.h"
24 #include "prox_malloc.h"
25 #include "mbuf_utils.h"
26 #include "handle_lat.h"
27 #include "log.h"
28 #include "task_init.h"
29 #include "task_base.h"
30 #include "stats.h"
31 #include "lconf.h"
32 #include "quit.h"
33 #include "eld.h"
34 #include "prox_shared.h"
35 #include "prox_port_cfg.h"
36
37 #define DEFAULT_BUCKET_SIZE     10
38 #define ACCURACY_BUFFER_SIZE    64
39
/* Per-packet record stored in the optional latency buffer and dumped to
 * file by lat_write_latency_to_file() when the task stops. Timestamps are
 * in units of TSC cycles >> LATENCY_ACCURACY. */
struct lat_info {
        uint32_t rx_packet_index;       /* index of the packet in RX order */
        uint64_t tx_packet_index;       /* TX index; 64-bit so 32-bit wrap can be compensated later */
        uint32_t tx_err;                /* TX-side accuracy error (shifted cycles) */
        uint32_t rx_err;                /* RX-side accuracy error (shifted cycles) */
        uint64_t rx_time;               /* extrapolated RX timestamp (shifted cycles) */
        uint64_t tx_time;               /* TX timestamp read from the packet (shifted cycles) */
        uint16_t port_queue_id;         /* id of the generator the packet came from */
#ifdef LAT_DEBUG
        uint16_t id_in_bulk;            /* position of this packet within its RX bulk */
        uint16_t bulk_size;             /* number of packets in the RX bulk */
        uint64_t begin;                 /* task->begin at reception (tsc of previous poll) */
        uint64_t after;                 /* tsc_rx.after of the bulk */
        uint64_t before;                /* tsc_rx.before of the bulk */
#endif
};
56
/* One slot of the per-generator circular buffer (ACCURACY_BUFFER_SIZE
 * entries). Reporting of a packet's latency is delayed until the packet
 * carrying its TX accuracy error arrives ACCURACY_BUFFER_SIZE packets
 * later (see handle_lat_bulk). */
struct delayed_latency_entry {
        uint32_t rx_packet_id;  /* task->rx_packet_index when this packet was received */
        uint32_t tx_packet_id;  /* packet id read from the packet */
        uint32_t packet_id;     /* key checked on lookup to validate slot ownership */
        uint8_t generator_id;   /* generator the packet came from */
        uint64_t pkt_rx_time;   /* extrapolated RX time (shifted cycles) */
        uint64_t pkt_tx_time;   /* TX time read from the packet (shifted cycles) */
        uint64_t rx_time_err;   /* RX-side accuracy error (shifted cycles) */
};
66
67 static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
68 {
69         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
70         if (delayed_latency_entry->packet_id == packet_id)
71                 return delayed_latency_entry;
72         else
73                 return NULL;
74 }
75
76 static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
77 {
78         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
79         delayed_latency_entry->packet_id = packet_id;
80         return delayed_latency_entry;
81 }
82
/* Per-packet scratch data computed while parsing one RX bulk. */
struct rx_pkt_meta_data {
        uint8_t  *hdr;                  /* start of packet data */
        uint32_t pkt_tx_time;           /* TX timestamp read from the packet */
        uint32_t bytes_after_in_bulk;   /* wire bytes received after this packet in the same bulk */
};
88
/* State of one latency-measurement task. */
struct task_lat {
        struct task_base base;
        uint64_t limit;                 /* accuracy limit in TSC cycles (task_lat_set_accuracy_limit) */
        uint64_t rx_packet_index;       /* running index of accepted data-plane packets */
        uint64_t last_pkts_tsc;         /* TSC of the last RX bulk; floor for backward extrapolation */
        struct delayed_latency_entry **delayed_latency_entries; /* per-generator rings of ACCURACY_BUFFER_SIZE slots */
        struct lat_info *latency_buffer;        /* optional per-packet dump buffer, written to file on stop */
        uint32_t latency_buffer_idx;    /* next free slot in latency_buffer */
        uint32_t latency_buffer_size;   /* capacity of latency_buffer */
        uint64_t begin;                 /* tsc_rx.before of the previous poll */
        uint16_t lat_pos;               /* offset of the TX timestamp within the packet */
        uint16_t unique_id_pos;         /* offset of the unique id within the packet; 0 = disabled */
        uint16_t accur_pos;             /* offset of the TX accuracy error within the packet; 0 = disabled */
        uint16_t sig_pos;               /* offset of the signature within the packet; 0 = disabled */
        uint32_t sig;                   /* expected signature value */
        volatile uint16_t use_lt; /* which lt to use, */
        volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements are used */
        struct lat_test lt[2];          /* double-buffered measurements: one written, one read out */
        struct lat_test *lat_test;      /* points at the active element of lt[] */
        uint32_t generator_count;       /* number of traffic generators feeding this task */
        uint16_t min_pkt_len;           /* smallest packet able to hold all expected fields */
        struct early_loss_detect *eld;  /* per-generator loss-detection state */
        struct rx_pkt_meta_data *rx_pkt_meta;   /* per-packet scratch for one RX bulk */
        uint64_t link_speed;            /* link speed in bytes per second (see lat_start) */
        // Following fields are only used when starting or stopping, not in general runtime
        uint64_t *prev_tx_packet_index; /* per-generator last TX index seen while dumping (loss detection) */
        FILE *fp_rx;                    /* dump file ordered by RX time */
        FILE *fp_tx;                    /* dump file ordered by TX index/time */
        struct prox_port_cfg *port;     /* RX port, used to read link speed */
};
/* This function calculates the difference between rx_time and tx_time.
 * Both values are uint32_t (see handle_lat_bulk).
 * rx_time should be higher than tx_time... except every UINT32_MAX
 * cycles, when rx_time overflows.
 * As the return value is also uint32_t, returning (rx_time - tx_time)
 * is also fine when it overflows: unsigned wrap-around yields the
 * correct modular difference.
 */
static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
{
        return rx_time - tx_time;
}
130
131 struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
132 {
133         if (task->use_lt == task->using_lt)
134                 return &task->lt[!task->using_lt];
135         return NULL;
136 }
137
138 void task_lat_use_other_latency_meassurement(struct task_lat *task)
139 {
140         task->use_lt = !task->using_lt;
141 }
142
143 static void task_lat_update_lat_test(struct task_lat *task)
144 {
145         if (task->use_lt != task->using_lt) {
146                 task->using_lt = task->use_lt;
147                 task->lat_test = &task->lt[task->using_lt];
148                 task->lat_test->accuracy_limit_tsc = task->limit;
149         }
150 }
151
152 static int compare_tx_time(const void *val1, const void *val2)
153 {
154         const struct lat_info *ptr1 = val1;
155         const struct lat_info *ptr2 = val2;
156
157         return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
158 }
159
160 static int compare_tx_packet_index(const void *val1, const void *val2)
161 {
162         const struct lat_info *ptr1 = val1;
163         const struct lat_info *ptr2 = val2;
164
165         return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
166 }
167
/* The buffer stores only the 32 low bits of the TX packet index; walk the
 * buffer (currently in RX order) and rebuild full 64-bit indices by
 * tracking 32-bit wrap-arounds, so records can then be sorted by TX index. */
static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
{
        uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
        uint32_t small = UINT32_MAX >> 1;

        lat++;

        /* Buffer is sorted so far by RX time.
         * We might have packets being reordered by SUT.
         *     => consider small differences as re-order and big ones as overflow of tx_packet_index.
         * Note that:
         *      - overflow only happens if receiving and storing 4 billions packets...
         *      - an absolute difference of less than 2 billion packets is not considered as an overflow
         */
        for (uint32_t i = 1; i < count; i++) {
                tx_packet_index = lat->tx_packet_index;
                if (tx_packet_index > old_tx_packet_index) {
                        if (tx_packet_index - old_tx_packet_index < small) {
                                // The diff is small => increasing index count
                        } else {
                                // The diff is big => it is more likely that the previous packet was overflow
                                n_overflow--;
                        }
                } else {
                        if (old_tx_packet_index - tx_packet_index < small) {
                                // The diff is small => packet reorder
                        } else {
                                // The diff is big => it is more likely that this is an overflow
                                n_overflow++;
                        }
                }
                /* Promote the 32-bit on-wire index to 64 bits using the running
                 * wrap count (n_overflow itself wraps modulo 2^32). */
                lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
                old_tx_packet_index = tx_packet_index;
                lat++;
        }
}
204
/* Same wrap-compensation as fix_latency_buffer_tx_packet_index, applied to
 * the 32-bit (shifted) tx_time field so records can be sorted by TX time. */
static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
{
        uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
        uint32_t small = UINT32_MAX >> 1;
        lat++;

        /*
         * Same algorithm as above, but with time.
         * Note that:
         *      - overflow happens after 4 billions "cycles" (shifted by LATENCY_ACCURACY) = ~4sec
         *      - an absolute difference up to 2 billion (shifted) cycles (~=2sec) is not considered as an overflow
         *              => algorithm does not work if receiving less than 1 packet every 2 seconds
         */
        for (uint32_t i = 1; i < count; i++) {
                tx_time = lat->tx_time;
                if (tx_time > old_tx_time) {
                        if (tx_time - old_tx_time > small) {
                                // Large forward jump => previous value had wrapped
                                n_overflow--;
                        }
                } else {
                        if (old_tx_time - tx_time > small) {
                                // Large backward jump => this value wrapped
                                n_overflow++;
                        }
                }
                lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
                old_tx_time = tx_time;
                lat++;
        }
}
234
235 static void task_lat_count_remaining_lost_packets(struct task_lat *task)
236 {
237         struct lat_test *lat_test = task->lat_test;
238
239         for (uint32_t j = 0; j < task->generator_count; j++) {
240                 struct early_loss_detect *eld = &task->eld[j];
241
242                 lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
243         }
244 }
245
246 static void task_lat_reset_eld(struct task_lat *task)
247 {
248         for (uint32_t j = 0; j < task->generator_count; j++) {
249                 early_loss_detect_reset(&task->eld[j]);
250         }
251 }
252
253 static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
254 {
255         uint64_t min_tsc = UINT64_MAX;
256
257         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
258                 if (min_tsc > task->latency_buffer[i].tx_time)
259                         min_tsc = task->latency_buffer[i].tx_time;
260         }
261
262         return min_tsc << LATENCY_ACCURACY;
263 }
264
265 static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
266 {
267         uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);
268
269         return lat << LATENCY_ACCURACY;
270 }
271
272 static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
273 {
274         return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
275 }
276
277 static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
278 {
279         return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
280 }
281
282 static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
283 {
284         return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
285 }
286
287 static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
288 {
289         return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
290 }
291
/* Dump the latency buffer to the two per-core files: fp_rx in buffer (RX)
 * order, then fp_tx after re-sorting by TX index (if unique ids are in the
 * packets) or TX time, so dropped packets show up as gaps between
 * consecutive records of the same queue. Resets the buffer afterwards. */
static void lat_write_latency_to_file(struct task_lat *task)
{
        uint64_t min_tsc;
        uint64_t n_loss;

        /* All dumped timestamps are relative to the earliest TX time. */
        min_tsc = lat_latency_buffer_get_min_tsc(task);

        // Dumping all packet statistics
        fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
        fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec);tx time;\n");
        for (uint32_t i = 0; i < task->latency_buffer_idx ; i++) {
                struct lat_info *lat_info = &task->latency_buffer[i];
                uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
                uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
                uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

                fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
                        lat_info->rx_packet_index,
                        lat_info->port_queue_id,
                        lat_info->tx_packet_index,
                        tsc_to_nsec(lat_tsc),
                        tsc_to_nsec(rx_tsc - min_tsc),
                        tsc_to_nsec(tx_tsc - min_tsc));
        }

        // To detect dropped packets, we need to sort them based on TX
        if (task->unique_id_pos) {
                plogx_info("Adapting tx_packet_index\n");
                fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
                plogx_info("Sorting packets based on tx_packet_index\n");
                qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
                plogx_info("Sorted packets based on packet_index\n");
        } else {
                plogx_info("Adapting tx_time\n");
                fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
                plogx_info("Sorting packets based on tx_time\n");
                qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
                plogx_info("Sorted packets based on packet_time\n");
        }

        // A packet is marked as dropped if 2 packets received from the same queue are not consecutive
        fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
        fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");

        /* Start at -1 so the first packet of each queue yields n_loss == 0. */
        for (uint32_t i = 0; i < task->generator_count;i++)
                task->prev_tx_packet_index[i] = -1;

        for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
                struct lat_info *lat_info = &task->latency_buffer[i];
                uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
                uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
                uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
                uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
                uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

                /* Packet n + ACCURACY_BUFFER_SIZE delivers the TX error for packet n,
                   hence the last ACCURACY_BUFFER_SIZE packets do no have TX error. */
                if (i + ACCURACY_BUFFER_SIZE >= task->latency_buffer_idx) {
                        tx_err_tsc = 0;
                }

                if (lat_info->port_queue_id >= task->generator_count) {
                        plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
                                lat_info->port_queue_id, lat_info->tx_packet_index);
                        continue;
                }
                // Log dropped packet
                n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
                if (n_loss)
                        fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
                                lat_info->port_queue_id,
                                lat_info->tx_packet_index - n_loss, n_loss);
                // Log next packet
                fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
                        lat_info->port_queue_id,
                        lat_info->tx_packet_index,
                        lat_info->rx_packet_index,
                        tsc_to_nsec(lat_tsc),
                        tsc_to_nsec(tx_tsc - min_tsc),
                        tsc_to_nsec(rx_tsc - min_tsc),
                        tsc_to_nsec(tx_err_tsc),
                        tsc_to_nsec(rx_err_tsc));
#ifdef LAT_DEBUG
                fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
                        lat_info->id_in_bulk,
                        lat_info->bulk_size,
                        tsc_to_nsec(lat_info->begin - min_tsc),
                        tsc_to_nsec(lat_info->before - min_tsc),
                        tsc_to_nsec(lat_info->after - min_tsc));
#endif
                fprintf(task->fp_tx, "\n");
                task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
        }
        fflush(task->fp_rx);
        fflush(task->fp_tx);
        /* Buffer fully consumed; restart collection from slot 0. */
        task->latency_buffer_idx = 0;
}
389
390 static void lat_stop(struct task_base *tbase)
391 {
392         struct task_lat *task = (struct task_lat *)tbase;
393
394         if (task->unique_id_pos) {
395                 task_lat_count_remaining_lost_packets(task);
396                 task_lat_reset_eld(task);
397         }
398         if (task->latency_buffer)
399                 lat_write_latency_to_file(task);
400 }
401
#ifdef LAT_DEBUG
/* Record bulk-level debug data (bulk size, position in bulk, poll
 * timestamps) alongside the latency sample at rx_packet_index. */
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
        struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];

        lat_info->bulk_size = bulk_size;
        lat_info->id_in_bulk = id_in_bulk;
        lat_info->begin = task->begin;
        lat_info->before = task->base.aux->tsc_rx.before;
        lat_info->after = task->base.aux->tsc_rx.after;
}
#endif
414
415 static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
416 {
417         struct lat_info *lat_info;
418
419         /* If unique_id_pos is specified then latency is stored per
420            packet being sent. Lost packets are detected runtime, and
421            latency stored for those packets will be 0 */
422         lat_info = &task->latency_buffer[task->latency_buffer_idx++];
423         lat_info->rx_packet_index = rx_packet_index;
424         lat_info->tx_packet_index = packet_id;
425         lat_info->port_queue_id = generator_id;
426         lat_info->rx_time = rx_time;
427         lat_info->tx_time = tx_time;
428         lat_info->rx_err = rx_err;
429         lat_info->tx_err = tx_err;
430 }
431
432 static uint32_t task_lat_early_loss_detect(struct task_lat *task, uint32_t packet_id, uint8_t generator_id)
433 {
434         struct early_loss_detect *eld = &task->eld[generator_id];
435         return early_loss_detect_add(eld, packet_id);
436 }
437
/* Move tsc_from back by the wire time of 'bytes' at link_speed (bytes per
 * second), clamped so the result never drops below tsc_minimum. */
static uint64_t tsc_extrapolate_backward(uint64_t link_speed, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
        uint64_t wire_cycles = (rte_get_tsc_hz() * bytes) / link_speed;
        uint64_t tsc = tsc_from - wire_cycles;

        return likely(tsc > tsc_minimum) ? tsc : tsc_minimum;
}
446
447 static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
448 {
449         uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
450         size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);
451
452         bucket_id = bucket_id < bucket_count? bucket_id : bucket_count;
453         lat_test->buckets[bucket_id]++;
454 }
455
/* Accumulate newly detected lost packets into the active measurement. */
static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
{
        lat_test->lost_packets += lost_packets;
}
460
461 static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
462 {
463         if (error > lat_test->accuracy_limit_tsc)
464                 return;
465         lat_test->tot_pkts++;
466
467         lat_test->tot_lat += lat_tsc;
468         lat_test->tot_lat_error += error;
469
470         /* (a +- b)^2 = a^2 +- (2ab + b^2) */
471         lat_test->var_lat += lat_tsc * lat_tsc;
472         lat_test->var_lat_error += 2 * lat_tsc * error;
473         lat_test->var_lat_error += error * error;
474
475         if (lat_tsc > lat_test->max_lat) {
476                 lat_test->max_lat = lat_tsc;
477                 lat_test->max_lat_error = error;
478         }
479         if (lat_tsc < lat_test->min_lat) {
480                 lat_test->min_lat = lat_tsc;
481                 lat_test->min_lat_error = error;
482         }
483
484 #ifdef LATENCY_HISTOGRAM
485         lat_test_histogram_add(lat_test, lat_tsc);
486 #endif
487 }
488
489 static int task_lat_can_store_latency(struct task_lat *task)
490 {
491         return task->latency_buffer_idx < task->latency_buffer_size;
492 }
493
494 static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
495 {
496         uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;
497
498         lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);
499
500         if (task_lat_can_store_latency(task)) {
501                 task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
502         }
503 }
504
/* Fast-path handler: for each received packet, read the embedded TX
 * timestamp (and, when configured, signature, unique id and TX accuracy
 * error), extrapolate its individual RX time backward from the bulk RX
 * timestamp using the link speed, and feed the latency statistics.
 * All packets are then forwarded unchanged. */
static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
        struct task_lat *task = (struct task_lat *)tbase;
        int rc;

#if RTE_VERSION < RTE_VERSION_NUM(16,4,0,0)
        // On more recent DPDK, we use the speed_capa of the port, and not the negotiated speed
        // If link is down, link_speed is 0
        if (unlikely(task->link_speed == 0)) {
                if (task->port && task->port->link_speed != 0) {
                        // 125000 converts the Mbps reported by the port to bytes per second
                        task->link_speed = task->port->link_speed * 125000L;
                        plog_info("\tPort %u: link speed is %ld Mbps\n",
                                (uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
                } else if (n_pkts) {
                        // Link speed still unknown: cannot extrapolate, forward unmeasured
                        return task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
                } else {
                        return 0;
                }
        }
#endif
        if (n_pkts == 0) {
                // Empty poll: remember its timestamp to bound the next RX error window
                task->begin = tbase->aux->tsc_rx.before;
                return 0;
        }

        task_lat_update_lat_test(task);

        // Remember those packets with bad length or bad signature
        uint32_t non_dp_count = 0;
        uint64_t pkt_bad_len_sig[(MAX_RX_PKT_ALL + 63) / 64];
#define BIT64_SET(a64, bit)     a64[bit / 64] |=  (((uint64_t)1) << (bit & 63))
#define BIT64_CLR(a64, bit)     a64[bit / 64] &= ~(((uint64_t)1) << (bit & 63))
#define BIT64_TEST(a64, bit)    a64[bit / 64]  &  (((uint64_t)1) << (bit & 63))

        /* Go once through all received packets and read them.  If
           packet has just been modified by another core, the cost of
           latency will be partialy amortized though the bulk size */
        for (uint16_t j = 0; j < n_pkts; ++j) {
                struct rte_mbuf *mbuf = mbufs[j];
                task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);

                // Remember those packets which are too short to hold the values that we expect
                if (unlikely(rte_pktmbuf_pkt_len(mbuf) < task->min_pkt_len)) {
                        BIT64_SET(pkt_bad_len_sig, j);
                        non_dp_count++;
                } else
                        BIT64_CLR(pkt_bad_len_sig, j);
        }

        if (task->sig_pos) {
                for (uint16_t j = 0; j < n_pkts; ++j) {
                        if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                                continue;
                        // Remember those packets with bad signature
                        if (likely(*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig))
                                task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
                        else {
                                BIT64_SET(pkt_bad_len_sig, j);
                                non_dp_count++;
                        }
                }
        } else {
                // No signature configured: read the TX timestamp from every well-sized packet
                for (uint16_t j = 0; j < n_pkts; ++j) {
                        if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                                continue;
                        task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
                }
        }

        // Walk the bulk backwards, accumulating wire bytes that follow each packet
        uint32_t bytes_total_in_bulk = 0;
        // Find RX time of first packet, for RX accuracy
        for (uint16_t j = 0; j < n_pkts; ++j) {
                uint16_t flipped = n_pkts - 1 - j;

                task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
                bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
        }

        const uint64_t rx_tsc = tbase->aux->tsc_rx.after;

        // RX error = uncertainty window on the first packet's extrapolated RX time
        uint64_t rx_time_err;
        uint64_t pkt_rx_time64 = tsc_extrapolate_backward(task->link_speed, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
        if (unlikely((task->begin >> LATENCY_ACCURACY) > pkt_rx_time64)) {
                // Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
                rx_time_err = pkt_rx_time64 - (task->last_pkts_tsc >> LATENCY_ACCURACY);
        } else {
                rx_time_err = pkt_rx_time64 - (task->begin >> LATENCY_ACCURACY);
        }

        TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, non_dp_count);
        for (uint16_t j = 0; j < n_pkts; ++j) {
                // Used to display % of packets within accuracy limit vs. total number of packets (used_col)
                task->lat_test->tot_all_pkts++;

                // Skip those packets with bad length or bad signature
                if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                        continue;

                struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
                uint8_t *hdr = rx_pkt_meta->hdr;

                uint32_t pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
                uint32_t pkt_tx_time = rx_pkt_meta->pkt_tx_time;

                uint8_t generator_id;
                uint32_t packet_id;
                if (task->unique_id_pos) {
                        struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
                        unique_id_get(unique_id, &generator_id, &packet_id);

                        if (unlikely(generator_id >= task->generator_count)) {
                                /* No need to remember unexpected packet at this stage
                                BIT64_SET(pkt_bad_len_sig, j);
                                */
                                // Skip unexpected packet
                                continue;
                        }

                        lat_test_add_lost(task->lat_test, task_lat_early_loss_detect(task, packet_id, generator_id));
                } else {
                        generator_id = 0;
                        packet_id = task->rx_packet_index;
                }

                /* If accuracy is enabled, latency is reported with a
                   delay of ACCURACY_BUFFER_SIZE packets since the generator puts the
                   accuracy for packet N into packet N + ACCURACY_BUFFER_SIZE. The delay
                   ensures that all reported latencies have both rx
                   and tx error. */
                if (task->accur_pos) {
                        uint32_t tx_time_err = *(uint32_t *)(hdr + task->accur_pos);

                        struct delayed_latency_entry *delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_BUFFER_SIZE);

                        if (delayed_latency_entry) {
                                task_lat_store_lat(task,
                                                   delayed_latency_entry->rx_packet_id,
                                                   delayed_latency_entry->pkt_rx_time,
                                                   delayed_latency_entry->pkt_tx_time,
                                                   delayed_latency_entry->rx_time_err,
                                                   tx_time_err,
                                                   delayed_latency_entry->tx_packet_id,
                                                   delayed_latency_entry->generator_id);
                        }

                        delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
                        delayed_latency_entry->pkt_rx_time = pkt_rx_time;
                        delayed_latency_entry->pkt_tx_time = pkt_tx_time;
                        delayed_latency_entry->rx_time_err = rx_time_err;
                        delayed_latency_entry->rx_packet_id = task->rx_packet_index;
                        delayed_latency_entry->tx_packet_id = packet_id;
                        delayed_latency_entry->generator_id = generator_id;
                } else {
                        task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
                }

                // Bad/unexpected packets do not need to be indexed
                task->rx_packet_index++;
        }

        task->begin = tbase->aux->tsc_rx.before;
        task->last_pkts_tsc = tbase->aux->tsc_rx.after;

        rc = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
        // non_dp_count should not be drop-handled, as there are all by definition considered as not handled
        // RX = DISCARDED + HANDLED + NON_DP + (TX - TX_NON_DP) + TX_FAIL
        TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, -non_dp_count);
        return rc;
}
674
675 static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
676 {
677         const int socket_id = rte_lcore_to_socket_id(core_id);
678         char name[256];
679         size_t latency_buffer_mem_size = 0;
680
681         if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
682                 task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;
683
684         latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;
685
686         task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
687         PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);
688
689         sprintf(name, "latency.rx_%u.txt", core_id);
690         task->fp_rx = fopen(name, "w+");
691         PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);
692
693         sprintf(name, "latency.tx_%u.txt", core_id);
694         task->fp_tx = fopen(name, "w+");
695         PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);
696
697         task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
698         PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocated prev_tx_packet_index\n");
699 }
700
701 static void task_init_generator_count(struct task_lat *task)
702 {
703         uint8_t *generator_count = prox_sh_find_system("generator_count");
704
705         if (generator_count == NULL) {
706                 task->generator_count = 1;
707                 plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
708         } else
709                 task->generator_count = *generator_count;
710         plog_info("\tLatency using %u generators\n", task->generator_count);
711 }
712
713 static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
714 {
715         size_t eld_mem_size;
716
717         eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
718         task->eld = prox_zmalloc(eld_mem_size, socket_id);
719         PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
720 }
721
722 void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
723 {
724         task->limit = nsec_to_tsc(accuracy_limit_nsec);
725 }
726
/* Task start hook: snapshot the RX port's link speed into task->link_speed
 * (bytes per second). Done at start rather than init because the port is not
 * yet initialized at init time (see init_task_lat below). */
static void lat_start(struct task_base *tbase)
{
	struct task_lat *task = (struct task_lat *)tbase;

	if (task->port) {
		// task->port->link_speed reports the link speed in Mbps e.g. 40k for a 40 Gbps NIC.
		// task->link_speed reports link speed in Bytes per sec.
		// Mbps -> bytes/sec conversion factor: 1e6 / 8 = 125000.
#if RTE_VERSION < RTE_VERSION_NUM(16,4,0,0)
		// It can be 0 if link is down, and must hence be updated in fast path.
		task->link_speed = task->port->link_speed * 125000L;
		if (task->link_speed)
			plog_info("\tPort %u: link speed is %ld Mbps\n",
				(uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
		else
			plog_info("\tPort %u: link speed is %ld Mbps - link might be down\n",
				(uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
#else
		// On DPDK >= 16.04 link_speed holds the device's max capability;
		// UINT32_MAX means "unknown", mapped to an unlimited byte rate.
		if (task->port->link_speed == UINT32_MAX)
			task->link_speed = UINT64_MAX;
		else {
			task->link_speed = task->port->link_speed * 125000L;
			plog_info("\tPort %u: link max speed is %ld Mbps\n",
				(uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
		}
#endif
	}
}
754
/* One-time initialization of the latency task: copy the configured packet
 * field offsets, derive the minimum packet length needed to hold every
 * enabled field, and allocate all per-task buffers (latency buffer, delayed
 * accuracy entries, early-loss-detection state, RX packet metadata). */
static void init_task_lat(struct task_base *tbase, struct task_args *targ)
{
	struct task_lat *task = (struct task_lat *)tbase;
	const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

	/* Byte offsets inside received packets of the latency timestamp,
	 * TX accuracy value and signature; 0 means the feature is unused. */
	task->lat_pos = targ->lat_pos;
	task->accur_pos = targ->accur_pos;
	task->sig_pos = targ->sig_pos;
	task->sig = targ->sig;

	task->unique_id_pos = targ->packet_id_pos;
	task->latency_buffer_size = targ->latency_buffer_size;

	PROX_PANIC(task->lat_pos == 0, "Missing 'lat pos' parameter in config file\n");
	/* min_pkt_len = the largest (offset + field size) of all enabled fields;
	 * shorter packets cannot carry the data we need to read. */
	uint16_t min_pkt_len = task->lat_pos + sizeof(uint32_t);
	if (task->unique_id_pos && (
		min_pkt_len < task->unique_id_pos + sizeof(struct unique_id)))
		min_pkt_len = task->unique_id_pos + sizeof(struct unique_id);
	if (task->accur_pos && (
		min_pkt_len < task->accur_pos + sizeof(uint32_t)))
		min_pkt_len = task->accur_pos + sizeof(uint32_t);
	if (task->sig_pos && (
		min_pkt_len < task->sig_pos + sizeof(uint32_t)))
		min_pkt_len = task->sig_pos + sizeof(uint32_t);
	task->min_pkt_len = min_pkt_len;

	task_init_generator_count(task);

	if (task->latency_buffer_size) {
		init_task_lat_latency_buffer(task, targ->lconf->id);
	}

	/* Enforce a floor on the histogram bucket size. */
	if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
		targ->bucket_size = DEFAULT_BUCKET_SIZE;
	}

	if (task->accur_pos) {
		/* Per-generator ring of ACCURACY_BUFFER_SIZE delayed entries:
		 * TX accuracy arrives ACCURACY_BUFFER_SIZE packets after the
		 * packet it describes, so entries must be buffered. */
		task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count , socket_id);
		PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
		for (uint i = 0; i < task->generator_count; i++) {
			task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
			PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
		}
		if (task->unique_id_pos == 0) {
			/* When using accuracy feature, the accuracy from TX is written ACCURACY_BUFFER_SIZE packets later
			* We can only retrieve the good packet if a packet id is written to it.
			* Otherwise we will use the packet RECEIVED ACCURACY_BUFFER_SIZE packets ago which is OK if
			* packets are not re-ordered. If packets are re-ordered, then the matching between
			* the tx accuracy and the latency is wrong.
			*/
			plog_warn("\tWhen accuracy feature is used, a unique id should ideally also be used\n");
		}
	}

	/* Two latency-test accumulators; the fast path fills one while the
	 * stats reader drains the other (task->using_lt selects the active one). */
	task->lt[0].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
	task->lt[1].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
	if (task->unique_id_pos) {
		task_lat_init_eld(task, socket_id);
		task_lat_reset_eld(task);
	}
	task->lat_test = &task->lt[task->using_lt];

	task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
	task->rx_pkt_meta = prox_zmalloc(MAX_RX_PKT_ALL * sizeof(*task->rx_pkt_meta), socket_id);
	PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");

	/* Default to "unlimited" until lat_start can read the real port speed. */
	task->link_speed = UINT64_MAX;
	if (targ->nb_rxports) {
		// task->port structure is only used while starting handle_lat to get the link_speed.
		// link_speed can not be queried at init as the port has not been initialized yet.
		struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
		task->port = port;
	}
}
829
/* Registration record for the "lat" task mode: wires the init/handle/start/
 * stop callbacks defined in this file into the PROX task framework. */
static struct task_init task_init_lat = {
	.mode_str = "lat",
	.init = init_task_lat,
	.handle = handle_lat_bulk,
	.start = lat_start,
	.stop = lat_stop,
	/* NOTE(review): flags presumably enable RX timestamping, all-queue RX,
	 * zero-packet bursts and no-discard accounting — confirm in task_base.h. */
	.flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_RX_ALL | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
	.size = sizeof(struct task_lat)
};
839
/* Runs before main() (GCC constructor attribute) to register the "lat"
 * task mode with the framework. */
__attribute__((constructor)) static void reg_task_lat(void)
{
	reg_task(&task_init_lat);
}