Fix extrapolation used in latency measurements
[samplevnf.git] / VNFs / DPPD-PROX / handle_lat.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 //#define LAT_DEBUG
18
19 #include <rte_cycles.h>
20 #include <stdio.h>
21 #include <math.h>
22
23 #include "handle_gen.h"
24 #include "prox_malloc.h"
25 #include "mbuf_utils.h"
26 #include "handle_lat.h"
27 #include "log.h"
28 #include "task_init.h"
29 #include "task_base.h"
30 #include "stats.h"
31 #include "lconf.h"
32 #include "quit.h"
33 #include "eld.h"
34 #include "prox_shared.h"
35 #include "prox_port_cfg.h"
36
37 #define DEFAULT_BUCKET_SIZE     10
38
/* One per-packet latency record, kept in the optional latency buffer and
 * dumped to file when the task stops. Timestamp fields hold 32-bit TSC
 * values that were right-shifted by LATENCY_ACCURACY (stored widened). */
struct lat_info {
	uint32_t rx_packet_index;	/* position of the record in RX order */
	uint32_t tx_packet_index;	/* per-generator packet id read from the packet */
	uint32_t tx_err;		/* TX accuracy error, shifted TSC units */
	uint32_t rx_err;		/* RX accuracy error, shifted TSC units */
	uint64_t rx_time;		/* extrapolated RX timestamp (shifted TSC) */
	uint64_t tx_time;		/* TX timestamp read from the packet (shifted TSC) */
	uint16_t port_queue_id;		/* generator id the packet came from */
#ifdef LAT_DEBUG
	uint16_t id_in_bulk;		/* position of the packet within its RX bulk */
	uint16_t bulk_size;		/* size of the RX bulk it arrived in */
	uint64_t begin;			/* task->begin at the time the packet was handled */
	uint64_t after;			/* TSC taken after the RX call */
	uint64_t before;		/* TSC taken before the RX call */
#endif
};
55
/* One slot of the 64-deep delayed-latency ring: the generator carries the
 * accuracy for packet N inside packet N + 64, so the RX-side data of a
 * packet is parked here until its TX error arrives 64 packets later. */
struct delayed_latency_entry {
	uint32_t rx_packet_idx;	/* rx packet index this slot currently belongs to */
	uint64_t pkt_rx_time;	/* extrapolated RX timestamp (shifted TSC) */
	uint64_t pkt_tx_time;	/* TX timestamp read from the packet (shifted TSC) */
	uint64_t rx_time_err;	/* RX accuracy error (shifted TSC) */
};
62
/* Ring of 64 delayed entries, indexed by rx_packet_idx % 64. */
struct delayed_latency {
	struct delayed_latency_entry entries[64];
};
66
67 static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency *delayed_latency, uint32_t rx_packet_idx)
68 {
69         if (delayed_latency->entries[rx_packet_idx % 64].rx_packet_idx == rx_packet_idx)
70                 return &delayed_latency->entries[rx_packet_idx % 64];
71         else
72                 return NULL;
73 }
74
75 static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency *delayed_latency, uint32_t rx_packet_idx)
76 {
77         delayed_latency->entries[rx_packet_idx % 64].rx_packet_idx = rx_packet_idx;
78         return &delayed_latency->entries[rx_packet_idx % 64];
79 }
80
/* Per-packet scratch data gathered while processing one RX bulk. */
struct rx_pkt_meta_data {
	uint8_t  *hdr;			/* start of packet data (rte_pktmbuf_mtod) */
	uint32_t pkt_tx_time;		/* TX timestamp read from the packet; 0 on signature mismatch */
	uint32_t bytes_after_in_bulk;	/* wire bytes of all packets after this one in the bulk */
};
86
/* State of one latency-measuring task. */
struct task_lat {
	struct task_base base;
	uint64_t limit;			/* accuracy limit in TSC; worse samples are excluded from stats */
	uint64_t rx_packet_index;	/* running count of received packets */
	uint64_t last_pkts_tsc;		/* TSC after the RX call of the previous bulk */
	struct delayed_latency delayed_latency;	/* pairs RX data with the TX error delivered 64 packets later */
	struct lat_info *latency_buffer;	/* optional per-packet log, dumped at stop; NULL when disabled */
	uint32_t latency_buffer_idx;
	uint32_t latency_buffer_size;
	uint64_t begin;			/* TSC before the RX call of the previous poll */
	uint16_t lat_pos;		/* offset of the TX timestamp inside the packet */
	uint16_t unique_id_pos;		/* offset of the unique id; 0 = feature disabled */
	uint16_t accur_pos;		/* offset of the accuracy field; 0 = feature disabled */
	uint16_t sig_pos;		/* offset of the signature, checked when sig != 0 */
	uint32_t sig;
	volatile uint16_t use_lt; /* which lt to use, */
	volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements are used */
	struct lat_test lt[2];		/* double-buffered stats: one written by datapath, one read externally */
	struct lat_test *lat_test;	/* shortcut to lt[using_lt] */
	uint32_t generator_count;
	struct early_loss_detect *eld;	/* one loss detector per generator */
	struct rx_pkt_meta_data *rx_pkt_meta;	/* per-bulk scratch area */
	uint64_t link_speed;		/* link speed in bytes/sec, filled in by lat_start() */
	FILE *fp_rx;			/* per-core dump file, RX order */
	FILE *fp_tx;			/* per-core dump file, TX order */
	struct prox_port_cfg *port;	/* only used at start time to query the link speed */
};
114
/* Difference a - b modulo 2^32: when a < b the 32-bit counter is assumed
 * to have wrapped exactly once between b and a. */
