Merge "PROX generator: performance optimization (3/4)"
[samplevnf.git] / VNFs / DPPD-PROX / handle_lat.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 //#define LAT_DEBUG
18
19 #include <rte_cycles.h>
20 #include <stdio.h>
21 #include <math.h>
22
23 #include "handle_gen.h"
24 #include "prox_malloc.h"
25 #include "mbuf_utils.h"
26 #include "handle_lat.h"
27 #include "log.h"
28 #include "task_init.h"
29 #include "task_base.h"
30 #include "stats.h"
31 #include "lconf.h"
32 #include "quit.h"
33 #include "eld.h"
34 #include "prox_shared.h"
35 #include "prox_port_cfg.h"
36
37 #define DEFAULT_BUCKET_SIZE     10
38 #define ACCURACY_BUFFER_SIZE    64
39
/*
 * One buffered latency sample.
 * Records are appended by task_lat_store_lat_buf() and later written out
 * (and re-sorted) by lat_write_latency_to_file().
 * All times/errors are kept in TSC cycles shifted right by LATENCY_ACCURACY;
 * the lat_info_get_*_tsc() helpers shift them back.
 */
struct lat_info {
        /* Receive-side running index of the packet */
        uint32_t rx_packet_index;
        /* Packet id as sent; 64 bit wide so that 32 bit wrap-arounds can be unrolled */
        uint64_t tx_packet_index;
        /* TX and RX timestamp errors */
        uint32_t tx_err;
        uint32_t rx_err;
        /* RX and TX timestamps */
        uint64_t rx_time;
        uint64_t tx_time;
        /* Holds the generator id the packet was attributed to */
        uint16_t port_queue_id;
#ifdef LAT_DEBUG
        /* Extra per-bulk information, only collected in debug builds */
        uint16_t id_in_bulk;
        uint16_t bulk_size;
        uint64_t begin;
        uint64_t after;
        uint64_t before;
#endif
};
56
/*
 * Measurement of one packet that is parked until its TX error becomes
 * available (see handle_lat_bulk()). Entries live in a per-generator ring of
 * ACCURACY_BUFFER_SIZE slots.
 */
struct delayed_latency_entry {
        /* Values handed to task_lat_store_lat() once the entry is released */
        uint32_t rx_packet_id;
        uint32_t tx_packet_id;
        /* Key: id of the packet currently occupying this slot */
        uint32_t packet_id;
        uint8_t generator_id;
        /* Timestamps and RX error captured when the packet was received */
        uint64_t pkt_rx_time;
        uint64_t pkt_tx_time;
        uint64_t rx_time_err;
};
66
67 static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
68 {
69         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
70         if (delayed_latency_entry->packet_id == packet_id)
71                 return delayed_latency_entry;
72         else
73                 return NULL;
74 }
75
76 static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
77 {
78         struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
79         delayed_latency_entry->packet_id = packet_id;
80         return delayed_latency_entry;
81 }
82
/* Per-packet scratch data collected while walking a received bulk */
struct rx_pkt_meta_data {
        /* Start of the packet data (rte_pktmbuf_mtod of the mbuf) */
        uint8_t  *hdr;
        /* TX timestamp read from the packet at offset lat_pos */
        uint32_t pkt_tx_time;
        /* Wire bytes of the packets that follow this one in the same bulk */
        uint32_t bytes_after_in_bulk;
};
88
89 struct task_lat {
90         struct task_base base;
91         uint64_t limit;
92         uint64_t rx_packet_index;
93         uint64_t last_pkts_tsc;
94         struct delayed_latency_entry **delayed_latency_entries;
95         struct lat_info *latency_buffer;
96         uint32_t latency_buffer_idx;
97         uint32_t latency_buffer_size;
98         uint64_t begin;
99         uint16_t lat_pos;
100         uint16_t unique_id_pos;
101         uint16_t accur_pos;
102         uint16_t sig_pos;
103         uint32_t sig;
104         volatile uint16_t use_lt; /* which lt to use, */
105         volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements are used */
106         struct lat_test lt[2];
107         struct lat_test *lat_test;
108         uint32_t generator_count;
109         uint16_t min_pkt_len;
110         struct early_loss_detect *eld;
111         struct rx_pkt_meta_data *rx_pkt_meta;
112         // Following fields are only used when starting or stopping, not in general runtime
113         uint64_t *prev_tx_packet_index;
114         FILE *fp_rx;
115         FILE *fp_tx;
116         struct prox_port_cfg *port;
117         uint64_t *bytes_to_tsc;
118 };
/* Elapsed time between a TX and an RX timestamp.
 * Both timestamps are 32 bit values (see handle_lat_bulk), so rx_time is
 * numerically smaller than tx_time whenever the RX counter wrapped around
 * in between. Because the subtraction is done in, and returned as, uint32_t
 * it wraps modulo 2^32 and still yields the right difference in that case.
 */
