Merge "Fix memory leak in L3 submode"
[samplevnf.git] / VNFs / DPPD-PROX / handle_lat.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 //#define LAT_DEBUG
18
19 #include <rte_cycles.h>
20 #include <stdio.h>
21 #include <math.h>
22
23 #include "handle_gen.h"
24 #include "prox_malloc.h"
25 #include "mbuf_utils.h"
26 #include "handle_lat.h"
27 #include "log.h"
28 #include "task_init.h"
29 #include "task_base.h"
30 #include "stats.h"
31 #include "lconf.h"
32 #include "quit.h"
33 #include "eld.h"
34 #include "prox_shared.h"
35 #include "prox_port_cfg.h"
36
/* Default latency histogram bucket size. Not referenced in the visible part
 * of this file; presumably used by the task init code - TODO confirm. */
#define DEFAULT_BUCKET_SIZE     10
/* The generator puts the TX accuracy of packet N into packet
 * N + ACCURACY_BUFFER_SIZE; this is also the number of slots per generator in
 * the delayed-latency ring indexed by packet_id % ACCURACY_BUFFER_SIZE. */
#define ACCURACY_BUFFER_SIZE    64
39
/*
 * One stored latency sample, appended by task_lat_store_lat_buf() and dumped
 * by lat_write_latency_to_file(). Time and error fields hold TSC values that
 * were shifted right by LATENCY_ACCURACY; the lat_info_get_*_tsc() helpers
 * shift them back.
 */
struct lat_info {
        uint32_t rx_packet_index;       /* index of the packet in RX order */
        uint64_t tx_packet_index;       /* generator packet id (RX index when no unique id is used);
                                           64-bit so wrap-around can be undone before sorting */
        uint32_t tx_err;                /* TX timestamp error (shifted) */
        uint32_t rx_err;                /* RX timestamp error (shifted) */
        uint64_t rx_time;               /* extrapolated RX time (shifted) */
        uint64_t tx_time;               /* TX timestamp read from the packet (shifted) */
        uint16_t port_queue_id;         /* generator id the packet came from */
#ifdef LAT_DEBUG
        uint16_t id_in_bulk;
        uint16_t bulk_size;
        uint64_t begin;                 /* task->begin at store time */
        uint64_t after;                 /* tsc_rx.after of the bulk */
        uint64_t before;                /* tsc_rx.before of the bulk */
#endif
};
56
/*
 * A sample parked until the packet ACCURACY_BUFFER_SIZE later (which carries
 * this packet's TX accuracy) arrives. Entries live in a per-generator ring
 * indexed by packet_id % ACCURACY_BUFFER_SIZE.
 */