static uint32_t abs_diff(uint32_t a, uint32_t b)
{
	if (a >= b)
		return a - b;
	return UINT32_MAX - (b - a - 1);
}
119
120 struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
121 {
122         if (task->use_lt == task->using_lt)
123                 return &task->lt[!task->using_lt];
124         return NULL;
125 }
126
127 void task_lat_use_other_latency_meassurement(struct task_lat *task)
128 {
129         task->use_lt = !task->using_lt;
130 }
131
132 static void task_lat_update_lat_test(struct task_lat *task)
133 {
134         if (task->use_lt != task->using_lt) {
135                 task->using_lt = task->use_lt;
136                 task->lat_test = &task->lt[task->using_lt];
137                 task->lat_test->accuracy_limit_tsc = task->limit;
138         }
139 }
140
141 static int compare_tx_time(const void *val1, const void *val2)
142 {
143         const struct lat_info *ptr1 = val1;
144         const struct lat_info *ptr2 = val2;
145
146         return ptr1->tx_time - ptr2->tx_time;
147 }
148
149 static int compare_queue_id(const void *val1, const void *val2)
150 {
151         return compare_tx_time(val1, val2);
152 }
153
154 static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
155 {
156         uint32_t id, time, old_id = 0, old_time = 0, n_overflow = 0;
157
158         for (uint32_t i = 0; i < count; i++) {
159                 id = lat->port_queue_id;
160                 time = lat->tx_time;
161                 if (id == old_id) {
162                         // Same queue id as previous entry; time should always increase
163                         if (time < old_time) {
164                                 n_overflow++;
165                         }
166                         lat->tx_time += UINT32_MAX * n_overflow;
167                         old_time = time;
168                 } else {
169                         // Different queue_id, time starts again at 0
170                         old_id = id;
171                         old_time = 0;
172                         n_overflow = 0;
173                 }
174         }
175 }
176
177 static void task_lat_count_remaining_lost_packets(struct task_lat *task)
178 {
179         struct lat_test *lat_test = task->lat_test;
180
181         for (uint32_t j = 0; j < task->generator_count; j++) {
182                 struct early_loss_detect *eld = &task->eld[j];
183
184                 lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
185         }
186 }
187
188 static void task_lat_reset_eld(struct task_lat *task)
189 {
190         for (uint32_t j = 0; j < task->generator_count; j++) {
191                 early_loss_detect_reset(&task->eld[j]);
192         }
193 }
194
195 static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
196 {
197         uint64_t min_tsc = UINT64_MAX;
198
199         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
200                 if (min_tsc > task->latency_buffer[i].tx_time)
201                         min_tsc = task->latency_buffer[i].tx_time;
202         }
203
204         return min_tsc << LATENCY_ACCURACY;
205 }
206
207 static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
208 {
209         uint64_t lat = abs_diff(lat_info->rx_time, lat_info->tx_time);
210
211         return lat << LATENCY_ACCURACY;
212 }
213
214 static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
215 {
216         return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
217 }
218
219 static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
220 {
221         return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
222 }
223
224 static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
225 {
226         return ((uint64_t)lat_info) << LATENCY_ACCURACY;
227 }
228
229 static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
230 {
231         return ((uint64_t)lat_info) << LATENCY_ACCURACY;
232 }
233
234 static void lat_write_latency_to_file(struct task_lat *task)
235 {
236         uint64_t min_tsc;
237         uint32_t n_loss;
238
239         min_tsc = lat_latency_buffer_get_min_tsc(task);
240
241         // Dumping all packet statistics
242         fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
243         fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec);tx time;\n");
244         for (uint32_t i = 0; i < task->latency_buffer_idx ; i++) {
245                 struct lat_info *lat_info = &task->latency_buffer[i];
246                 uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
247                 uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
248                 uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
249
250                 fprintf(task->fp_rx, "%u%d;%d;%ld;%lu;%lu\n",
251                         lat_info->rx_packet_index,
252                         lat_info->port_queue_id,
253                         lat_info->tx_packet_index,
254                         tsc_to_nsec(lat_tsc),
255                         tsc_to_nsec(rx_tsc - min_tsc),
256                         tsc_to_nsec(tx_tsc - min_tsc));
257         }
258
259         // To detect dropped packets, we need to sort them based on TX
260         plogx_info("Sorting packets based on queue_id\n");
261         qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_queue_id);
262         plogx_info("Adapting tx_time\n");
263         fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
264         plogx_info("Sorting packets based on tx_time\n");
265         qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
266         plogx_info("Sorted packets based on tx_time\n");
267
268         // A packet is marked as dropped if 2 packets received from the same queue are not consecutive
269         fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
270         fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");
271
272         uint32_t prev_tx_packet_index = -1;
273         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
274                 struct lat_info *lat_info = &task->latency_buffer[i];
275                 uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
276                 uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
277                 uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
278                 uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
279                 uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
280
281                 /* Packet n + 64 delivers the TX error for packet n,
282                    hence the last 64 packets do no have TX error. */
283                 if (i + 64 >= task->latency_buffer_idx) {
284                         tx_err_tsc = 0;
285                 }
286                 // Log dropped packet
287                 n_loss = lat_info->tx_packet_index - prev_tx_packet_index - 1;
288                 if (n_loss)
289                         fprintf(task->fp_tx, "===> %d;%d;0;0;0;0; lost %d packets <===\n",
290                                 lat_info->port_queue_id,
291                                 lat_info->tx_packet_index - n_loss, n_loss);
292                 // Log next packet
293                 fprintf(task->fp_tx, "%d;%d;%u;%lu;%lu;%lu;%lu;%lu\n",
294                         lat_info->port_queue_id,
295                         lat_info->tx_packet_index,
296                         lat_info->rx_packet_index,
297                         tsc_to_nsec(lat_tsc),
298                         tsc_to_nsec(tx_tsc - min_tsc),
299                         tsc_to_nsec(rx_tsc - min_tsc),
300                         tsc_to_nsec(tx_err_tsc),
301                         tsc_to_nsec(rx_err_tsc));
302 #ifdef LAT_DEBUG
303                 fprintf(task->fp_tx, ";%d from %d;%lu;%lu;%lu",
304                         lat_info->id_in_bulk,
305                         lat_info->bulk_size,
306                         tsc_to_nsec(lat_info->begin - min_tsc),
307                         tsc_to_nsec(lat_info->before - min_tsc),
308                         tsc_to_nsec(lat_info->after - min_tsc));
309 #endif
310                 fprintf(task->fp_tx, "\n");
311                 prev_tx_packet_index = lat_info->tx_packet_index;
312         }
313         fflush(task->fp_rx);
314         fflush(task->fp_tx);
315         task->latency_buffer_idx = 0;
316 }
317
318 static void lat_stop(struct task_base *tbase)
319 {
320         struct task_lat *task = (struct task_lat *)tbase;
321
322         if (task->unique_id_pos) {
323                 task_lat_count_remaining_lost_packets(task);
324                 task_lat_reset_eld(task);
325         }
326         if (task->latency_buffer)
327                 lat_write_latency_to_file(task);
328 }
329
#ifdef LAT_DEBUG
/* Record bulk-level debug data (position/size of the packet within its
 * bulk plus the RX window timestamps) in the buffer entry at
 * rx_packet_index. */
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
	struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];

	lat_info->id_in_bulk = id_in_bulk;
	lat_info->bulk_size = bulk_size;
	lat_info->begin = task->begin;
	lat_info->before = task->base.aux->tsc_rx.before;
	lat_info->after = task->base.aux->tsc_rx.after;
}
#endif
342
343 static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, struct unique_id *unique_id, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err)
344 {
345         struct lat_info *lat_info;
346         uint8_t generator_id = 0;
347         uint32_t packet_index = 0;
348
349         if (unique_id)
350                 unique_id_get(unique_id, &generator_id, &packet_index);
351
352         /* If unique_id_pos is specified then latency is stored per
353            packet being sent. Lost packets are detected runtime, and
354            latency stored for those packets will be 0 */
355         lat_info = &task->latency_buffer[task->latency_buffer_idx++];
356         lat_info->rx_packet_index = task->latency_buffer_idx - 1;
357         lat_info->tx_packet_index = packet_index;
358         lat_info->port_queue_id = generator_id;
359         lat_info->rx_time = rx_time;
360         lat_info->tx_time = tx_time;
361         lat_info->rx_err = rx_err;
362         lat_info->tx_err = tx_err;
363 }
364
365 static uint32_t task_lat_early_loss_detect(struct task_lat *task, struct unique_id *unique_id)
366 {
367         struct early_loss_detect *eld;
368         uint8_t generator_id;
369         uint32_t packet_index;
370
371         unique_id_get(unique_id, &generator_id, &packet_index);
372
373         if (generator_id >= task->generator_count)
374                 return 0;
375
376         eld = &task->eld[generator_id];
377
378         return early_loss_detect_add(eld, packet_index);
379 }
380
/* Estimate the TSC at which a packet hit the wire by stepping back from
 * tsc_from over the time `bytes` take at link_speed (bytes/sec), clamped
 * so the estimate never goes before tsc_minimum. */