static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
{
        const uint32_t elapsed = rx_time - tx_time;

        return elapsed;
}
130
131 struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
132 {
133         if (task->use_lt == task->using_lt)
134                 return &task->lt[!task->using_lt];
135         return NULL;
136 }
137
138 void task_lat_use_other_latency_meassurement(struct task_lat *task)
139 {
140         task->use_lt = !task->using_lt;
141 }
142
143 static void task_lat_update_lat_test(struct task_lat *task)
144 {
145         if (task->use_lt != task->using_lt) {
146                 task->using_lt = task->use_lt;
147                 task->lat_test = &task->lt[task->using_lt];
148                 task->lat_test->accuracy_limit_tsc = task->limit;
149         }
150 }
151
152 static int compare_tx_time(const void *val1, const void *val2)
153 {
154         const struct lat_info *ptr1 = val1;
155         const struct lat_info *ptr2 = val2;
156
157         return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
158 }
159
160 static int compare_tx_packet_index(const void *val1, const void *val2)
161 {
162         const struct lat_info *ptr1 = val1;
163         const struct lat_info *ptr2 = val2;
164
165         return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
166 }
167
168 static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
169 {
170         uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
171         uint32_t small = UINT32_MAX >> 1;
172
173         lat++;
174
175         /* Buffer is sorted so far by RX time.
176          * We might have packets being reordered by SUT.
177          *     => consider small differences as re-order and big ones as overflow of tx_packet_index.
178          * Note that:
179          *      - overflow only happens if receiving and storing 4 billions packets...
180          *      - a absolute difference of less than 2 billion packets is not considered as an overflow
181          */
182         for (uint32_t i = 1; i < count; i++) {
183                 tx_packet_index = lat->tx_packet_index;
184                 if (tx_packet_index > old_tx_packet_index) {
185                         if (tx_packet_index - old_tx_packet_index < small) {
186                                 // The diff is small => increasing index count
187                         } else {
188                                 // The diff is big => it is more likely that the previous packet was overflow
189                                 n_overflow--;
190                         }
191                 } else {
192                         if (old_tx_packet_index - tx_packet_index < small) {
193                                 // The diff is small => packet reorder
194                         } else {
195                                 // The diff is big => it is more likely that this is an overflow
196                                 n_overflow++;
197                         }
198                 }
199                 lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
200                 old_tx_packet_index = tx_packet_index;
201                 lat++;
202         }
203 }
204
205 static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
206 {
207         uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
208         uint32_t small = UINT32_MAX >> 1;
209         lat++;
210
211         /*
212          * Same algorithm as above, but with time.
213          * Note that:
214          *      - overflow happens after 4 billions "cycles" (shifted by LATENCY_ACCURACY) = ~4sec
215          *      - a absolute difference up to 2 billion (shifted) cycles (~=2sec) is not considered as an overflow
216          *              => algorithm does not work if receiving less than 1 packet every 2 seconds
217          */
218         for (uint32_t i = 1; i < count; i++) {
219                 tx_time = lat->tx_time;
220                 if (tx_time > old_tx_time) {
221                         if (tx_time - old_tx_time > small) {
222                                 n_overflow--;
223                         }
224                 } else {
225                         if (old_tx_time - tx_time > small) {
226                                 n_overflow++;
227                         }
228                 }
229                 lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
230                 old_tx_time = tx_time;
231                 lat++;
232         }
233 }
234
235 static void task_lat_count_remaining_lost_packets(struct task_lat *task)
236 {
237         struct lat_test *lat_test = task->lat_test;
238
239         for (uint32_t j = 0; j < task->generator_count; j++) {
240                 struct early_loss_detect *eld = &task->eld[j];
241
242                 lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
243         }
244 }
245
246 static void task_lat_reset_eld(struct task_lat *task)
247 {
248         for (uint32_t j = 0; j < task->generator_count; j++) {
249                 early_loss_detect_reset(&task->eld[j]);
250         }
251 }
252
253 static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
254 {
255         uint64_t min_tsc = UINT64_MAX;
256
257         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
258                 if (min_tsc > task->latency_buffer[i].tx_time)
259                         min_tsc = task->latency_buffer[i].tx_time;
260         }
261
262         return min_tsc << LATENCY_ACCURACY;
263 }
264
265 static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
266 {
267         uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);
268
269         return lat << LATENCY_ACCURACY;
270 }
271
272 static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
273 {
274         return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
275 }
276
277 static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
278 {
279         return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
280 }
281
282 static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
283 {
284         return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
285 }
286
287 static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
288 {
289         return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
290 }
291
292 static void lat_write_latency_to_file(struct task_lat *task)
293 {
294         uint64_t min_tsc;
295         uint64_t n_loss;
296
297         min_tsc = lat_latency_buffer_get_min_tsc(task);
298
299         // Dumping all packet statistics
300         fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
301         fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec);tx time;\n");
302         for (uint32_t i = 0; i < task->latency_buffer_idx ; i++) {
303                 struct lat_info *lat_info = &task->latency_buffer[i];
304                 uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
305                 uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
306                 uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
307
308                 fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
309                         lat_info->rx_packet_index,
310                         lat_info->port_queue_id,
311                         lat_info->tx_packet_index,
312                         tsc_to_nsec(lat_tsc),
313                         tsc_to_nsec(rx_tsc - min_tsc),
314                         tsc_to_nsec(tx_tsc - min_tsc));
315         }
316
317         // To detect dropped packets, we need to sort them based on TX
318         if (task->unique_id_pos) {
319                 plogx_info("Adapting tx_packet_index\n");
320                 fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
321                 plogx_info("Sorting packets based on tx_packet_index\n");
322                 qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
323                 plogx_info("Sorted packets based on packet_index\n");
324         } else {
325                 plogx_info("Adapting tx_time\n");
326                 fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
327                 plogx_info("Sorting packets based on tx_time\n");
328                 qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
329                 plogx_info("Sorted packets based on packet_time\n");
330         }
331
332         // A packet is marked as dropped if 2 packets received from the same queue are not consecutive
333         fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
334         fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");
335
336         for (uint32_t i = 0; i < task->generator_count;i++)
337                 task->prev_tx_packet_index[i] = -1;
338
339         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
340                 struct lat_info *lat_info = &task->latency_buffer[i];
341                 uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
342                 uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
343                 uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
344                 uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
345                 uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
346
347                 /* Packet n + ACCURACY_BUFFER_SIZE delivers the TX error for packet n,
348                    hence the last ACCURACY_BUFFER_SIZE packets do no have TX error. */
349                 if (i + ACCURACY_BUFFER_SIZE >= task->latency_buffer_idx) {
350                         tx_err_tsc = 0;
351                 }
352
353                 if (lat_info->port_queue_id >= task->generator_count) {
354                         plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
355                                 lat_info->port_queue_id, lat_info->tx_packet_index);
356                         continue;
357                 }
358                 // Log dropped packet
359                 n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
360                 if (n_loss)
361                         fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
362                                 lat_info->port_queue_id,
363                                 lat_info->tx_packet_index - n_loss, n_loss);
364                 // Log next packet
365                 fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
366                         lat_info->port_queue_id,
367                         lat_info->tx_packet_index,
368                         lat_info->rx_packet_index,
369                         tsc_to_nsec(lat_tsc),
370                         tsc_to_nsec(tx_tsc - min_tsc),
371                         tsc_to_nsec(rx_tsc - min_tsc),
372                         tsc_to_nsec(tx_err_tsc),
373                         tsc_to_nsec(rx_err_tsc));
374 #ifdef LAT_DEBUG
375                 fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
376                         lat_info->id_in_bulk,
377                         lat_info->bulk_size,
378                         tsc_to_nsec(lat_info->begin - min_tsc),
379                         tsc_to_nsec(lat_info->before - min_tsc),
380                         tsc_to_nsec(lat_info->after - min_tsc));
381 #endif
382                 fprintf(task->fp_tx, "\n");
383                 task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
384         }
385         fflush(task->fp_rx);
386         fflush(task->fp_tx);
387         task->latency_buffer_idx = 0;
388 }
389
390 static void lat_stop(struct task_base *tbase)
391 {
392         struct task_lat *task = (struct task_lat *)tbase;
393
394         if (task->unique_id_pos) {
395                 task_lat_count_remaining_lost_packets(task);
396                 task_lat_reset_eld(task);
397         }
398         if (task->latency_buffer)
399                 lat_write_latency_to_file(task);
400 }
401
402 #ifdef LAT_DEBUG
403 static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
404 {
405         struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];
406
407         lat_info->bulk_size = bulk_size;
408         lat_info->id_in_bulk = id_in_bulk;
409         lat_info->begin = task->begin;
410         lat_info->before = task->base.aux->tsc_rx.before;
411         lat_info->after = task->base.aux->tsc_rx.after;
412 }
413 #endif
414
415 static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
416 {
417         struct lat_info *lat_info;
418
419         /* If unique_id_pos is specified then latency is stored per
420            packet being sent. Lost packets are detected runtime, and
421            latency stored for those packets will be 0 */
422         lat_info = &task->latency_buffer[task->latency_buffer_idx++];
423         lat_info->rx_packet_index = rx_packet_index;
424         lat_info->tx_packet_index = packet_id;
425         lat_info->port_queue_id = generator_id;
426         lat_info->rx_time = rx_time;
427         lat_info->tx_time = tx_time;
428         lat_info->rx_err = rx_err;
429         lat_info->tx_err = tx_err;
430 }
431
432 static uint32_t task_lat_early_loss_detect(struct task_lat *task, uint32_t packet_id, uint8_t generator_id)
433 {
434         struct early_loss_detect *eld = &task->eld[generator_id];
435         return early_loss_detect_add(eld, packet_id);
436 }
437
438 static uint64_t tsc_extrapolate_backward(struct task_lat *task, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
439 {
440 #ifdef NO_EXTRAPOLATION
441         uint64_t tsc = tsc_from;
442 #else
443         uint64_t tsc = tsc_from - task->bytes_to_tsc[bytes];
444 #endif
445         if (likely(tsc > tsc_minimum))
446                 return tsc;
447         else
448                 return tsc_minimum;
449 }
450
451 static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
452 {
453         uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
454         size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);
455
456         bucket_id = bucket_id < bucket_count? bucket_id : bucket_count;
457         lat_test->buckets[bucket_id]++;
458 }
459
460 static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
461 {
462         lat_test->lost_packets += lost_packets;
463 }
464
465 static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
466 {
467         if (error > lat_test->accuracy_limit_tsc)
468                 return;
469         lat_test->tot_pkts++;
470
471         lat_test->tot_lat += lat_tsc;
472         lat_test->tot_lat_error += error;
473
474         /* (a +- b)^2 = a^2 +- (2ab + b^2) */
475         lat_test->var_lat += lat_tsc * lat_tsc;
476         lat_test->var_lat_error += 2 * lat_tsc * error;
477         lat_test->var_lat_error += error * error;
478
479         if (lat_tsc > lat_test->max_lat) {
480                 lat_test->max_lat = lat_tsc;
481                 lat_test->max_lat_error = error;
482         }
483         if (lat_tsc < lat_test->min_lat) {
484                 lat_test->min_lat = lat_tsc;
485                 lat_test->min_lat_error = error;
486         }
487
488 #ifdef LATENCY_HISTOGRAM
489         lat_test_histogram_add(lat_test, lat_tsc);
490 #endif
491 }
492
493 static int task_lat_can_store_latency(struct task_lat *task)
494 {
495         return task->latency_buffer_idx < task->latency_buffer_size;
496 }
497
498 static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
499 {
500         uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;
501
502         lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);
503
504         if (task_lat_can_store_latency(task)) {
505                 task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
506         }
507 }
508
509 static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
510 {
511         struct task_lat *task = (struct task_lat *)tbase;
512         int rc;
513
514         if (n_pkts == 0) {
515                 task->begin = tbase->aux->tsc_rx.before;
516                 return 0;
517         }
518
519         task_lat_update_lat_test(task);
520
521         // Remember those packets with bad length or bad signature
522         uint32_t non_dp_count = 0;
523         uint64_t pkt_bad_len_sig = 0;
524 #define BIT64_SET(a64, bit)     a64 |=  (((uint64_t)1) << (bit & 63))
525 #define BIT64_CLR(a64, bit)     a64 &= ~(((uint64_t)1) << (bit & 63))
526 #define BIT64_TEST(a64, bit)    a64  &  (((uint64_t)1) << (bit & 63))
527
528         /* Go once through all received packets and read them.  If
529            packet has just been modified by another core, the cost of
530            latency will be partialy amortized though the bulk size */
531         for (uint16_t j = 0; j < n_pkts; ++j) {
532                 struct rte_mbuf *mbuf = mbufs[j];
533                 task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);
534
535                 // Remember those packets which are too short to hold the values that we expect
536                 if (unlikely(rte_pktmbuf_pkt_len(mbuf) < task->min_pkt_len)) {
537                         BIT64_SET(pkt_bad_len_sig, j);
538                         non_dp_count++;
539                 } else
540                         BIT64_CLR(pkt_bad_len_sig, j);
541         }
542
543         if (task->sig_pos) {
544                 for (uint16_t j = 0; j < n_pkts; ++j) {
545                         if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
546                                 continue;
547                         // Remember those packets with bad signature
548                         if (likely(*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig))
549                                 task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
550                         else {
551                                 BIT64_SET(pkt_bad_len_sig, j);
552                                 non_dp_count++;
553                         }
554                 }
555         } else {
556                 for (uint16_t j = 0; j < n_pkts; ++j) {
557                         if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
558                                 continue;
559                         task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
560                 }
561         }
562
563         uint32_t bytes_total_in_bulk = 0;
564         // Find RX time of first packet, for RX accuracy
565         for (uint16_t j = 0; j < n_pkts; ++j) {
566                 uint16_t flipped = n_pkts - 1 - j;
567
568                 task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
569                 bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
570         }
571
572         const uint64_t rx_tsc = tbase->aux->tsc_rx.after;
573
574         uint64_t rx_time_err;
575         uint64_t pkt_rx_time64 = tsc_extrapolate_backward(task, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
576         if (unlikely((task->begin >> LATENCY_ACCURACY) > pkt_rx_time64)) {
577                 // Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
578                 rx_time_err = pkt_rx_time64 - (task->last_pkts_tsc >> LATENCY_ACCURACY);
579         } else {
580                 rx_time_err = pkt_rx_time64 - (task->begin >> LATENCY_ACCURACY);
581         }
582
583         TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, non_dp_count);
584         for (uint16_t j = 0; j < n_pkts; ++j) {
585                 // Used to display % of packets within accuracy limit vs. total number of packets (used_col)
586                 task->lat_test->tot_all_pkts++;
587
588                 // Skip those packets with bad length or bad signature
589                 if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
590                         continue;
591
592                 struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
593                 uint8_t *hdr = rx_pkt_meta->hdr;
594
595                 uint32_t pkt_rx_time = tsc_extrapolate_backward(task, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
596                 uint32_t pkt_tx_time = rx_pkt_meta->pkt_tx_time;
597
598                 uint8_t generator_id;
599                 uint32_t packet_id;
600                 if (task->unique_id_pos) {
601                         struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
602                         unique_id_get(unique_id, &generator_id, &packet_id);
603
604                         if (unlikely(generator_id >= task->generator_count)) {
605                                 /* No need to remember unexpected packet at this stage
606                                 BIT64_SET(pkt_bad_len_sig, j);
607                                 */
608                                 // Skip unexpected packet
609                                 continue;
610                         }
611
612                         lat_test_add_lost(task->lat_test, task_lat_early_loss_detect(task, packet_id, generator_id));
613                 } else {
614                         generator_id = 0;
615                         packet_id = task->rx_packet_index;
616                 }
617
618                 /* If accuracy is enabled, latency is reported with a
619                    delay of ACCURACY_BUFFER_SIZE packets since the generator puts the
620                    accuracy for packet N into packet N + ACCURACY_BUFFER_SIZE. The delay
621                    ensures that all reported latencies have both rx
622                    and tx error. */
623                 if (task->accur_pos) {
624                         uint32_t tx_time_err = *(uint32_t *)(hdr + task->accur_pos);
625
626                         struct delayed_latency_entry *delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_BUFFER_SIZE);
627
628                         if (delayed_latency_entry) {
629                                 task_lat_store_lat(task,
630                                                    delayed_latency_entry->rx_packet_id,
631                                                    delayed_latency_entry->pkt_rx_time,
632                                                    delayed_latency_entry->pkt_tx_time,
633                                                    delayed_latency_entry->rx_time_err,
634                                                    tx_time_err,
635                                                    delayed_latency_entry->tx_packet_id,
636                                                    delayed_latency_entry->generator_id);
637                         }
638
639                         delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
640                         delayed_latency_entry->pkt_rx_time = pkt_rx_time;
641                         delayed_latency_entry->pkt_tx_time = pkt_tx_time;
642                         delayed_latency_entry->rx_time_err = rx_time_err;
643                         delayed_latency_entry->rx_packet_id = task->rx_packet_index;
644                         delayed_latency_entry->tx_packet_id = packet_id;
645                         delayed_latency_entry->generator_id = generator_id;
646                 } else {
647                         task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
648                 }
649
650                 // Bad/unexpected packets do not need to be indexed
651                 task->rx_packet_index++;
652         }
653
654         if (n_pkts < MAX_PKT_BURST)
655                 task->begin = tbase->aux->tsc_rx.before;
656         task->last_pkts_tsc = tbase->aux->tsc_rx.after;
657
658         rc = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
659         // non_dp_count should not be drop-handled, as there are all by definition considered as not handled
660         // RX = DISCARDED + HANDLED + NON_DP + (TX - TX_NON_DP) + TX_FAIL
661         TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, -non_dp_count);
662         return rc;
663 }
664
665 static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
666 {
667         const int socket_id = rte_lcore_to_socket_id(core_id);
668         char name[256];
669         size_t latency_buffer_mem_size = 0;
670
671         if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
672                 task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;
673
674         latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;
675
676         task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
677         PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);
678
679         sprintf(name, "latency.rx_%u.txt", core_id);
680         task->fp_rx = fopen(name, "w+");
681         PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);
682
683         sprintf(name, "latency.tx_%u.txt", core_id);
684         task->fp_tx = fopen(name, "w+");
685         PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);
686
687         task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
688         PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocated prev_tx_packet_index\n");
689 }
690
691 static void task_init_generator_count(struct task_lat *task)
692 {
693         uint8_t *generator_count = prox_sh_find_system("generator_count");
694
695         if (generator_count == NULL) {
696                 task->generator_count = 1;
697                 plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
698         } else
699                 task->generator_count = *generator_count;
700         plog_info("\tLatency using %u generators\n", task->generator_count);
701 }
702
703 static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
704 {
705         size_t eld_mem_size;
706
707         eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
708         task->eld = prox_zmalloc(eld_mem_size, socket_id);
709         PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
710 }
711
712 void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
713 {
714         task->limit = nsec_to_tsc(accuracy_limit_nsec);
715 }
716
/*
 * Start hook of the latency task (installed as .start in task_init_lat).
 * There is currently nothing to do when the task starts.
 */