struct delayed_latency_entry {
        uint32_t rx_packet_id;          /* task->rx_packet_index when the packet was received */
        uint32_t tx_packet_id;          /* packet id as sent by the generator */
        uint32_t packet_id;             /* id owning this ring slot; checked by delayed_latency_get() */
        uint8_t generator_id;
        uint64_t pkt_rx_time;           /* extrapolated RX time (shifted) */
        uint64_t pkt_tx_time;           /* TX timestamp from the packet (shifted) */
        uint64_t rx_time_err;           /* RX time uncertainty of the packet's bulk (shifted) */
};
66
67 static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
68 {
69         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
70         if (delayed_latency_entry->packet_id == packet_id)
71                 return delayed_latency_entry;
72         else
73                 return NULL;
74 }
75
76 static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
77 {
78         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
79         delayed_latency_entry->packet_id = packet_id;
80         return delayed_latency_entry;
81 }
82
/* Per-packet scratch data for the bulk being processed by handle_lat_bulk(). */
struct rx_pkt_meta_data {
        uint8_t  *hdr;                  /* start of the packet data (rte_pktmbuf_mtod) */
        uint32_t pkt_tx_time;           /* 32-bit TX timestamp read at task->lat_pos */
        uint32_t bytes_after_in_bulk;   /* wire bytes of the packets that follow this one in the bulk */
};
88
/* State of one latency-measurement task. */
struct task_lat {
        struct task_base base;
        uint64_t limit;                 /* accuracy limit in TSC cycles (set by task_lat_set_accuracy_limit) */
        uint64_t rx_packet_index;       /* running RX index of valid (measured) packets */
        uint64_t last_pkts_tsc;         /* tsc_rx.after of the last non-empty bulk; lower bound for RX time extrapolation */
        struct delayed_latency_entry **delayed_latency_entries; /* indexed [generator_id][packet_id % ACCURACY_BUFFER_SIZE] */
        struct lat_info *latency_buffer;        /* stored samples, dumped to file on stop */
        uint32_t latency_buffer_idx;    /* number of samples currently stored */
        uint32_t latency_buffer_size;   /* capacity of latency_buffer */
        uint64_t begin;                 /* tsc_rx.before of the most recent poll */
        uint16_t lat_pos;               /* packet offset of the 32-bit TX timestamp */
        uint16_t unique_id_pos;         /* packet offset of the unique id; 0 = not used */
        uint16_t accur_pos;             /* packet offset of the TX accuracy; 0 = not used */
        uint16_t sig_pos;               /* packet offset of the signature; 0 = not checked */
        uint32_t sig;                   /* expected signature value */
        volatile uint16_t use_lt; /* which lt to use (requested by task_lat_use_other_latency_meassurement) */
        volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements are used */
        struct lat_test lt[2];
        struct lat_test *lat_test;      /* &lt[using_lt] */
        uint32_t generator_count;
        uint16_t min_pkt_len;           /* shorter packets are counted as non-dataplane */
        struct early_loss_detect *eld;  /* one loss detector per generator */
        struct rx_pkt_meta_data *rx_pkt_meta;
        uint64_t link_speed;            /* bytes per second; 0 while the link is down */
        // Following fields are only used when starting or stopping, not in general runtime
        uint64_t *prev_tx_packet_index;
        FILE *fp_rx;
        FILE *fp_tx;
        struct prox_port_cfg *port;
};
/*
 * Return the elapsed (shifted) cycles between tx_time and rx_time.
 *
 * Both timestamps are 32-bit (see handle_lat_bulk). rx_time is normally the
 * larger one, except once every 2^32 ticks when it wraps around. Because the
 * subtraction is done in uint32_t, modular arithmetic still yields the correct
 * elapsed value across a single wrap.
 */
