Merge "Support async operation in handle_esp"
[samplevnf.git] / VNFs / DPPD-PROX / handle_lat.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 //#define LAT_DEBUG
18
19 #include <rte_cycles.h>
20 #include <stdio.h>
21 #include <math.h>
22
23 #include "handle_gen.h"
24 #include "prox_malloc.h"
25 #include "mbuf_utils.h"
26 #include "handle_lat.h"
27 #include "log.h"
28 #include "task_init.h"
29 #include "task_base.h"
30 #include "stats.h"
31 #include "lconf.h"
32 #include "quit.h"
33 #include "eld.h"
34 #include "prox_shared.h"
35 #include "prox_port_cfg.h"
36
37 #define DEFAULT_BUCKET_SIZE     10
38 #define ACCURACY_BUFFER_SIZE    64
39
40 struct lat_info {
41         uint32_t rx_packet_index;
42         uint64_t tx_packet_index;
43         uint32_t tx_err;
44         uint32_t rx_err;
45         uint64_t rx_time;
46         uint64_t tx_time;
47         uint16_t port_queue_id;
48 #ifdef LAT_DEBUG
49         uint16_t id_in_bulk;
50         uint16_t bulk_size;
51         uint64_t begin;
52         uint64_t after;
53         uint64_t before;
54 #endif
55 };
56
57 struct delayed_latency_entry {
58         uint32_t rx_packet_id;
59         uint32_t tx_packet_id;
60         uint32_t packet_id;
61         uint8_t generator_id;
62         uint64_t pkt_rx_time;
63         uint64_t pkt_tx_time;
64         uint64_t rx_time_err;
65 };
66
67 static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
68 {
69         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
70         if (delayed_latency_entry->packet_id == packet_id)
71                 return delayed_latency_entry;
72         else
73                 return NULL;
74 }
75
76 static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
77 {
78         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
79         delayed_latency_entry->packet_id = packet_id;
80         return delayed_latency_entry;
81 }
82
83 struct rx_pkt_meta_data {
84         uint8_t  *hdr;
85         uint32_t pkt_tx_time;
86         uint32_t bytes_after_in_bulk;
87 };
88
89 struct task_lat {
90         struct task_base base;
91         uint64_t limit;
92         uint64_t rx_packet_index;
93         uint64_t last_pkts_tsc;
94         struct delayed_latency_entry **delayed_latency_entries;
95         struct lat_info *latency_buffer;
96         uint32_t latency_buffer_idx;
97         uint32_t latency_buffer_size;
98         uint64_t begin;
99         uint16_t lat_pos;
100         uint16_t unique_id_pos;
101         uint16_t accur_pos;
102         uint16_t sig_pos;
103         uint32_t sig;
104         volatile uint16_t use_lt; /* which lt to use, */
105         volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements are used */
106         struct lat_test lt[2];
107         struct lat_test *lat_test;
108         uint32_t generator_count;
109         struct early_loss_detect *eld;
110         struct rx_pkt_meta_data *rx_pkt_meta;
111         uint64_t link_speed;
112         // Following fields are only used when starting or stopping, not in general runtime
113         uint64_t *prev_tx_packet_index;
114         FILE *fp_rx;
115         FILE *fp_tx;
116         struct prox_port_cfg *port;
117 };
118 /* This function calculate the difference between rx and tx_time
119  * Both values are uint32_t (see handle_lat_bulk)
120  * rx time should be higher than tx_time...except every UINT32_MAX
121  * cycles, when rx_time overflows.
122  * As the return value is also uint32_t, returning (rx_time - tx_time)
123  * is also fine when it overflows.
124  */
125 static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
126 {
127         return rx_time - tx_time;
128 }
129
130 struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
131 {
132         if (task->use_lt == task->using_lt)
133                 return &task->lt[!task->using_lt];
134         return NULL;
135 }
136
137 void task_lat_use_other_latency_meassurement(struct task_lat *task)
138 {
139         task->use_lt = !task->using_lt;
140 }
141
142 static void task_lat_update_lat_test(struct task_lat *task)
143 {
144         if (task->use_lt != task->using_lt) {
145                 task->using_lt = task->use_lt;
146                 task->lat_test = &task->lt[task->using_lt];
147                 task->lat_test->accuracy_limit_tsc = task->limit;
148         }
149 }
150
151 static int compare_tx_time(const void *val1, const void *val2)
152 {
153         const struct lat_info *ptr1 = val1;
154         const struct lat_info *ptr2 = val2;
155
156         return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
157 }
158
159 static int compare_tx_packet_index(const void *val1, const void *val2)
160 {
161         const struct lat_info *ptr1 = val1;
162         const struct lat_info *ptr2 = val2;
163
164         return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
165 }
166
167 static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
168 {
169         uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
170         uint32_t small = UINT32_MAX >> 1;
171
172         lat++;
173
174         /* Buffer is sorted so far by RX time.
175          * We might have packets being reordered by SUT.
176          *     => consider small differences as re-order and big ones as overflow of tx_packet_index.
177          * Note that:
178          *      - overflow only happens if receiving and storing 4 billions packets...
179          *      - a absolute difference of less than 2 billion packets is not considered as an overflow
180          */
181         for (uint32_t i = 1; i < count; i++) {
182                 tx_packet_index = lat->tx_packet_index;
183                 if (tx_packet_index > old_tx_packet_index) {
184                         if (tx_packet_index - old_tx_packet_index < small) {
185                                 // The diff is small => increasing index count
186                         } else {
187                                 // The diff is big => it is more likely that the previous packet was overflow
188                                 n_overflow--;
189                         }
190                 } else {
191                         if (old_tx_packet_index - tx_packet_index < small) {
192                                 // The diff is small => packet reorder
193                         } else {
194                                 // The diff is big => it is more likely that this is an overflow
195                                 n_overflow++;
196                         }
197                 }
198                 lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
199                 old_tx_packet_index = tx_packet_index;
200                 lat++;
201         }
202 }
203
204 static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
205 {
206         uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
207         uint32_t small = UINT32_MAX >> 1;
208         lat++;
209
210         /*
211          * Same algorithm as above, but with time.
212          * Note that:
213          *      - overflow happens after 4 billions "cycles" (shifted by LATENCY_ACCURACY) = ~4sec
214          *      - a absolute difference up to 2 billion (shifted) cycles (~=2sec) is not considered as an overflow
215          *              => algorithm does not work if receiving less than 1 packet every 2 seconds
216          */
217         for (uint32_t i = 1; i < count; i++) {
218                 tx_time = lat->tx_time;
219                 if (tx_time > old_tx_time) {
220                         if (tx_time - old_tx_time > small) {
221                                 n_overflow--;
222                         }
223                 } else {
224                         if (old_tx_time - tx_time > small) {
225                                 n_overflow++;
226                         }
227                 }
228                 lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
229                 old_tx_time = tx_time;
230                 lat++;
231         }
232 }
233
234 static void task_lat_count_remaining_lost_packets(struct task_lat *task)
235 {
236         struct lat_test *lat_test = task->lat_test;
237
238         for (uint32_t j = 0; j < task->generator_count; j++) {
239                 struct early_loss_detect *eld = &task->eld[j];
240
241                 lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
242         }
243 }
244
245 static void task_lat_reset_eld(struct task_lat *task)
246 {
247         for (uint32_t j = 0; j < task->generator_count; j++) {
248                 early_loss_detect_reset(&task->eld[j]);
249         }
250 }
251
252 static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
253 {
254         uint64_t min_tsc = UINT64_MAX;
255
256         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
257                 if (min_tsc > task->latency_buffer[i].tx_time)
258                         min_tsc = task->latency_buffer[i].tx_time;
259         }
260
261         return min_tsc << LATENCY_ACCURACY;
262 }
263
264 static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
265 {
266         uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);
267
268         return lat << LATENCY_ACCURACY;
269 }
270
271 static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
272 {
273         return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
274 }
275
276 static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
277 {
278         return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
279 }
280
281 static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
282 {
283         return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
284 }
285
286 static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
287 {
288         return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
289 }
290
291 static void lat_write_latency_to_file(struct task_lat *task)
292 {
293         uint64_t min_tsc;
294         uint64_t n_loss;
295
296         min_tsc = lat_latency_buffer_get_min_tsc(task);
297
298         // Dumping all packet statistics
299         fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
300         fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec);tx time;\n");
301         for (uint32_t i = 0; i < task->latency_buffer_idx ; i++) {
302                 struct lat_info *lat_info = &task->latency_buffer[i];
303                 uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
304                 uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
305                 uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
306
307                 fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
308                         lat_info->rx_packet_index,
309                         lat_info->port_queue_id,
310                         lat_info->tx_packet_index,
311                         tsc_to_nsec(lat_tsc),
312                         tsc_to_nsec(rx_tsc - min_tsc),
313                         tsc_to_nsec(tx_tsc - min_tsc));
314         }
315
316         // To detect dropped packets, we need to sort them based on TX
317         if (task->unique_id_pos) {
318                 plogx_info("Adapting tx_packet_index\n");
319                 fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
320                 plogx_info("Sorting packets based on tx_packet_index\n");
321                 qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
322                 plogx_info("Sorted packets based on packet_index\n");
323         } else {
324                 plogx_info("Adapting tx_time\n");
325                 fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
326                 plogx_info("Sorting packets based on tx_time\n");
327                 qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
328                 plogx_info("Sorted packets based on packet_time\n");
329         }
330
331         // A packet is marked as dropped if 2 packets received from the same queue are not consecutive
332         fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
333         fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");
334
335         for (uint32_t i = 0; i < task->generator_count;i++)
336                 task->prev_tx_packet_index[i] = -1;
337
338         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
339                 struct lat_info *lat_info = &task->latency_buffer[i];
340                 uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
341                 uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
342                 uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
343                 uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
344                 uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
345
346                 /* Packet n + ACCURACY_BUFFER_SIZE delivers the TX error for packet n,
347                    hence the last ACCURACY_BUFFER_SIZE packets do no have TX error. */
348                 if (i + ACCURACY_BUFFER_SIZE >= task->latency_buffer_idx) {
349                         tx_err_tsc = 0;
350                 }
351
352                 if (lat_info->port_queue_id >= task->generator_count) {
353                         plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
354                                 lat_info->port_queue_id, lat_info->tx_packet_index);
355                         continue;
356                 }
357                 // Log dropped packet
358                 n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
359                 if (n_loss)
360                         fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
361                                 lat_info->port_queue_id,
362                                 lat_info->tx_packet_index - n_loss, n_loss);
363                 // Log next packet
364                 fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
365                         lat_info->port_queue_id,
366                         lat_info->tx_packet_index,
367                         lat_info->rx_packet_index,
368                         tsc_to_nsec(lat_tsc),
369                         tsc_to_nsec(tx_tsc - min_tsc),
370                         tsc_to_nsec(rx_tsc - min_tsc),
371                         tsc_to_nsec(tx_err_tsc),
372                         tsc_to_nsec(rx_err_tsc));
373 #ifdef LAT_DEBUG
374                 fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
375                         lat_info->id_in_bulk,
376                         lat_info->bulk_size,
377                         tsc_to_nsec(lat_info->begin - min_tsc),
378                         tsc_to_nsec(lat_info->before - min_tsc),
379                         tsc_to_nsec(lat_info->after - min_tsc));
380 #endif
381                 fprintf(task->fp_tx, "\n");
382                 task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
383         }
384         fflush(task->fp_rx);
385         fflush(task->fp_tx);
386         task->latency_buffer_idx = 0;
387 }
388
389 static void lat_stop(struct task_base *tbase)
390 {
391         struct task_lat *task = (struct task_lat *)tbase;
392
393         if (task->unique_id_pos) {
394                 task_lat_count_remaining_lost_packets(task);
395                 task_lat_reset_eld(task);
396         }
397         if (task->latency_buffer)
398                 lat_write_latency_to_file(task);
399 }
400
401 #ifdef LAT_DEBUG
402 static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
403 {
404         struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];
405
406         lat_info->bulk_size = bulk_size;
407         lat_info->id_in_bulk = id_in_bulk;
408         lat_info->begin = task->begin;
409         lat_info->before = task->base.aux->tsc_rx.before;
410         lat_info->after = task->base.aux->tsc_rx.after;
411 }
412 #endif
413
414 static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
415 {
416         struct lat_info *lat_info;
417
418         /* If unique_id_pos is specified then latency is stored per
419            packet being sent. Lost packets are detected runtime, and
420            latency stored for those packets will be 0 */
421         lat_info = &task->latency_buffer[task->latency_buffer_idx++];
422         lat_info->rx_packet_index = rx_packet_index;
423         lat_info->tx_packet_index = packet_id;
424         lat_info->port_queue_id = generator_id;
425         lat_info->rx_time = rx_time;
426         lat_info->tx_time = tx_time;
427         lat_info->rx_err = rx_err;
428         lat_info->tx_err = tx_err;
429 }
430
431 static uint32_t task_lat_early_loss_detect(struct task_lat *task, struct unique_id *unique_id)
432 {
433         struct early_loss_detect *eld;
434         uint8_t generator_id;
435         uint32_t packet_index;
436
437         unique_id_get(unique_id, &generator_id, &packet_index);
438
439         if (generator_id >= task->generator_count)
440                 return 0;
441
442         eld = &task->eld[generator_id];
443
444         return early_loss_detect_add(eld, packet_index);
445 }
446
447 static uint64_t tsc_extrapolate_backward(uint64_t link_speed, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
448 {
449         uint64_t tsc = tsc_from - (rte_get_tsc_hz()*bytes)/link_speed;
450         if (likely(tsc > tsc_minimum))
451                 return tsc;
452         else
453                 return tsc_minimum;
454 }
455
456 static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
457 {
458         uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
459         size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);
460
461         bucket_id = bucket_id < bucket_count? bucket_id : bucket_count;
462         lat_test->buckets[bucket_id]++;
463 }
464
465 static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
466 {
467         lat_test->lost_packets += lost_packets;
468 }
469
470 static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
471 {
472         if (error > lat_test->accuracy_limit_tsc)
473                 return;
474         lat_test->tot_pkts++;
475
476         lat_test->tot_lat += lat_tsc;
477         lat_test->tot_lat_error += error;
478
479         /* (a +- b)^2 = a^2 +- (2ab + b^2) */
480         lat_test->var_lat += lat_tsc * lat_tsc;
481         lat_test->var_lat_error += 2 * lat_tsc * error;
482         lat_test->var_lat_error += error * error;
483
484         if (lat_tsc > lat_test->max_lat) {
485                 lat_test->max_lat = lat_tsc;
486                 lat_test->max_lat_error = error;
487         }
488         if (lat_tsc < lat_test->min_lat) {
489                 lat_test->min_lat = lat_tsc;
490                 lat_test->min_lat_error = error;
491         }
492
493 #ifdef LATENCY_HISTOGRAM
494         lat_test_histogram_add(lat_test, lat_tsc);
495 #endif
496 }
497
498 static int task_lat_can_store_latency(struct task_lat *task)
499 {
500         return task->latency_buffer_idx < task->latency_buffer_size;
501 }
502
503 static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
504 {
505         if (tx_time == 0)
506                 return;
507         uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;
508
509         lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);
510
511         if (task_lat_can_store_latency(task)) {
512                 task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
513         }
514 }
515
516 static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
517 {
518         struct task_lat *task = (struct task_lat *)tbase;
519         uint64_t rx_time_err;
520
521         uint32_t pkt_rx_time, pkt_tx_time;
522
523         // If link is down, link_speed is 0
524         if (unlikely(task->link_speed == 0)) {
525                 if (task->port && task->port->link_speed != 0) {
526                         task->link_speed = task->port->link_speed * 125000L;
527                         plog_info("\tPort %u: link speed is %ld Mbps\n",
528                                 (uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
529                 } else
530                         return 0;
531         }
532
533         if (n_pkts == 0) {
534                 task->begin = tbase->aux->tsc_rx.before;
535                 return 0;
536         }
537
538         task_lat_update_lat_test(task);
539
540         const uint64_t rx_tsc = tbase->aux->tsc_rx.after;
541         uint32_t tx_time_err = 0;
542
543         /* Go once through all received packets and read them.  If
544            packet has just been modified by another core, the cost of
545            latency will be partialy amortized though the bulk size */
546         for (uint16_t j = 0; j < n_pkts; ++j) {
547                 struct rte_mbuf *mbuf = mbufs[j];
548                 task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);
549         }
550
551         if (task->sig) {
552                 for (uint16_t j = 0; j < n_pkts; ++j) {
553                         if (*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig)
554                                 task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
555                         else
556                                 task->rx_pkt_meta[j].pkt_tx_time = 0;
557                 }
558         } else {
559                 for (uint16_t j = 0; j < n_pkts; ++j) {
560                         task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
561                 }
562         }
563
564         uint32_t bytes_total_in_bulk = 0;
565         // Find RX time of first packet, for RX accuracy
566         for (uint16_t j = 0; j < n_pkts; ++j) {
567                 uint16_t flipped = n_pkts - 1 - j;
568
569                 task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
570                 bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
571         }
572
573         pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
574         if ((uint32_t)((task->begin >> LATENCY_ACCURACY)) > pkt_rx_time) {
575                 // Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
576                 rx_time_err = pkt_rx_time - (uint32_t)(task->last_pkts_tsc >> LATENCY_ACCURACY);
577         } else {
578                 rx_time_err = pkt_rx_time - (uint32_t)(task->begin >> LATENCY_ACCURACY);
579         }
580
581         struct unique_id *unique_id = NULL;
582         struct delayed_latency_entry *delayed_latency_entry;
583         uint32_t packet_id, generator_id;
584
585         for (uint16_t j = 0; j < n_pkts; ++j) {
586                 struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
587                 uint8_t *hdr = rx_pkt_meta->hdr;
588
589                 pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
590                 pkt_tx_time = rx_pkt_meta->pkt_tx_time;
591
592                 if (task->unique_id_pos) {
593                         unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
594
595                         uint32_t n_loss = task_lat_early_loss_detect(task, unique_id);
596                         packet_id = unique_id->packet_id;
597                         generator_id = unique_id->generator_id;
598                         lat_test_add_lost(task->lat_test, n_loss);
599                 } else {
600                         packet_id = task->rx_packet_index;
601                         generator_id = 0;
602                 }
603                 task->lat_test->tot_all_pkts++;
604
605                 /* If accuracy is enabled, latency is reported with a
606                    delay of ACCURACY_BUFFER_SIZE packets since the generator puts the
607                    accuracy for packet N into packet N + ACCURACY_BUFFER_SIZE. The delay
608                    ensures that all reported latencies have both rx
609                    and tx error. */
610                 if (task->accur_pos) {
611                         tx_time_err = *(uint32_t *)(hdr + task->accur_pos);
612
613                         delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_BUFFER_SIZE);
614
615                         if (delayed_latency_entry) {
616                                 task_lat_store_lat(task,
617                                                    delayed_latency_entry->rx_packet_id,
618                                                    delayed_latency_entry->pkt_rx_time,
619                                                    delayed_latency_entry->pkt_tx_time,
620                                                    delayed_latency_entry->rx_time_err,
621                                                    tx_time_err,
622                                                    delayed_latency_entry->tx_packet_id,
623                                                    delayed_latency_entry->generator_id);
624                         }
625
626                         delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
627                         delayed_latency_entry->pkt_rx_time = pkt_rx_time;
628                         delayed_latency_entry->pkt_tx_time = pkt_tx_time;
629                         delayed_latency_entry->rx_time_err = rx_time_err;
630                         delayed_latency_entry->rx_packet_id = task->rx_packet_index;
631                         delayed_latency_entry->tx_packet_id = packet_id;
632                         delayed_latency_entry->generator_id = generator_id;
633                 } else {
634                         task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
635                 }
636                 task->rx_packet_index++;
637         }
638         int ret;
639         ret = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
640         task->begin = tbase->aux->tsc_rx.before;
641         task->last_pkts_tsc = tbase->aux->tsc_rx.after;
642         return ret;
643 }
644
645 static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
646 {
647         const int socket_id = rte_lcore_to_socket_id(core_id);
648         char name[256];
649         size_t latency_buffer_mem_size = 0;
650
651         if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
652                 task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;
653
654         latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;
655
656         task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
657         PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);
658
659         sprintf(name, "latency.rx_%u.txt", core_id);
660         task->fp_rx = fopen(name, "w+");
661         PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);
662
663         sprintf(name, "latency.tx_%u.txt", core_id);
664         task->fp_tx = fopen(name, "w+");
665         PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);
666
667         task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
668         PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocated prev_tx_packet_index\n");
669 }
670
671 static void task_init_generator_count(struct task_lat *task)
672 {
673         uint8_t *generator_count = prox_sh_find_system("generator_count");
674
675         if (generator_count == NULL) {
676                 task->generator_count = 1;
677                 plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
678         } else
679                 task->generator_count = *generator_count;
680         plog_info("\tLatency using %u generators\n", task->generator_count);
681 }
682
683 static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
684 {
685         size_t eld_mem_size;
686
687         eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
688         task->eld = prox_zmalloc(eld_mem_size, socket_id);
689         PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
690 }
691
692 void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
693 {
694         task->limit = nsec_to_tsc(accuracy_limit_nsec);
695 }
696
697 static void lat_start(struct task_base *tbase)
698 {
699         struct task_lat *task = (struct task_lat *)tbase;
700
701         if (task->port) {
702                 // task->port->link_speed reports the link speed in Mbps e.g. 40k for a 40 Gbps NIC.
703                 // task->link_speed reports link speed in Bytes per sec.
704                 // It can be 0 if link is down, and must hence be updated in fast path.
705                 task->link_speed = task->port->link_speed * 125000L;
706                 if (task->link_speed)
707                         plog_info("\tPort %u: link speed is %ld Mbps\n",
708                                 (uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
709                 else
710                         plog_info("\tPort %u: link speed is %ld Mbps - link might be down\n",
711                                 (uint8_t)(task->port - prox_port_cfg), 8 * task->link_speed / 1000000);
712         }
713 }
714
715 static void init_task_lat(struct task_base *tbase, struct task_args *targ)
716 {
717         struct task_lat *task = (struct task_lat *)tbase;
718         const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
719
720         task->lat_pos = targ->lat_pos;
721         task->accur_pos = targ->accur_pos;
722         task->sig_pos = targ->sig_pos;
723         task->sig = targ->sig;
724
725         task->unique_id_pos = targ->packet_id_pos;
726         task->latency_buffer_size = targ->latency_buffer_size;
727
728         task_init_generator_count(task);
729
730         if (task->latency_buffer_size) {
731                 init_task_lat_latency_buffer(task, targ->lconf->id);
732         }
733
734         if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
735                 targ->bucket_size = DEFAULT_BUCKET_SIZE;
736         }
737
738         if (task->accur_pos) {
739                 task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count , socket_id);
740                 PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
741                 for (uint i = 0; i < task->generator_count; i++) {
742                         task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
743                         PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
744                 }
745                 if (task->unique_id_pos == 0) {
746                         /* When using accuracy feature, the accuracy from TX is written ACCURACY_BUFFER_SIZE packets later
747                         * We can only retrieve the good packet if a packet id is written to it.
748                         * Otherwise we will use the packet RECEIVED ACCURACY_BUFFER_SIZE packets ago which is OK if
749                         * packets are not re-ordered. If packets are re-ordered, then the matching between
750                         * the tx accuracy znd the latency is wrong.
751                         */
752                         plog_warn("\tWhen accuracy feature is used, a unique id should ideally also be used\n");
753                 }
754         }
755
756         task->lt[0].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
757         task->lt[1].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
758         if (task->unique_id_pos) {
759                 task_lat_init_eld(task, socket_id);
760                 task_lat_reset_eld(task);
761         }
762         task->lat_test = &task->lt[task->using_lt];
763
764         task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
765         task->rx_pkt_meta = prox_zmalloc(MAX_RX_PKT_ALL * sizeof(*task->rx_pkt_meta), socket_id);
766         PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");
767
768         task->link_speed = UINT64_MAX;
769         if (targ->nb_rxports) {
770                 // task->port structure is only used while starting handle_lat to get the link_speed.
771                 // link_speed can not be quiried at init as the port has not been initialized yet.
772                 struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
773                 task->port = port;
774         }
775 }
776
777 static struct task_init task_init_lat = {
778         .mode_str = "lat",
779         .init = init_task_lat,
780         .handle = handle_lat_bulk,
781         .start = lat_start,
782         .stop = lat_stop,
783         .flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_RX_ALL | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
784         .size = sizeof(struct task_lat)
785 };
786
787 __attribute__((constructor)) static void reg_task_lat(void)
788 {
789         reg_task(&task_init_lat);
790 }