95ebcc73528771fd44972045aa9a67cc75949e04
[samplevnf.git] / VNFs / DPPD-PROX / handle_lat.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 //#define LAT_DEBUG
18
19 #include <rte_cycles.h>
20 #include <stdio.h>
21 #include <math.h>
22
23 #include "handle_gen.h"
24 #include "prox_malloc.h"
25 #include "mbuf_utils.h"
26 #include "handle_lat.h"
27 #include "log.h"
28 #include "task_init.h"
29 #include "task_base.h"
30 #include "stats.h"
31 #include "lconf.h"
32 #include "quit.h"
33 #include "eld.h"
34 #include "prox_shared.h"
35
36 #define DEFAULT_BUCKET_SIZE     10
37
38 struct lat_info {
39         uint32_t rx_packet_index;
40         uint32_t tx_packet_index;
41         uint32_t tx_err;
42         uint32_t rx_err;
43         uint64_t rx_time;
44         uint64_t tx_time;
45         uint16_t port_queue_id;
46 #ifdef LAT_DEBUG
47         uint16_t id_in_bulk;
48         uint16_t bulk_size;
49         uint64_t begin;
50         uint64_t after;
51         uint64_t before;
52 #endif
53 };
54
55 struct delayed_latency_entry {
56         uint32_t rx_packet_idx;
57         uint64_t pkt_rx_time;
58         uint64_t pkt_tx_time;
59         uint64_t rx_time_err;
60 };
61
62 struct delayed_latency {
63         struct delayed_latency_entry entries[64];
64 };
65
66 static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency *delayed_latency, uint32_t rx_packet_idx)
67 {
68         if (delayed_latency->entries[rx_packet_idx % 64].rx_packet_idx == rx_packet_idx)
69                 return &delayed_latency->entries[rx_packet_idx % 64];
70         else
71                 return NULL;
72 }
73
74 static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency *delayed_latency, uint32_t rx_packet_idx)
75 {
76         delayed_latency->entries[rx_packet_idx % 64].rx_packet_idx = rx_packet_idx;
77         return &delayed_latency->entries[rx_packet_idx % 64];
78 }
79
80 struct rx_pkt_meta_data {
81         uint8_t  *hdr;
82         uint32_t pkt_tx_time;
83         uint32_t bytes_after_in_bulk;
84 };
85
86 struct task_lat {
87         struct task_base base;
88         uint64_t limit;
89         uint64_t rx_packet_index;
90         uint64_t last_pkts_tsc;
91         struct delayed_latency delayed_latency;
92         struct lat_info *latency_buffer;
93         uint32_t latency_buffer_idx;
94         uint32_t latency_buffer_size;
95         uint64_t begin;
96         uint16_t lat_pos;
97         uint16_t unique_id_pos;
98         uint16_t accur_pos;
99         uint16_t sig_pos;
100         uint32_t sig;
101         volatile uint16_t use_lt; /* which lt to use, */
102         volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements are used */
103         struct lat_test lt[2];
104         struct lat_test *lat_test;
105         uint32_t generator_count;
106         struct early_loss_detect *eld;
107         struct rx_pkt_meta_data *rx_pkt_meta;
108         FILE *fp_rx;
109         FILE *fp_tx;
110 };
111
112 static uint32_t abs_diff(uint32_t a, uint32_t b)
113 {
114        return a < b? UINT32_MAX - (b - a - 1) : a - b;
115 }
116
117 struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
118 {
119         if (task->use_lt == task->using_lt)
120                 return &task->lt[!task->using_lt];
121         return NULL;
122 }
123
124 void task_lat_use_other_latency_meassurement(struct task_lat *task)
125 {
126         task->use_lt = !task->using_lt;
127 }
128
129 static void task_lat_update_lat_test(struct task_lat *task)
130 {
131         if (task->use_lt != task->using_lt) {
132                 task->using_lt = task->use_lt;
133                 task->lat_test = &task->lt[task->using_lt];
134                 task->lat_test->accuracy_limit_tsc = task->limit;
135         }
136 }
137
138 static int compare_tx_time(const void *val1, const void *val2)
139 {
140         const struct lat_info *ptr1 = val1;
141         const struct lat_info *ptr2 = val2;
142
143         return ptr1->tx_time - ptr2->tx_time;
144 }
145
146 static int compare_queue_id(const void *val1, const void *val2)
147 {
148         return compare_tx_time(val1, val2);
149 }
150
151 static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
152 {
153         uint32_t id, time, old_id = 0, old_time = 0, n_overflow = 0;
154
155         for (uint32_t i = 0; i < count; i++) {
156                 id = lat->port_queue_id;
157                 time = lat->tx_time;
158                 if (id == old_id) {
159                         // Same queue id as previous entry; time should always increase
160                         if (time < old_time) {
161                                 n_overflow++;
162                         }
163                         lat->tx_time += UINT32_MAX * n_overflow;
164                         old_time = time;
165                 } else {
166                         // Different queue_id, time starts again at 0
167                         old_id = id;
168                         old_time = 0;
169                         n_overflow = 0;
170                 }
171         }
172 }
173
174 static void task_lat_count_remaining_lost_packets(struct task_lat *task)
175 {
176         struct lat_test *lat_test = task->lat_test;
177
178         for (uint32_t j = 0; j < task->generator_count; j++) {
179                 struct early_loss_detect *eld = &task->eld[j];
180
181                 lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
182         }
183 }
184
185 static void task_lat_reset_eld(struct task_lat *task)
186 {
187         for (uint32_t j = 0; j < task->generator_count; j++) {
188                 early_loss_detect_reset(&task->eld[j]);
189         }
190 }
191
192 static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
193 {
194         uint64_t min_tsc = UINT64_MAX;
195
196         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
197                 if (min_tsc > task->latency_buffer[i].tx_time)
198                         min_tsc = task->latency_buffer[i].tx_time;
199         }
200
201         return min_tsc << LATENCY_ACCURACY;
202 }
203
204 static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
205 {
206         uint64_t lat = abs_diff(lat_info->rx_time, lat_info->tx_time);
207
208         return lat << LATENCY_ACCURACY;
209 }
210
211 static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
212 {
213         return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
214 }
215
216 static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
217 {
218         return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
219 }
220
221 static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
222 {
223         return ((uint64_t)lat_info) << LATENCY_ACCURACY;
224 }
225
226 static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
227 {
228         return ((uint64_t)lat_info) << LATENCY_ACCURACY;
229 }
230
231 static void lat_write_latency_to_file(struct task_lat *task)
232 {
233         uint64_t min_tsc;
234         uint32_t n_loss;
235
236         min_tsc = lat_latency_buffer_get_min_tsc(task);
237
238         // Dumping all packet statistics
239         fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
240         fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec);tx time;\n");
241         for (uint32_t i = 0; i < task->latency_buffer_idx ; i++) {
242                 struct lat_info *lat_info = &task->latency_buffer[i];
243                 uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
244                 uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
245                 uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
246
247                 fprintf(task->fp_rx, "%u%d;%d;%ld;%lu;%lu\n",
248                         lat_info->rx_packet_index,
249                         lat_info->port_queue_id,
250                         lat_info->tx_packet_index,
251                         tsc_to_nsec(lat_tsc),
252                         tsc_to_nsec(rx_tsc - min_tsc),
253                         tsc_to_nsec(tx_tsc - min_tsc));
254         }
255
256         // To detect dropped packets, we need to sort them based on TX
257         plogx_info("Sorting packets based on queue_id\n");
258         qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_queue_id);
259         plogx_info("Adapting tx_time\n");
260         fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
261         plogx_info("Sorting packets based on tx_time\n");
262         qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
263         plogx_info("Sorted packets based on tx_time\n");
264
265         // A packet is marked as dropped if 2 packets received from the same queue are not consecutive
266         fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
267         fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");
268
269         uint32_t prev_tx_packet_index = -1;
270         for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
271                 struct lat_info *lat_info = &task->latency_buffer[i];
272                 uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
273                 uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
274                 uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
275                 uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
276                 uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
277
278                 /* Packet n + 64 delivers the TX error for packet n,
279                    hence the last 64 packets do no have TX error. */
280                 if (i + 64 >= task->latency_buffer_idx) {
281                         tx_err_tsc = 0;
282                 }
283                 // Log dropped packet
284                 n_loss = lat_info->tx_packet_index - prev_tx_packet_index - 1;
285                 if (n_loss)
286                         fprintf(task->fp_tx, "===> %d;%d;0;0;0;0; lost %d packets <===\n",
287                                 lat_info->port_queue_id,
288                                 lat_info->tx_packet_index - n_loss, n_loss);
289                 // Log next packet
290                 fprintf(task->fp_tx, "%d;%d;%u;%lu;%lu;%lu;%lu;%lu\n",
291                         lat_info->port_queue_id,
292                         lat_info->tx_packet_index,
293                         lat_info->rx_packet_index,
294                         tsc_to_nsec(lat_tsc),
295                         tsc_to_nsec(tx_tsc - min_tsc),
296                         tsc_to_nsec(rx_tsc - min_tsc),
297                         tsc_to_nsec(tx_err_tsc),
298                         tsc_to_nsec(rx_err_tsc));
299 #ifdef LAT_DEBUG
300                 fprintf(task->fp_tx, ";%d from %d;%lu;%lu;%lu",
301                         lat_info->id_in_bulk,
302                         lat_info->bulk_size,
303                         tsc_to_nsec(lat_info->begin - min_tsc),
304                         tsc_to_nsec(lat_info->before - min_tsc),
305                         tsc_to_nsec(lat_info->after - min_tsc));
306 #endif
307                 fprintf(task->fp_tx, "\n");
308                 prev_tx_packet_index = lat_info->tx_packet_index;
309         }
310         fflush(task->fp_rx);
311         fflush(task->fp_tx);
312         task->latency_buffer_idx = 0;
313 }
314
315 static void lat_stop(struct task_base *tbase)
316 {
317         struct task_lat *task = (struct task_lat *)tbase;
318
319         if (task->unique_id_pos) {
320                 task_lat_count_remaining_lost_packets(task);
321                 task_lat_reset_eld(task);
322         }
323         if (task->latency_buffer)
324                 lat_write_latency_to_file(task);
325 }
326
327 #ifdef LAT_DEBUG
328 static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
329 {
330         struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];
331
332         lat_info->bulk_size = bulk_size;
333         lat_info->id_in_bulk = id_in_bulk;
334         lat_info->begin = task->begin;
335         lat_info->before = task->base.aux->tsc_rx.before;
336         lat_info->after = task->base.aux->tsc_rx.after;
337 }
338 #endif
339
340 static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, struct unique_id *unique_id, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err)
341 {
342         struct lat_info *lat_info;
343         uint8_t generator_id = 0;
344         uint32_t packet_index = 0;
345
346         if (unique_id)
347                 unique_id_get(unique_id, &generator_id, &packet_index);
348
349         /* If unique_id_pos is specified then latency is stored per
350            packet being sent. Lost packets are detected runtime, and
351            latency stored for those packets will be 0 */
352         lat_info = &task->latency_buffer[task->latency_buffer_idx++];
353         lat_info->rx_packet_index = task->latency_buffer_idx - 1;
354         lat_info->tx_packet_index = packet_index;
355         lat_info->port_queue_id = generator_id;
356         lat_info->rx_time = rx_time;
357         lat_info->tx_time = tx_time;
358         lat_info->rx_err = rx_err;
359         lat_info->tx_err = tx_err;
360 }
361
362 static uint32_t task_lat_early_loss_detect(struct task_lat *task, struct unique_id *unique_id)
363 {
364         struct early_loss_detect *eld;
365         uint8_t generator_id;
366         uint32_t packet_index;
367
368         unique_id_get(unique_id, &generator_id, &packet_index);
369
370         if (generator_id >= task->generator_count)
371                 return 0;
372
373         eld = &task->eld[generator_id];
374
375         return early_loss_detect_add(eld, packet_index);
376 }
377
378 static uint64_t tsc_extrapolate_backward(uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
379 {
380         uint64_t tsc = tsc_from - rte_get_tsc_hz()*bytes/1250000000;
381         if (likely(tsc > tsc_minimum))
382                 return tsc;
383         else
384                 return tsc_minimum;
385 }
386
387 static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
388 {
389         uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
390         size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);
391
392         bucket_id = bucket_id < bucket_count? bucket_id : bucket_count;
393         lat_test->buckets[bucket_id]++;
394 }
395
396 static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
397 {
398         lat_test->lost_packets += lost_packets;
399 }
400
401 static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
402 {
403         lat_test->tot_all_pkts++;
404
405         if (error > lat_test->accuracy_limit_tsc)
406                 return;
407         lat_test->tot_pkts++;
408
409         lat_test->tot_lat += lat_tsc;
410         lat_test->tot_lat_error += error;
411
412         /* (a +- b)^2 = a^2 +- (2ab + b^2) */
413         lat_test->var_lat += lat_tsc * lat_tsc;
414         lat_test->var_lat_error += 2 * lat_tsc * error;
415         lat_test->var_lat_error += error * error;
416
417         if (lat_tsc > lat_test->max_lat) {
418                 lat_test->max_lat = lat_tsc;
419                 lat_test->max_lat_error = error;
420         }
421         if (lat_tsc < lat_test->min_lat) {
422                 lat_test->min_lat = lat_tsc;
423                 lat_test->min_lat_error = error;
424         }
425
426 #ifdef LATENCY_HISTOGRAM
427         lat_test_histogram_add(lat_test, lat_tsc);
428 #endif
429 }
430
431 static int task_lat_can_store_latency(struct task_lat *task)
432 {
433         return task->latency_buffer_idx < task->latency_buffer_size;
434 }
435
436 static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, struct unique_id *unique_id)
437 {
438         if (tx_time == 0)
439                 return;
440         uint32_t lat_tsc = abs_diff(rx_time, tx_time) << LATENCY_ACCURACY;
441
442         lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);
443
444         if (task_lat_can_store_latency(task)) {
445                 task_lat_store_lat_buf(task, rx_packet_index, unique_id, rx_time, tx_time, rx_error, tx_error);
446         }
447 }
448
449 static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
450 {
451         struct task_lat *task = (struct task_lat *)tbase;
452         uint64_t rx_time_err;
453
454         uint32_t pkt_rx_time, pkt_tx_time;
455
456         if (n_pkts == 0) {
457                 task->begin = tbase->aux->tsc_rx.before;
458                 return 0;
459         }
460
461         task_lat_update_lat_test(task);
462
463         const uint64_t rx_tsc = tbase->aux->tsc_rx.after;
464         uint32_t tx_time_err = 0;
465
466         /* Go once through all received packets and read them.  If
467            packet has just been modified by another core, the cost of
468            latency will be partialy amortized though the bulk size */
469         for (uint16_t j = 0; j < n_pkts; ++j) {
470                 struct rte_mbuf *mbuf = mbufs[j];
471                 task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);
472         }
473         for (uint16_t j = 0; j < n_pkts; ++j) {
474         }
475
476         if (task->sig) {
477                 for (uint16_t j = 0; j < n_pkts; ++j) {
478                         if (*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig)
479                                 task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
480                         else
481                                 task->rx_pkt_meta[j].pkt_tx_time = 0;
482                 }
483         } else {
484                 for (uint16_t j = 0; j < n_pkts; ++j) {
485                         task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
486                 }
487         }
488
489         uint32_t bytes_total_in_bulk = 0;
490         // Find RX time of first packet, for RX accuracy
491         for (uint16_t j = 0; j < n_pkts; ++j) {
492                 uint16_t flipped = n_pkts - 1 - j;
493
494                 task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
495                 bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
496         }
497
498         pkt_rx_time = tsc_extrapolate_backward(rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
499         if ((uint32_t)((task->begin >> LATENCY_ACCURACY)) > pkt_rx_time) {
500                 // Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
501                 rx_time_err = pkt_rx_time - (uint32_t)(task->last_pkts_tsc >> LATENCY_ACCURACY);
502         } else {
503                 rx_time_err = pkt_rx_time - (uint32_t)(task->begin >> LATENCY_ACCURACY);
504         }
505
506         struct unique_id *unique_id = NULL;
507         struct delayed_latency_entry *delayed_latency_entry;
508
509         for (uint16_t j = 0; j < n_pkts; ++j) {
510                 struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
511                 uint8_t *hdr = rx_pkt_meta->hdr;
512
513                 pkt_rx_time = tsc_extrapolate_backward(rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
514                 pkt_tx_time = rx_pkt_meta->pkt_tx_time;
515
516                 if (task->unique_id_pos) {
517                         unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
518
519                         uint32_t n_loss = task_lat_early_loss_detect(task, unique_id);
520                         lat_test_add_lost(task->lat_test, n_loss);
521                 }
522
523                 /* If accuracy is enabled, latency is reported with a
524                    delay of 64 packets since the generator puts the
525                    accuracy for packet N into packet N + 64. The delay
526                    ensures that all reported latencies have both rx
527                    and tx error. */
528                 if (task->accur_pos) {
529                         tx_time_err = *(uint32_t *)(hdr + task->accur_pos);
530
531                         delayed_latency_entry = delayed_latency_get(&task->delayed_latency, task->rx_packet_index - 64);
532
533                         if (delayed_latency_entry) {
534                                 task_lat_store_lat(task,
535                                                    task->rx_packet_index,
536                                                    delayed_latency_entry->pkt_rx_time,
537                                                    delayed_latency_entry->pkt_tx_time,
538                                                    delayed_latency_entry->rx_time_err,
539                                                    tx_time_err,
540                                                    unique_id);
541                         }
542
543                         delayed_latency_entry = delayed_latency_create(&task->delayed_latency, task->rx_packet_index);
544                         delayed_latency_entry->pkt_rx_time = pkt_rx_time;
545                         delayed_latency_entry->pkt_tx_time = pkt_tx_time;
546                         delayed_latency_entry->rx_time_err = rx_time_err;
547                 } else {
548                         task_lat_store_lat(task,
549                                            task->rx_packet_index,
550                                            pkt_rx_time,
551                                            pkt_tx_time,
552                                            0,
553                                            0,
554                                            unique_id);
555                 }
556                 task->rx_packet_index++;
557         }
558         int ret;
559         ret = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
560         task->begin = tbase->aux->tsc_rx.before;
561         task->last_pkts_tsc = tbase->aux->tsc_rx.after;
562         return ret;
563 }
564
565 static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
566 {
567         const int socket_id = rte_lcore_to_socket_id(core_id);
568         char name[256];
569         size_t latency_buffer_mem_size = 0;
570
571         if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
572                 task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;
573
574         latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;
575
576         task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
577         PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %ld kbytes for %s\n", latency_buffer_mem_size / 1024, name);
578
579         sprintf(name, "latency.rx_%d.txt", core_id);
580         task->fp_rx = fopen(name, "w+");
581         PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);
582
583         sprintf(name, "latency.tx_%d.txt", core_id);
584         task->fp_tx = fopen(name, "w+");
585         PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);
586 }
587
588 static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
589 {
590         uint8_t *generator_count = prox_sh_find_system("generator_count");
591         size_t eld_mem_size;
592
593         if (generator_count == NULL)
594                 task->generator_count = 0;
595         else
596                 task->generator_count = *generator_count;
597
598         eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
599         task->eld = prox_zmalloc(eld_mem_size, socket_id);
600 }
601
602 void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
603 {
604         task->limit = nsec_to_tsc(accuracy_limit_nsec);
605 }
606
607 static void init_task_lat(struct task_base *tbase, struct task_args *targ)
608 {
609         struct task_lat *task = (struct task_lat *)tbase;
610         const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
611
612         task->lat_pos = targ->lat_pos;
613         task->accur_pos = targ->accur_pos;
614         task->sig_pos = targ->sig_pos;
615         task->sig = targ->sig;
616
617         task->unique_id_pos = targ->packet_id_pos;
618         task->latency_buffer_size = targ->latency_buffer_size;
619
620         if (task->latency_buffer_size) {
621                 init_task_lat_latency_buffer(task, targ->lconf->id);
622         }
623
624         if (targ->bucket_size < LATENCY_ACCURACY) {
625                 targ->bucket_size = DEFAULT_BUCKET_SIZE;
626         }
627
628         task->lt[0].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
629         task->lt[1].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
630         if (task->unique_id_pos) {
631                 task_lat_init_eld(task, socket_id);
632                 task_lat_reset_eld(task);
633         }
634         task->lat_test = &task->lt[task->using_lt];
635
636         task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
637         task->rx_pkt_meta = prox_zmalloc(MAX_RX_PKT_ALL * sizeof(*task->rx_pkt_meta), socket_id);
638         PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");
639 }
640
641 static struct task_init task_init_lat = {
642         .mode_str = "lat",
643         .init = init_task_lat,
644         .handle = handle_lat_bulk,
645         .stop = lat_stop,
646         .flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_RX_ALL | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
647         .size = sizeof(struct task_lat)
648 };
649
650 __attribute__((constructor)) static void reg_task_lat(void)
651 {
652         reg_task(&task_init_lat);
653 }