static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
{
        uint32_t elapsed = rx_time - tx_time;

        return elapsed;
}
130
131 struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
132 {
133         if (task->use_lt == task->using_lt)
134                 return &task->lt[!task->using_lt];
135         return NULL;
136 }
137
138 void task_lat_use_other_latency_meassurement(struct task_lat *task)
139 {
140         task->use_lt = !task->using_lt;
141 }
142
143 static void task_lat_update_lat_test(struct task_lat *task)
144 {
145         if (task->use_lt != task->using_lt) {
146                 task->using_lt = task->use_lt;
147                 task->lat_test = &task->lt[task->using_lt];
148                 task->lat_test->accuracy_limit_tsc = task->limit;
149         }
150 }
151
152 static int compare_tx_time(const void *val1, const void *val2)
153 {
154         const struct lat_info *ptr1 = val1;
155         const struct lat_info *ptr2 = val2;
156
157         return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
158 }
159
160 static int compare_tx_packet_index(const void *val1, const void *val2)
161 {
162         const struct lat_info *ptr1 = val1;
163         const struct lat_info *ptr2 = val2;
164
165         return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
166 }
167
168 static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
169 {
170         uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
171         uint32_t small = UINT32_MAX >> 1;
172
173         lat++;
174
175         /* Buffer is sorted so far by RX time.
176          * We might have packets being reordered by SUT.
177          *     => consider small differences as re-order and big ones as overflow of tx_packet_index.
178          * Note that:
179          *      - overflow only happens if receiving and storing 4 billions packets...
180          *      - a absolute difference of less than 2 billion packets is not considered as an overflow
181          */
182         for (uint32_t i = 1; i < count; i++) {
183                 tx_packet_index = lat->tx_packet_index;
184                 if (tx_packet_index > old_tx_packet_index) {
185                         if (tx_packet_index - old_tx_packet_index < small) {
186                                 // The diff is small => increasing index count
187                         } else {
188                                 // The diff is big => it is more likely that the previous packet was overflow
189                                 n_overflow--;
190                         }
191                 } else {
192                         if (old_tx_packet_index - tx_packet_index < small) {
193                                 // The diff is small => packet reorder
194                         } else {
195                                 // The diff is big => it is more likely that this is an overflow
196                                 n_overflow++;
197                         }
198                 }
199                 lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
200                 old_tx_packet_index = tx_packet_index;
201                 lat++;
202         }
203 }
204
205 static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
206 {
207         uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
208         uint32_t small = UINT32_MAX >> 1;
209         lat++;
210
211         /*
212          * Same algorithm as above, but with time.
213          * Note that:
214          *      - overflow happens after 4 billions "cycles" (shifted by LATENCY_ACCURACY) = ~4sec
215          *      - a absolute difference up to 2 billion (shifted) cycles (~=2sec) is not considered as an overflow
216          *              => algorithm does not work if receiving less than 1 packet every 2 seconds
217          */
218         for (uint32_t i = 1; i < count; i++) {
219                 tx_time = lat->tx_time;
220                 if (tx_time > old_tx_time) {
221                         if (tx_time - old_tx_time > small) {
222                                 n_overflow--;
223                         }
224                 } else {
225                         if (old_tx_time - tx_time > small) {
226                                 n_overflow++;
227                         }
228                 }
229                 lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
230                 old_tx_time = tx_time;
231                 lat++;
232         }
233 }
234
235 static void task_lat_count_remaining_lost_packets(struct task_lat *task)
236 {
237         struct lat_test *lat_test = task->lat_test;
238
239         for (uint32_t j = 0; j < task->generator_count; j++) {
240                 struct early_loss_detect *eld = &task->eld[j];
241
242                 lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
243         }
244 }
245
246 static void task_lat_reset_eld(struct task_lat *task)
247 {
248         for (uint32_t j = 0; j < task->generator_count; j++) {
249                 early_loss_detect_reset(&task->eld[j]);
250         }
251 }
252
253 static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
254 {
255         uint64_t min_tsc = UINT64_MAX;
256
257         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
258                 if (min_tsc > task->latency_buffer[i].tx_time)
259                         min_tsc = task->latency_buffer[i].tx_time;
260         }
261
262         return min_tsc << LATENCY_ACCURACY;
263 }
264
265 static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
266 {
267         uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);
268
269         return lat << LATENCY_ACCURACY;
270 }
271
272 static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
273 {
274         return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
275 }
276
277 static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
278 {
279         return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
280 }
281
282 static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
283 {
284         return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
285 }
286
287 static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
288 {
289         return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
290 }
291
292 static void lat_write_latency_to_file(struct task_lat *task)
293 {
294         uint64_t min_tsc;
295         uint64_t n_loss;
296
297         min_tsc = lat_latency_buffer_get_min_tsc(task);
298
299         // Dumping all packet statistics
300         fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
301         fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec);tx time;\n");
302         for (uint32_t i = 0; i < task->latency_buffer_idx ; i++) {
303                 struct lat_info *lat_info = &task->latency_buffer[i];
304                 uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
305                 uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
306                 uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
307
308                 fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
309                         lat_info->rx_packet_index,
310                         lat_info->port_queue_id,
311                         lat_info->tx_packet_index,
312                         tsc_to_nsec(lat_tsc),
313                         tsc_to_nsec(rx_tsc - min_tsc),
314                         tsc_to_nsec(tx_tsc - min_tsc));
315         }
316
317         // To detect dropped packets, we need to sort them based on TX
318         if (task->unique_id_pos) {
319                 plogx_info("Adapting tx_packet_index\n");
320                 fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
321                 plogx_info("Sorting packets based on tx_packet_index\n");
322                 qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
323                 plogx_info("Sorted packets based on packet_index\n");
324         } else {
325                 plogx_info("Adapting tx_time\n");
326                 fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
327                 plogx_info("Sorting packets based on tx_time\n");
328                 qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
329                 plogx_info("Sorted packets based on packet_time\n");
330         }
331
332         // A packet is marked as dropped if 2 packets received from the same queue are not consecutive
333         fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
334         fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");
335
336         for (uint32_t i = 0; i < task->generator_count;i++)
337                 task->prev_tx_packet_index[i] = -1;
338
339         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
340                 struct lat_info *lat_info = &task->latency_buffer[i];
341                 uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
342                 uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
343                 uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
344                 uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
345                 uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
346
347                 /* Packet n + ACCURACY_BUFFER_SIZE delivers the TX error for packet n,
348                    hence the last ACCURACY_BUFFER_SIZE packets do no have TX error. */
349                 if (i + ACCURACY_BUFFER_SIZE >= task->latency_buffer_idx) {
350                         tx_err_tsc = 0;
351                 }
352
353                 if (lat_info->port_queue_id >= task->generator_count) {
354                         plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
355                                 lat_info->port_queue_id, lat_info->tx_packet_index);
356                         continue;
357                 }
358                 // Log dropped packet
359                 n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
360                 if (n_loss)
361                         fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
362                                 lat_info->port_queue_id,
363                                 lat_info->tx_packet_index - n_loss, n_loss);
364                 // Log next packet
365                 fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
366                         lat_info->port_queue_id,
367                         lat_info->tx_packet_index,
368                         lat_info->rx_packet_index,
369                         tsc_to_nsec(lat_tsc),
370                         tsc_to_nsec(tx_tsc - min_tsc),
371                         tsc_to_nsec(rx_tsc - min_tsc),
372                         tsc_to_nsec(tx_err_tsc),
373                         tsc_to_nsec(rx_err_tsc));
374 #ifdef LAT_DEBUG
375                 fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
376                         lat_info->id_in_bulk,
377                         lat_info->bulk_size,
378                         tsc_to_nsec(lat_info->begin - min_tsc),
379                         tsc_to_nsec(lat_info->before - min_tsc),
380                         tsc_to_nsec(lat_info->after - min_tsc));
381 #endif
382                 fprintf(task->fp_tx, "\n");
383                 task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
384         }
385         fflush(task->fp_rx);
386         fflush(task->fp_tx);
387         task->latency_buffer_idx = 0;
388 }
389
390 static void lat_stop(struct task_base *tbase)
391 {
392         struct task_lat *task = (struct task_lat *)tbase;
393
394         if (task->unique_id_pos) {
395                 task_lat_count_remaining_lost_packets(task);
396                 task_lat_reset_eld(task);
397         }
398         if (task->latency_buffer)
399                 lat_write_latency_to_file(task);
400 }
401
#ifdef LAT_DEBUG
/*
 * Debug builds only: record the bulk position and the poll timestamps for the
 * sample at rx_packet_index. No caller is visible in this part of the file.
 */
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
        struct lat_info *slot = task->latency_buffer + rx_packet_index;

        slot->id_in_bulk = id_in_bulk;
        slot->bulk_size = bulk_size;
        slot->begin = task->begin;
        slot->before = task->base.aux->tsc_rx.before;
        slot->after = task->base.aux->tsc_rx.after;
}
#endif
414
415 static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
416 {
417         struct lat_info *lat_info;
418
419         /* If unique_id_pos is specified then latency is stored per
420            packet being sent. Lost packets are detected runtime, and
421            latency stored for those packets will be 0 */
422         lat_info = &task->latency_buffer[task->latency_buffer_idx++];
423         lat_info->rx_packet_index = rx_packet_index;
424         lat_info->tx_packet_index = packet_id;
425         lat_info->port_queue_id = generator_id;
426         lat_info->rx_time = rx_time;
427         lat_info->tx_time = tx_time;
428         lat_info->rx_err = rx_err;
429         lat_info->tx_err = tx_err;
430 }
431
432 static uint32_t task_lat_early_loss_detect(struct task_lat *task, uint32_t packet_id, uint8_t generator_id)
433 {
434         struct early_loss_detect *eld = &task->eld[generator_id];
435         return early_loss_detect_add(eld, packet_id);
436 }
437
/*
 * Estimate the TSC at which a packet arrived: tsc_from minus the wire time
 * of `bytes` at link_speed (bytes per second). The result is never earlier
 * than tsc_minimum. link_speed must be non-zero.
 *
 * The comparison is done before subtracting. A wire time larger than tsc_from
 * would otherwise wrap around to a huge value that passes the tsc_minimum
 * test instead of being clamped.
 */