static uint64_t tsc_extrapolate_backward(uint64_t link_speed, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
	const uint64_t wire_tsc = (rte_get_tsc_hz() * bytes) / link_speed;
	const uint64_t tsc = tsc_from - wire_tsc;

	return likely(tsc > tsc_minimum) ? tsc : tsc_minimum;
}
389
390 static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
391 {
392         uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
393         size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);
394
395         bucket_id = bucket_id < bucket_count? bucket_id : bucket_count;
396         lat_test->buckets[bucket_id]++;
397 }
398
399 static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
400 {
401         lat_test->lost_packets += lost_packets;
402 }
403
404 static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
405 {
406         lat_test->tot_all_pkts++;
407
408         if (error > lat_test->accuracy_limit_tsc)
409                 return;
410         lat_test->tot_pkts++;
411
412         lat_test->tot_lat += lat_tsc;
413         lat_test->tot_lat_error += error;
414
415         /* (a +- b)^2 = a^2 +- (2ab + b^2) */
416         lat_test->var_lat += lat_tsc * lat_tsc;
417         lat_test->var_lat_error += 2 * lat_tsc * error;
418         lat_test->var_lat_error += error * error;
419
420         if (lat_tsc > lat_test->max_lat) {
421                 lat_test->max_lat = lat_tsc;
422                 lat_test->max_lat_error = error;
423         }
424         if (lat_tsc < lat_test->min_lat) {
425                 lat_test->min_lat = lat_tsc;
426                 lat_test->min_lat_error = error;
427         }
428
429 #ifdef LATENCY_HISTOGRAM
430         lat_test_histogram_add(lat_test, lat_tsc);
431 #endif
432 }
433
434 static int task_lat_can_store_latency(struct task_lat *task)
435 {
436         return task->latency_buffer_idx < task->latency_buffer_size;
437 }
438
439 static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, struct unique_id *unique_id)
440 {
441         if (tx_time == 0)
442                 return;
443         uint32_t lat_tsc = abs_diff(rx_time, tx_time) << LATENCY_ACCURACY;
444
445         lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);
446
447         if (task_lat_can_store_latency(task)) {
448                 task_lat_store_lat_buf(task, rx_packet_index, unique_id, rx_time, tx_time, rx_error, tx_error);
449         }
450 }
451
452 static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
453 {
454         struct task_lat *task = (struct task_lat *)tbase;
455         uint64_t rx_time_err;
456
457         uint32_t pkt_rx_time, pkt_tx_time;
458
459         if (n_pkts == 0) {
460                 task->begin = tbase->aux->tsc_rx.before;
461                 return 0;
462         }
463
464         task_lat_update_lat_test(task);
465
466         const uint64_t rx_tsc = tbase->aux->tsc_rx.after;
467         uint32_t tx_time_err = 0;
468
469         /* Go once through all received packets and read them.  If
470            packet has just been modified by another core, the cost of
471            latency will be partialy amortized though the bulk size */
472         for (uint16_t j = 0; j < n_pkts; ++j) {
473                 struct rte_mbuf *mbuf = mbufs[j];
474                 task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);
475         }
476         for (uint16_t j = 0; j < n_pkts; ++j) {
477         }
478
479         if (task->sig) {
480                 for (uint16_t j = 0; j < n_pkts; ++j) {
481                         if (*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig)
482                                 task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
483                         else
484                                 task->rx_pkt_meta[j].pkt_tx_time = 0;
485                 }
486         } else {
487                 for (uint16_t j = 0; j < n_pkts; ++j) {
488                         task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
489                 }
490         }
491
492         uint32_t bytes_total_in_bulk = 0;
493         // Find RX time of first packet, for RX accuracy
494         for (uint16_t j = 0; j < n_pkts; ++j) {
495                 uint16_t flipped = n_pkts - 1 - j;
496
497                 task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
498                 bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
499         }
500
501         pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
502         if ((uint32_t)((task->begin >> LATENCY_ACCURACY)) > pkt_rx_time) {
503                 // Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
504                 rx_time_err = pkt_rx_time - (uint32_t)(task->last_pkts_tsc >> LATENCY_ACCURACY);
505         } else {
506                 rx_time_err = pkt_rx_time - (uint32_t)(task->begin >> LATENCY_ACCURACY);
507         }
508
509         struct unique_id *unique_id = NULL;
510         struct delayed_latency_entry *delayed_latency_entry;
511
512         for (uint16_t j = 0; j < n_pkts; ++j) {
513                 struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
514                 uint8_t *hdr = rx_pkt_meta->hdr;
515
516                 pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
517                 pkt_tx_time = rx_pkt_meta->pkt_tx_time;
518
519                 if (task->unique_id_pos) {
520                         unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
521
522                         uint32_t n_loss = task_lat_early_loss_detect(task, unique_id);
523                         lat_test_add_lost(task->lat_test, n_loss);
524                 }
525
526                 /* If accuracy is enabled, latency is reported with a
527                    delay of 64 packets since the generator puts the
528                    accuracy for packet N into packet N + 64. The delay
529                    ensures that all reported latencies have both rx
530                    and tx error. */
531                 if (task->accur_pos) {
532                         tx_time_err = *(uint32_t *)(hdr + task->accur_pos);
533
534                         delayed_latency_entry = delayed_latency_get(&task->delayed_latency, task->rx_packet_index - 64);
535
536                         if (delayed_latency_entry) {
537                                 task_lat_store_lat(task,
538                                                    task->rx_packet_index,
539                                                    delayed_latency_entry->pkt_rx_time,
540                                                    delayed_latency_entry->pkt_tx_time,
541                                                    delayed_latency_entry->rx_time_err,
542                                                    tx_time_err,
543                                                    unique_id);
544                         }
545
546                         delayed_latency_entry = delayed_latency_create(&task->delayed_latency, task->rx_packet_index);
547                         delayed_latency_entry->pkt_rx_time = pkt_rx_time;
548                         delayed_latency_entry->pkt_tx_time = pkt_tx_time;
549                         delayed_latency_entry->rx_time_err = rx_time_err;
550                 } else {
551                         task_lat_store_lat(task,
552                                            task->rx_packet_index,
553                                            pkt_rx_time,
554                                            pkt_tx_time,
555                                            0,
556                                            0,
557                                            unique_id);
558                 }
559                 task->rx_packet_index++;
560         }
561         int ret;
562         ret = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
563         task->begin = tbase->aux->tsc_rx.before;
564         task->last_pkts_tsc = tbase->aux->tsc_rx.after;
565         return ret;
566 }
567
568 static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
569 {
570         const int socket_id = rte_lcore_to_socket_id(core_id);
571         char name[256];
572         size_t latency_buffer_mem_size = 0;
573
574         if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
575                 task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;
576
577         latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;
578
579         task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
580         PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %ld kbytes for %s\n", latency_buffer_mem_size / 1024, name);
581
582         sprintf(name, "latency.rx_%d.txt", core_id);
583         task->fp_rx = fopen(name, "w+");
584         PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);
585
586         sprintf(name, "latency.tx_%d.txt", core_id);
587         task->fp_tx = fopen(name, "w+");
588         PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);
589 }
590
591 static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
592 {
593         uint8_t *generator_count = prox_sh_find_system("generator_count");
594         size_t eld_mem_size;
595
596         if (generator_count == NULL)
597                 task->generator_count = 0;
598         else
599                 task->generator_count = *generator_count;
600
601         eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
602         task->eld = prox_zmalloc(eld_mem_size, socket_id);
603 }
604
605 void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
606 {
607         task->limit = nsec_to_tsc(accuracy_limit_nsec);
608 }
609
610 static void lat_start(struct task_base *tbase)
611 {
612         struct task_lat *task = (struct task_lat *)tbase;
613
614         if (task->port) {
615                 // task->port->link->speed reports the link speed in Mbps e.g. 40k for a 40 Gbps NIC
616                 // task->link_speed reported link speed in Bytes per sec.
617                 task->link_speed = task->port->link_speed * 125000L;
618                 plog_info("\tReceiving at %ld Mbps\n", 8 * task->link_speed / 1000000);
619         }
620 }
621
/* One-time task initialization: read the packet-layout offsets from the
 * task arguments, set up the optional per-packet latency buffer, the
 * per-generator loss detectors, the double-buffered statistics and the
 * RX meta scratch area. The statement order is load-bearing:
 * latency_buffer_size and bucket_size must be set/clamped before they
 * are used further down. */