static void lat_start(struct task_base *tbase)
{
        /* Intentionally unused: kept only to match the start-hook signature. */
        (void)tbase;
}
722
723 static void init_task_lat(struct task_base *tbase, struct task_args *targ)
724 {
725         struct task_lat *task = (struct task_lat *)tbase;
726         const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
727
728         task->lat_pos = targ->lat_pos;
729         task->accur_pos = targ->accur_pos;
730         task->sig_pos = targ->sig_pos;
731         task->sig = targ->sig;
732
733         task->unique_id_pos = targ->packet_id_pos;
734         task->latency_buffer_size = targ->latency_buffer_size;
735
736         PROX_PANIC(task->lat_pos == 0, "Missing 'lat pos' parameter in config file\n");
737         uint16_t min_pkt_len = task->lat_pos + sizeof(uint32_t);
738         if (task->unique_id_pos && (
739                 min_pkt_len < task->unique_id_pos + sizeof(struct unique_id)))
740                 min_pkt_len = task->unique_id_pos + sizeof(struct unique_id);
741         if (task->accur_pos && (
742                 min_pkt_len < task->accur_pos + sizeof(uint32_t)))
743                 min_pkt_len = task->accur_pos + sizeof(uint32_t);
744         if (task->sig_pos && (
745                 min_pkt_len < task->sig_pos + sizeof(uint32_t)))
746                 min_pkt_len = task->sig_pos + sizeof(uint32_t);
747         task->min_pkt_len = min_pkt_len;
748
749         task_init_generator_count(task);
750
751         if (task->latency_buffer_size) {
752                 init_task_lat_latency_buffer(task, targ->lconf->id);
753         }
754
755         if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
756                 targ->bucket_size = DEFAULT_BUCKET_SIZE;
757         }
758
759         if (task->accur_pos) {
760                 task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count , socket_id);
761                 PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
762                 for (uint i = 0; i < task->generator_count; i++) {
763                         task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
764                         PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
765                 }
766                 if (task->unique_id_pos == 0) {
767                         /* When using accuracy feature, the accuracy from TX is written ACCURACY_BUFFER_SIZE packets later
768                         * We can only retrieve the good packet if a packet id is written to it.
769                         * Otherwise we will use the packet RECEIVED ACCURACY_BUFFER_SIZE packets ago which is OK if
770                         * packets are not re-ordered. If packets are re-ordered, then the matching between
771                         * the tx accuracy znd the latency is wrong.
772                         */
773                         plog_warn("\tWhen accuracy feature is used, a unique id should ideally also be used\n");
774                 }
775         }
776
777         task->lt[0].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
778         task->lt[1].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
779         if (task->unique_id_pos) {
780                 task_lat_init_eld(task, socket_id);
781                 task_lat_reset_eld(task);
782         }
783         task->lat_test = &task->lt[task->using_lt];
784
785         task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
786         task->rx_pkt_meta = prox_zmalloc(MAX_PKT_BURST * sizeof(*task->rx_pkt_meta), socket_id);
787         PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");
788
789         uint32_t max_frame_size = MAX_PKT_SIZE;
790         uint64_t bytes_per_hz = UINT64_MAX;
791         if (targ->nb_rxports) {
792                 struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
793                 max_frame_size = port->mtu + ETHER_HDR_LEN + ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE;
794
795                 // port->max_link_speed reports the maximum, non negotiated ink speed in Mbps e.g. 40k for a 40 Gbps NIC.
796                 // It can be UINT32_MAX (virtual devices or not supported by DPDK < 16.04)
797                 if (port->max_link_speed != UINT32_MAX) {
798                         bytes_per_hz = port->max_link_speed * 125000L;
799                         plog_info("\tPort %u: max link speed is %ld Mbps\n",
800                                 (uint8_t)(port - prox_port_cfg), 8 * bytes_per_hz / 1000000);
801                 }
802         }
803         task->bytes_to_tsc = prox_zmalloc(max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST, rte_lcore_to_socket_id(targ->lconf->id));
804         PROX_PANIC(task->bytes_to_tsc == NULL,
805                 "Failed to allocate %u bytes (in huge pages) for bytes_to_tsc\n", max_frame_size);
806
807         for (unsigned int i = 0; i < max_frame_size * MAX_PKT_BURST ; i++) {
808                 if (bytes_per_hz == UINT64_MAX)
809                         task->bytes_to_tsc[i] = 0;
810                 else
811                         task->bytes_to_tsc[i] = (rte_get_tsc_hz() * i) / bytes_per_hz;
812         }
813 }
814
815 static struct task_init task_init_lat = {
816         .mode_str = "lat",
817         .init = init_task_lat,
818         .handle = handle_lat_bulk,
819         .start = lat_start,
820         .stop = lat_stop,
821         .flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
822         .size = sizeof(struct task_lat)
823 };
824
825 __attribute__((constructor)) static void reg_task_lat(void)
826 {
827         reg_task(&task_init_lat);
828 }