static uint64_t tsc_extrapolate_backward(uint64_t link_speed, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
        const uint64_t wire_tsc = (rte_get_tsc_hz()*bytes)/link_speed;

        if (likely(tsc_from > tsc_minimum && tsc_from - tsc_minimum > wire_tsc))
                return tsc_from - wire_tsc;
        return tsc_minimum;
}
446
447 static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
448 {
449         uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
450         size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);
451
452         bucket_id = bucket_id < bucket_count? bucket_id : bucket_count;
453         lat_test->buckets[bucket_id]++;
454 }
455
456 static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
457 {
458         lat_test->lost_packets += lost_packets;
459 }
460
461 static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
462 {
463         if (error > lat_test->accuracy_limit_tsc)
464                 return;
465         lat_test->tot_pkts++;
466
467         lat_test->tot_lat += lat_tsc;
468         lat_test->tot_lat_error += error;
469
470         /* (a +- b)^2 = a^2 +- (2ab + b^2) */
471         lat_test->var_lat += lat_tsc * lat_tsc;
472         lat_test->var_lat_error += 2 * lat_tsc * error;
473         lat_test->var_lat_error += error * error;
474
475         if (lat_tsc > lat_test->max_lat) {
476                 lat_test->max_lat = lat_tsc;
477                 lat_test->max_lat_error = error;
478         }
479         if (lat_tsc < lat_test->min_lat) {
480                 lat_test->min_lat = lat_tsc;
481                 lat_test->min_lat_error = error;
482         }
483
484 #ifdef LATENCY_HISTOGRAM
485         lat_test_histogram_add(lat_test, lat_tsc);
486 #endif
487 }
488
489 static int task_lat_can_store_latency(struct task_lat *task)
490 {
491         return task->latency_buffer_idx < task->latency_buffer_size;
492 }
493
494 static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
495 {
496         uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;
497
498         lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);
499
500         if (task_lat_can_store_latency(task)) {
501                 task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
502         }
503 }
504
/*
 * Datapath handler. It measures the latency of every packet in the bulk and
 * then forwards the whole bulk unchanged with tx_pkt().
 *
 * For each packet it reads the 32-bit TX timestamp at lat_pos. Where
 * configured, it also checks the signature at sig_pos and reads the unique id
 * at unique_id_pos and the TX accuracy at accur_pos. Packets that are too
 * short or carry a bad signature are counted as non-dataplane and not
 * measured. Each packet's RX time is extrapolated backwards from the bulk's
 * RX timestamp, using the wire size of the packets that follow it and the
 * link speed.
 *
 * While link_speed is unknown (0), packets are forwarded without being
 * measured. Returns tx_pkt()'s result, or 0 for an empty bulk.
 */