static void init_task_lat(struct task_base *tbase, struct task_args *targ)
{
	struct task_lat *task = (struct task_lat *)tbase;
	const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

	/* Offsets of the timestamp/accuracy/signature fields inside the packet */
	task->lat_pos = targ->lat_pos;
	task->accur_pos = targ->accur_pos;
	task->sig_pos = targ->sig_pos;
	task->sig = targ->sig;

	task->unique_id_pos = targ->packet_id_pos;
	task->latency_buffer_size = targ->latency_buffer_size;

	/* Per-packet recording is enabled only when a buffer size is configured. */
	if (task->latency_buffer_size) {
		init_task_lat_latency_buffer(task, targ->lconf->id);
	}

	/* A bucket size smaller than the LATENCY_ACCURACY shift would make the
	   adjusted size below negative; fall back to the default instead. */
	if (targ->bucket_size < LATENCY_ACCURACY) {
		targ->bucket_size = DEFAULT_BUCKET_SIZE;
	}

	task->lt[0].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
	task->lt[1].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
	if (task->unique_id_pos) {
		task_lat_init_eld(task, socket_id);
		task_lat_reset_eld(task);
	}
	task->lat_test = &task->lt[task->using_lt];

	task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
	task->rx_pkt_meta = prox_zmalloc(MAX_RX_PKT_ALL * sizeof(*task->rx_pkt_meta), socket_id);
	PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");

	/* Real link speed is unknown until the port starts; lat_start() fills it in. */
	task->link_speed = UINT64_MAX;
	if (targ->nb_rxports) {
		// task->port structure is only used while starting handle_lat to get the link_speed.
		// link_speed can not be quiried at init as the port has not been initialized yet.
		struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
		task->port = port;
	}
}
663
/* Registration record for the "lat" task mode. */
static struct task_init task_init_lat = {
	.mode_str = "lat",
	.init = init_task_lat,
	.handle = handle_lat_bulk,
	.start = lat_start,
	.stop = lat_stop,
	/* Needs RX timestamps, is called even on empty polls, and forwards
	   every packet it receives. */
	.flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_RX_ALL | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
	.size = sizeof(struct task_lat)
};
673
/* Register the "lat" mode with the task framework at program load time. */
__attribute__((constructor)) static void reg_task_lat(void)
{
	reg_task(&task_init_lat);
}