static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
        struct task_lat *task = (struct task_lat *)tbase;
        int rc;

        // If link is down, link_speed is 0
        // tsc_extrapolate_backward() divides by link_speed, so nothing is measured until it is known
        if (unlikely(task->link_speed == 0)) {
                if (task->port && task->port->link_speed != 0) {
                        task->link_speed = task->port->link_speed * 125000L;
                        plog_info("\tPort %u: link speed is %ld Mbps\n",
                                (uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
                } else if (n_pkts) {
                        return task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
                } else {
                        return 0;
                }
        }

        // Empty poll: remember when it started; task->begin bounds the RX accuracy
        // (rx_time_err) of the next non-empty bulk
        if (n_pkts == 0) {
                task->begin = tbase->aux->tsc_rx.before;
                return 0;
        }

        // Pick up a measurement-set switch requested by task_lat_use_other_latency_meassurement()
        task_lat_update_lat_test(task);

        // Remember those packets with bad length or bad signature
        // One bit per packet; NOTE(review): assumes n_pkts <= MAX_RX_PKT_ALL - confirm
        uint32_t non_dp_count = 0;
        uint64_t pkt_bad_len_sig[(MAX_RX_PKT_ALL + 63) / 64];
#define BIT64_SET(a64, bit)     a64[bit / 64] |=  (((uint64_t)1) << (bit & 63))
#define BIT64_CLR(a64, bit)     a64[bit / 64] &= ~(((uint64_t)1) << (bit & 63))
#define BIT64_TEST(a64, bit)    a64[bit / 64]  &  (((uint64_t)1) << (bit & 63))

        /* Go once through all received packets and read them.  If a
           packet has just been modified by another core, the cost of
           the cache miss is partially amortized through the bulk size */
        for (uint16_t j = 0; j < n_pkts; ++j) {
                struct rte_mbuf *mbuf = mbufs[j];
                task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);

                // Remember those packets which are too short to hold the values that we expect
                if (unlikely(rte_pktmbuf_pkt_len(mbuf) < task->min_pkt_len)) {
                        BIT64_SET(pkt_bad_len_sig, j);
                        non_dp_count++;
                } else
                        BIT64_CLR(pkt_bad_len_sig, j);
        }

        // Read the TX timestamp of every packet long enough to carry one; when a
        // signature is configured, packets that do not match it are marked bad instead
        if (task->sig_pos) {
                for (uint16_t j = 0; j < n_pkts; ++j) {
                        if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                                continue;
                        // Remember those packets with bad signature
                        if (likely(*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig))
                                task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
                        else {
                                BIT64_SET(pkt_bad_len_sig, j);
                                non_dp_count++;
                        }
                }
        } else {
                for (uint16_t j = 0; j < n_pkts; ++j) {
                        if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                                continue;
                        task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
                }
        }

        uint32_t bytes_total_in_bulk = 0;
        // Find RX time of first packet, for RX accuracy
        // Walk the bulk backwards so each packet learns how many wire bytes arrived after it
        for (uint16_t j = 0; j < n_pkts; ++j) {
                uint16_t flipped = n_pkts - 1 - j;

                task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
                bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
        }

        const uint64_t rx_tsc = tbase->aux->tsc_rx.after;

        // RX uncertainty, computed once per bulk from the first packet's extrapolated
        // RX time and applied to every packet of the bulk
        uint64_t rx_time_err;
        uint64_t pkt_rx_time64 = tsc_extrapolate_backward(task->link_speed, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
        if (unlikely((task->begin >> LATENCY_ACCURACY) > pkt_rx_time64)) {
                // Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
                rx_time_err = pkt_rx_time64 - (task->last_pkts_tsc >> LATENCY_ACCURACY);
        } else {
                rx_time_err = pkt_rx_time64 - (task->begin >> LATENCY_ACCURACY);
        }

        TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, non_dp_count);
        for (uint16_t j = 0; j < n_pkts; ++j) {
                // Used to display % of packets within accuracy limit vs. total number of packets (used_col)
                task->lat_test->tot_all_pkts++;

                // Skip those packets with bad length or bad signature
                if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
                        continue;

                struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
                uint8_t *hdr = rx_pkt_meta->hdr;

                uint32_t pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
                uint32_t pkt_tx_time = rx_pkt_meta->pkt_tx_time;

                uint8_t generator_id;
                uint32_t packet_id;
                if (task->unique_id_pos) {
                        struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
                        unique_id_get(unique_id, &generator_id, &packet_id);

                        if (unlikely(generator_id >= task->generator_count)) {
                                /* No need to remember unexpected packet at this stage
                                BIT64_SET(pkt_bad_len_sig, j);
                                */
                                // Skip unexpected packet
                                continue;
                        }

                        lat_test_add_lost(task->lat_test, task_lat_early_loss_detect(task, packet_id, generator_id));
                } else {
                        // Without unique ids, the RX index stands in for the packet id
                        generator_id = 0;
                        packet_id = task->rx_packet_index;
                }

                /* If accuracy is enabled, latency is reported with a
                   delay of ACCURACY_BUFFER_SIZE packets since the generator puts the
                   accuracy for packet N into packet N + ACCURACY_BUFFER_SIZE. The delay
                   ensures that all reported latencies have both rx
                   and tx error. */
                if (task->accur_pos) {
                        uint32_t tx_time_err = *(uint32_t *)(hdr + task->accur_pos);

                        // This packet carries the TX error of packet_id - ACCURACY_BUFFER_SIZE; report it if still parked
                        struct delayed_latency_entry *delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_BUFFER_SIZE);

                        if (delayed_latency_entry) {
                                task_lat_store_lat(task,
                                                   delayed_latency_entry->rx_packet_id,
                                                   delayed_latency_entry->pkt_rx_time,
                                                   delayed_latency_entry->pkt_tx_time,
                                                   delayed_latency_entry->rx_time_err,
                                                   tx_time_err,
                                                   delayed_latency_entry->tx_packet_id,
                                                   delayed_latency_entry->generator_id);
                        }

                        // Park this packet until its own TX error arrives
                        delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
                        delayed_latency_entry->pkt_rx_time = pkt_rx_time;
                        delayed_latency_entry->pkt_tx_time = pkt_tx_time;
                        delayed_latency_entry->rx_time_err = rx_time_err;
                        delayed_latency_entry->rx_packet_id = task->rx_packet_index;
                        delayed_latency_entry->tx_packet_id = packet_id;
                        delayed_latency_entry->generator_id = generator_id;
                } else {
                        task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
                }

                // Bad/unexpected packets do not need to be indexed (they were skipped above)
                task->rx_packet_index++;
        }

        task->begin = tbase->aux->tsc_rx.before;
        task->last_pkts_tsc = tbase->aux->tsc_rx.after;

        rc = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
        // non_dp_count should not be drop-handled, as they are all by definition considered as not handled
        // RX = DISCARDED + HANDLED + NON_DP + (TX - TX_NON_DP) + TX_FAIL
        // NOTE(review): -non_dp_count is an unsigned (uint32_t) negation; confirm the
        // macro's counter type makes this an actual subtraction
        TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, -non_dp_count);
        return rc;
}
672
673 static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
674 {
675         const int socket_id = rte_lcore_to_socket_id(core_id);
676         char name[256];
677         size_t latency_buffer_mem_size = 0;
678
679         if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
680                 task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;
681
682         latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;
683
684         task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
685         PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);
686
687         sprintf(name, "latency.rx_%u.txt", core_id);
688         task->fp_rx = fopen(name, "w+");
689         PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);
690
691         sprintf(name, "latency.tx_%u.txt", core_id);
692         task->fp_tx = fopen(name, "w+");
693         PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);
694
695         task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
696         PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocated prev_tx_packet_index\n");
697 }
698
699 static void task_init_generator_count(struct task_lat *task)
700 {
701         uint8_t *generator_count = prox_sh_find_system("generator_count");
702
703         if (generator_count == NULL) {
704                 task->generator_count = 1;
705                 plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
706         } else
707                 task->generator_count = *generator_count;
708         plog_info("\tLatency using %u generators\n", task->generator_count);
709 }
710
711 static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
712 {
713         size_t eld_mem_size;
714
715         eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
716         task->eld = prox_zmalloc(eld_mem_size, socket_id);
717         PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
718 }
719
720 void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
721 {
722         task->limit = nsec_to_tsc(accuracy_limit_nsec);
723 }
724
725 static void lat_start(struct task_base *tbase)
726 {
727         struct task_lat *task = (struct task_lat *)tbase;
728
729         if (task->port) {
730                 // task->port->link_speed reports the link speed in Mbps e.g. 40k for a 40 Gbps NIC.
731                 // task->link_speed reports link speed in Bytes per sec.
732                 // It can be 0 if link is down, and must hence be updated in fast path.
733                 task->link_speed = task->port->link_speed * 125000L;
734                 if (task->link_speed)
735                         plog_info("\tPort %u: link speed is %ld Mbps\n",
736                                 (uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
737                 else
738                         plog_info("\tPort %u: link speed is %ld Mbps - link might be down\n",
739                                 (uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
740         }
741 }
742
743 static void init_task_lat(struct task_base *tbase, struct task_args *targ)
744 {
745         struct task_lat *task = (struct task_lat *)tbase;
746         const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
747
748         task->lat_pos = targ->lat_pos;
749         task->accur_pos = targ->accur_pos;
750         task->sig_pos = targ->sig_pos;
751         task->sig = targ->sig;
752
753         task->unique_id_pos = targ->packet_id_pos;
754         task->latency_buffer_size = targ->latency_buffer_size;
755
756         PROX_PANIC(task->lat_pos == 0, "Missing 'lat pos' parameter in config file\n");
757         uint16_t min_pkt_len = task->lat_pos + sizeof(uint32_t);
758         if (task->unique_id_pos && (
759                 min_pkt_len < task->unique_id_pos + sizeof(struct unique_id)))
760                 min_pkt_len = task->unique_id_pos + sizeof(struct unique_id);
761         if (task->accur_pos && (
762                 min_pkt_len < task->accur_pos + sizeof(uint32_t)))
763                 min_pkt_len = task->accur_pos + sizeof(uint32_t);
764         if (task->sig_pos && (
765                 min_pkt_len < task->sig_pos + sizeof(uint32_t)))
766                 min_pkt_len = task->sig_pos + sizeof(uint32_t);
767         task->min_pkt_len = min_pkt_len;
768
769         task_init_generator_count(task);
770
771         if (task->latency_buffer_size) {
772                 init_task_lat_latency_buffer(task, targ->lconf->id);
773         }
774
775         if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
776                 targ->bucket_size = DEFAULT_BUCKET_SIZE;
777         }
778
779         if (task->accur_pos) {
780                 task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count , socket_id);
781                 PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
782                 for (uint i = 0; i < task->generator_count; i++) {
783                         task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
784                         PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
785                 }
786                 if (task->unique_id_pos == 0) {
787                         /* When using accuracy feature, the accuracy from TX is written ACCURACY_BUFFER_SIZE packets later
788                         * We can only retrieve the good packet if a packet id is written to it.
789                         * Otherwise we will use the packet RECEIVED ACCURACY_BUFFER_SIZE packets ago which is OK if
790                         * packets are not re-ordered. If packets are re-ordered, then the matching between
791                         * the tx accuracy znd the latency is wrong.
792                         */
793                         plog_warn("\tWhen accuracy feature is used, a unique id should ideally also be used\n");
794                 }
795         }
796
797         task->lt[0].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
798         task->lt[1].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
799         if (task->unique_id_pos) {
800                 task_lat_init_eld(task, socket_id);
801                 task_lat_reset_eld(task);
802         }
803         task->lat_test = &task->lt[task->using_lt];
804
805         task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
806         task->rx_pkt_meta = prox_zmalloc(MAX_RX_PKT_ALL * sizeof(*task->rx_pkt_meta), socket_id);
807         PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");
808
809         task->link_speed = UINT64_MAX;
810         if (targ->nb_rxports) {
811                 // task->port structure is only used while starting handle_lat to get the link_speed.
812                 // link_speed can not be quiried at init as the port has not been initialized yet.
813                 struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
814                 task->port = port;
815         }
816 }
817
818 static struct task_init task_init_lat = {
819         .mode_str = "lat",
820         .init = init_task_lat,
821         .handle = handle_lat_bulk,
822         .start = lat_start,
823         .stop = lat_stop,
824         .flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_RX_ALL | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
825         .size = sizeof(struct task_lat)
826 };
827
828 __attribute__((constructor)) static void reg_task_lat(void)
829 {
830         reg_task(&task_init_lat);
831 }