2 // Copyright (c) 2010-2019 Intel Corporation
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
19 #include <rte_cycles.h>
23 #include "handle_gen.h"
24 #include "prox_malloc.h"
25 #include "mbuf_utils.h"
26 #include "handle_lat.h"
28 #include "task_init.h"
29 #include "task_base.h"
34 #include "prox_shared.h"
35 #include "prox_port_cfg.h"
// Histogram and accuracy-buffer sizing constants.
#define DEFAULT_BUCKET_SIZE 11	// default per-bucket shift for the latency histogram (see lat_test_histogram_add)
#define ACCURACY_BUFFER_SIZE (2 * ACCURACY_WINDOW)	// size of the per-generator delayed-latency ring
/* NOTE(review): the fields below belong to a struct whose opening line is not
 * visible in this chunk (presumably struct lat_info / a latency record) —
 * confirm against the header. */
uint32_t rx_packet_index;	// index of the packet in RX order
uint64_t tx_packet_index;	// index of the packet in TX order (widened after overflow fixup)
uint16_t port_queue_id;	// generator/queue the packet came from
/* One slot of the per-generator ring that delays latency reporting until the
 * TX accuracy for a packet arrives (written by the generator into packet
 * N + ACCURACY_WINDOW).
 * NOTE(review): sampled chunk — further members (packet_id, pkt_rx_time,
 * rx_time_err, ... used elsewhere in this file) and the closing brace are not
 * visible here. */
struct delayed_latency_entry {
	uint32_t rx_packet_id;	// RX-order index of the delayed packet
	uint32_t tx_packet_id;	// TX-order packet id carried in the packet
	uint64_t pkt_tx_time; // Time written into packets by gen. Unit is TSC >> LATENCY_ACCURACY
/* Fetch the delayed-latency ring slot for (generator_id, packet_id).
 * The ring is indexed by packet_id modulo ACCURACY_BUFFER_SIZE; the entry is
 * only returned when it still records this exact packet_id (otherwise the
 * slot was never filled or was overwritten by a later packet).
 * NOTE(review): sampled chunk — the "no match" return path is not visible. */
static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
	struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
	if (delayed_latency_entry->packet_id == packet_id)
		return delayed_latency_entry;
/* Claim the ring slot for (generator_id, packet_id) and stamp it with the
 * packet_id so that a later delayed_latency_get() can validate the match. */
static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
	struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
	delayed_latency_entry->packet_id = packet_id;
	return delayed_latency_entry;
/* Per-packet scratch data filled while scanning a received bulk.
 * NOTE(review): sampled chunk — other members used by handle_lat_bulk
 * (hdr, pkt_tx_time) are not visible here. */
struct rx_pkt_meta_data {
	uint32_t bytes_after_in_bulk;	// wire bytes of all packets AFTER this one in the bulk, for RX-time extrapolation
/* State of the latency-measurement task.
 * NOTE(review): sampled chunk — the "struct task_lat {" line and several
 * members referenced elsewhere in this file (fp_rx, fp_tx, fp_loss, sig,
 * sig_pos, lat_pos, accur_pos, flow_id_pos, flows, flow_count, limit, begin,
 * loss_id, ...) are not visible here. */
	struct task_base base;	// must remain first: pointers are cast between task_base and task_lat
	uint64_t rx_packet_index;	// running count of accepted (data-plane) packets
	uint64_t last_pkts_tsc;	// TSC right after the previous bulk; floor for backward extrapolation
	struct delayed_latency_entry **delayed_latency_entries;	// per-generator rings (accuracy feature)
	struct lat_info *latency_buffer;	// optional per-packet record buffer, dumped at stop
	uint32_t latency_buffer_idx;	// next free slot in latency_buffer
	uint32_t latency_buffer_size;	// capacity of latency_buffer (0 = feature off)
	uint16_t unique_id_pos;	// offset of unique id in packet (0 = feature off)
	volatile uint16_t use_lt; /* which lt to use, */
	volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements are used */
	struct lat_test lt[2];	// double-buffered measurement results
	struct lat_test *lat_test;	// currently active buffer (== &lt[using_lt])
	uint32_t generator_count;	// number of traffic generators feeding this task
	uint16_t min_pkt_len;	// shortest packet able to hold all configured fields
	struct early_loss_detect *eld;	// per-generator loss-detection state
	struct rx_pkt_meta_data *rx_pkt_meta;	// per-burst scratch array (MAX_PKT_BURST entries)
	// Following fields are only used when starting or stopping, not in general runtime
	uint64_t *prev_tx_packet_index;	// per-generator last tx index seen while dumping to file
	struct prox_port_cfg *port;
	uint64_t *bytes_to_tsc;	// lookup: wire bytes -> TSC cycles at link speed
	uint64_t *previous_packet;	// per-generator last packet id, for reorder detection
	uint32_t loss_buffer_size;	// capacity of loss_buffer (0 = feature off)
	struct loss_buffer *loss_buffer;	// buffered loss events, dumped at stop
	uint32_t packet_id_in_flow_pos;	// offset of per-flow packet id (0 = feature off)
/* This function calculates the difference between rx and tx_time
138 * Both values are uint32_t (see handle_lat_bulk)
139 * rx time should be higher than tx_time...except every UINT32_MAX
140 * cycles, when rx_time overflows.
141 * As the return value is also uint32_t, returning (rx_time - tx_time)
142 * is also fine when it overflows.
/* Wrap-safe distance between two 32-bit timestamps (see the note above):
 * unsigned subtraction yields the correct difference even when rx_time has
 * rolled over past UINT32_MAX once since tx_time was taken. */
static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
{
	uint32_t delta = rx_time - tx_time;
	return delta;
}
/* Accessor: histogram bucket shift of the currently active measurement. */
uint32_t task_lat_get_latency_bucket_size(struct task_lat *task)
	return task->lat_test->bucket_size;
/* Return the lat_test buffer NOT currently written by the datapath, so a
 * reader can consume stable results while measurement continues in the other.
 * NOTE(review): sampled chunk — the branch taken when use_lt != using_lt is
 * not visible here. */
struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
	if (task->use_lt == task->using_lt)
		return &task->lt[!task->using_lt];
/* Request a swap of the double-buffered lat_test; the datapath applies it in
 * task_lat_update_lat_test() on the next bulk. */
void task_lat_use_other_latency_meassurement(struct task_lat *task)
	task->use_lt = !task->using_lt;
/* Datapath side of the buffer swap: when a swap was requested, switch to the
 * other lat_test and re-apply the configured accuracy limit to it. */
static void task_lat_update_lat_test(struct task_lat *task)
	if (task->use_lt != task->using_lt) {
		task->using_lt = task->use_lt;
		task->lat_test = &task->lt[task->using_lt];
		task->lat_test->accuracy_limit_tsc = task->limit;
175 static int compare_tx_time(const void *val1, const void *val2)
177 const struct lat_info *ptr1 = val1;
178 const struct lat_info *ptr2 = val2;
180 return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
183 static int compare_tx_packet_index(const void *val1, const void *val2)
185 const struct lat_info *ptr1 = val1;
186 const struct lat_info *ptr2 = val2;
188 return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
/* Widen the 32-bit tx_packet_index across the RX-ordered buffer: each time a
 * wrap of the 32-bit counter is detected, add n_overflow * 2^32 so the
 * subsequent sort by tx_packet_index is monotonic.
 * NOTE(review): sampled chunk — the lines advancing `lat`, incrementing
 * n_overflow and the closing braces are not visible here. */
static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
	uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
	uint32_t small = UINT32_MAX >> 1;	// threshold separating reorder/progress from counter wrap
	/* Buffer is sorted so far by RX time.
	 * We might have packets being reordered by SUT.
	 * => consider small differences as re-order and big ones as overflow of tx_packet_index.
	 * - overflow only happens if receiving and storing 4 billions packets...
	 * - an absolute difference of less than 2 billion packets is not considered as an overflow
	 */
	for (uint32_t i = 1; i < count; i++) {
		tx_packet_index = lat->tx_packet_index;
		if (tx_packet_index > old_tx_packet_index) {
			if (tx_packet_index - old_tx_packet_index < small) {
				// The diff is small => increasing index count
			// The diff is big => it is more likely that the previous packet was overflow
			if (old_tx_packet_index - tx_packet_index < small) {
				// The diff is small => packet reorder
			// The diff is big => it is more likely that this is an overflow
		lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
		old_tx_packet_index = tx_packet_index;
/* Widen the 32-bit tx_time across the buffer, same overflow-vs-reorder
 * heuristic as fix_latency_buffer_tx_packet_index but applied to timestamps.
 * NOTE(review): sampled chunk — lines advancing `lat`, incrementing
 * n_overflow and the closing braces are not visible here. */
static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
	uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
	uint32_t small = UINT32_MAX >> 1;	// threshold separating jitter from a timestamp wrap
	/*
	 * Same algorithm as above, but with time.
	 * - overflow happens after 4 billions "cycles" (shifted by LATENCY_ACCURACY) = ~4sec
	 * - an absolute difference up to 2 billion (shifted) cycles (~=2sec) is not considered as an overflow
	 * => algorithm does not work if receiving less than 1 packet every 2 seconds
	 */
	for (uint32_t i = 1; i < count; i++) {
		tx_time = lat->tx_time;
		if (tx_time > old_tx_time) {
			if (tx_time - old_tx_time > small) {
			if (old_tx_time - tx_time > small) {
		lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
		old_tx_time = tx_time;
/* At stop time, drain each generator's early-loss-detect state and fold the
 * remaining losses into the active lat_test counter. */
static void task_lat_count_remaining_lost_packets(struct task_lat *task)
	struct lat_test *lat_test = task->lat_test;
	for (uint32_t j = 0; j < task->generator_count; j++) {
		struct early_loss_detect *eld = &task->eld[j];
		lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
/* Reset the early-loss-detect state of every generator. */
static void task_lat_reset_eld(struct task_lat *task)
	for (uint32_t j = 0; j < task->generator_count; j++) {
		early_loss_detect_reset(&task->eld[j]);
/* Smallest tx_time recorded in the latency buffer, rescaled back to full TSC
 * units; used as the time origin when dumping records to file. */
static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
	uint64_t min_tsc = UINT64_MAX;
	for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
		if (min_tsc > task->latency_buffer[i].tx_time)
			min_tsc = task->latency_buffer[i].tx_time;
	return min_tsc << LATENCY_ACCURACY;
288 static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
290 uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);
292 return lat << LATENCY_ACCURACY;
/* TX accuracy error of one record, rescaled to full TSC units. */
static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
	return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
/* RX accuracy error of one record, rescaled to full TSC units. */
static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
	return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
/* RX timestamp of one record, rescaled to full TSC units. */
static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
	return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
/* TX timestamp of one record, rescaled to full TSC units. */
static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
	return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
/* Dump the buffered per-packet latency records to file, twice: first in RX
 * order (fp_rx), then — after widening and sorting by TX index/time — in TX
 * order (fp_tx), logging detected drops between consecutive TX indices of the
 * same queue. Resets the buffer index at the end.
 * NOTE(review): sampled chunk — several lines (declarations of min_tsc and
 * n_loss, else branches, continue/skip statements, closing braces) are not
 * visible here. */
static void lat_write_latency_to_file(struct task_lat *task)
	min_tsc = lat_latency_buffer_get_min_tsc(task);	// time origin for all dumped timestamps

	// Dumping all packet statistics
	fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
	fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec);tx time;\n");
	for (uint32_t i = 0; i < task->latency_buffer_idx ; i++) {
		struct lat_info *lat_info = &task->latency_buffer[i];
		uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
		uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
		uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
		fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
			lat_info->rx_packet_index,
			lat_info->port_queue_id,
			lat_info->tx_packet_index,
			tsc_to_nsec(lat_tsc),
			tsc_to_nsec(rx_tsc - min_tsc),
			tsc_to_nsec(tx_tsc - min_tsc));

	// To detect dropped packets, we need to sort them based on TX
	if (task->unique_id_pos) {
		// Sort by the packet id carried in the packets (preferred, exact)
		plogx_info("Adapting tx_packet_index\n");
		fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
		plogx_info("Sorting packets based on tx_packet_index\n");
		qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
		plogx_info("Sorted packets based on packet_index\n");
		// Fallback: sort by the TX timestamp carried in the packets
		plogx_info("Adapting tx_time\n");
		fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
		plogx_info("Sorting packets based on tx_time\n");
		qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
		plogx_info("Sorted packets based on packet_time\n");

	// A packet is marked as dropped if 2 packets received from the same queue are not consecutive
	fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
	fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");

	for (uint32_t i = 0; i < task->generator_count;i++)
		task->prev_tx_packet_index[i] = -1;

	for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
		struct lat_info *lat_info = &task->latency_buffer[i];
		uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
		uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
		uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
		uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
		uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

		/* Packet n + ACCURACY_WINDOW delivers the TX error for packet n,
		   hence the last ACCURACY_WINDOW packets do no have TX error. */
		if (i + ACCURACY_WINDOW >= task->latency_buffer_idx) {
		if (lat_info->port_queue_id >= task->generator_count) {
			plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
				lat_info->port_queue_id, lat_info->tx_packet_index);
		// Log dropped packet
		n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
			fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
				lat_info->port_queue_id,
				lat_info->tx_packet_index - n_loss, n_loss);
		fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
			lat_info->port_queue_id,
			lat_info->tx_packet_index,
			lat_info->rx_packet_index,
			tsc_to_nsec(lat_tsc),
			tsc_to_nsec(tx_tsc - min_tsc),
			tsc_to_nsec(rx_tsc - min_tsc),
			tsc_to_nsec(tx_err_tsc),
			tsc_to_nsec(rx_err_tsc));
		// Extra debug columns: position in bulk and bulk timestamps
		fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
			lat_info->id_in_bulk,
			tsc_to_nsec(lat_info->begin - min_tsc),
			tsc_to_nsec(lat_info->before - min_tsc),
			tsc_to_nsec(lat_info->after - min_tsc));
		fprintf(task->fp_tx, "\n");
		task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
	task->latency_buffer_idx = 0;	// buffer fully consumed; restart collection
/* Stop hook: account remaining losses, dump buffered loss events and latency
 * records to their files, and reset state for the next run.
 * NOTE(review): sampled chunk — some closing braces are not visible here. */
static void lat_stop(struct task_base *tbase)
	struct task_lat *task = (struct task_lat *)tbase;
	if (task->unique_id_pos) {
		task_lat_count_remaining_lost_packets(task);
		task_lat_reset_eld(task);
		/* NOTE(review): sizeof(task->previous_packet) is the size of the
		 * POINTER, not of the uint64_t elements — this zeroes the right
		 * amount only where sizeof(void*) == sizeof(uint64_t) (LP64).
		 * The matching allocation in init_task_lat has the same pattern;
		 * confirm intent before changing. */
		memset(task->previous_packet, 0, sizeof(task->previous_packet) * task->generator_count);
	if (task->loss_id && task->fp_loss) {
		for (uint i = 0; i < task->loss_id; i++) {
			fprintf(task->fp_loss, "packet %d: %d\n", task->loss_buffer[i].packet_id, task->loss_buffer[i].n);
	task->lat_test->lost_packets = 0;
	if (task->latency_buffer)
		lat_write_latency_to_file(task);
/* Record per-bulk debug context (bulk size, position in bulk, and the three
 * bulk timestamps) into the latency buffer entry for this RX packet. */
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
	struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];
	lat_info->bulk_size = bulk_size;
	lat_info->id_in_bulk = id_in_bulk;
	lat_info->begin = task->begin;
	lat_info->before = task->base.aux->tsc_rx.before;
	lat_info->after = task->base.aux->tsc_rx.after;
/* Append one latency record to the latency buffer (caller must have checked
 * capacity via task_lat_can_store_latency()). */
static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
	struct lat_info *lat_info;

	/* If unique_id_pos is specified then latency is stored per
	   packet being sent. Lost packets are detected runtime, and
	   latency stored for those packets will be 0 */
	lat_info = &task->latency_buffer[task->latency_buffer_idx++];
	lat_info->rx_packet_index = rx_packet_index;
	lat_info->tx_packet_index = packet_id;
	lat_info->port_queue_id = generator_id;
	lat_info->rx_time = rx_time;
	lat_info->tx_time = tx_time;
	lat_info->rx_err = rx_err;
	lat_info->tx_err = tx_err;
/* Feed one packet id into the generator's loss detector; returns the number
 * of packets newly detected as lost (per early_loss_detect_add). */
static uint32_t task_lat_early_loss_detect(struct task_lat *task, uint32_t packet_id, uint8_t generator_id)
	struct early_loss_detect *eld = &task->eld[generator_id];
	return early_loss_detect_add(eld, packet_id);
/* Duplicate detection: if the eld slot for this packet id already holds the
 * same (upper bits of the) id, the packet was seen before — count it. */
static void lat_test_check_duplicate(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
	struct early_loss_detect *eld = &task->eld[generator_id];
	uint32_t old_queue_id, queue_pos;

	queue_pos = packet_id & PACKET_QUEUE_MASK;	// slot index within the eld window
	old_queue_id = eld->entries[queue_pos];
	if ((packet_id >> PACKET_QUEUE_BITS) == old_queue_id)
		lat_test->duplicate++;
/* Estimate the TSC at which a packet actually arrived by walking back from
 * tsc_from by the wire time of `bytes` (via the bytes_to_tsc table), clamped
 * below by tsc_minimum. With NO_LAT_EXTRAPOLATION the raw tsc_from is used.
 * NOTE(review): sampled chunk — #else/#endif and the two return statements
 * are not visible here. */
static uint64_t tsc_extrapolate_backward(struct task_lat *task, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
#ifdef NO_LAT_EXTRAPOLATION
	uint64_t tsc = tsc_from;
	uint64_t tsc = tsc_from - task->bytes_to_tsc[bytes];
	if (likely(tsc > tsc_minimum))
/* Add one latency sample to the histogram: bucket index is the latency
 * right-shifted by bucket_size, saturated to the last bucket. */
static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
	uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
	size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);

	bucket_id = bucket_id < bucket_count? bucket_id : (bucket_count - 1);
	lat_test->buckets[bucket_id]++;
/* Per-flow reorder check: a packet id lower than the last one seen for this
 * flow counts as mis-ordered; `extent` accumulates how far back it was. */
static void lat_test_check_flow_ordering(struct task_lat *task, struct lat_test *lat_test, int32_t flow_id, uint32_t packet_id)
	if (packet_id < task->flows[flow_id].packet_id) {
		lat_test->mis_ordered++;
		lat_test->extent += task->flows[flow_id].packet_id - packet_id;
	task->flows[flow_id].packet_id = packet_id;
/* Per-generator reorder check, same accounting as the per-flow variant but
 * keyed on the generator's global packet id sequence. */
static void lat_test_check_ordering(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
	if (packet_id < task->previous_packet[generator_id]) {
		lat_test->mis_ordered++;
		lat_test->extent += task->previous_packet[generator_id] - packet_id;
	task->previous_packet[generator_id] = packet_id;
/* Accumulate newly detected lost packets into the measurement. */
static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
	lat_test->lost_packets += lost_packets;
/* Fold one latency sample (and its measurement error) into the running
 * statistics: totals, variance terms, min/max with their errors, and (when
 * compiled in) the histogram. Samples whose error exceeds the configured
 * accuracy limit are handled by the branch at the top.
 * NOTE(review): sampled chunk — the body of the accuracy-limit branch and
 * some closing braces are not visible here. */
static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
	if (error > lat_test->accuracy_limit_tsc)
	lat_test->tot_pkts++;	// packets within the accuracy limit

	lat_test->tot_lat += lat_tsc;
	lat_test->tot_lat_error += error;

	/* (a +- b)^2 = a^2 +- (2ab + b^2) */
	lat_test->var_lat += lat_tsc * lat_tsc;
	lat_test->var_lat_error += 2 * lat_tsc * error;
	lat_test->var_lat_error += error * error;

	if (lat_tsc > lat_test->max_lat) {
		lat_test->max_lat = lat_tsc;
		lat_test->max_lat_error = error;
	if (lat_tsc < lat_test->min_lat) {
		lat_test->min_lat = lat_tsc;
		lat_test->min_lat_error = error;

#ifdef LATENCY_HISTOGRAM
	lat_test_histogram_add(lat_test, lat_tsc);
/* True while the latency buffer still has room for another record. */
static int task_lat_can_store_latency(struct task_lat *task)
	return task->latency_buffer_idx < task->latency_buffer_size;
/* Account one completed latency measurement: update the live statistics and,
 * if buffering is enabled and space remains, persist the full record. */
static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
	uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;

	lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);

	if (task_lat_can_store_latency(task)) {
		task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
/* Main datapath handler: for each received bulk, validate packets (length and
 * signature), extrapolate per-packet RX times backwards from the bulk RX TSC,
 * extract TX timestamps/ids from the packets, run loss / reorder / duplicate
 * detection, and record latencies — either directly or delayed through the
 * accuracy ring when the accuracy feature is on. Finally forwards the bulk.
 * NOTE(review): this chunk is heavily sampled — declarations (rc, packet_id),
 * else branches, continue statements and many closing braces are not visible
 * here; comments below only describe what the visible lines show. */
static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
	struct task_lat *task = (struct task_lat *)tbase;
	static int max_flows_printed = 0;	// rate-limit the "too many flows" message to once
	task->begin = tbase->aux->tsc_rx.before;
	task_lat_update_lat_test(task);	// apply any pending lat_test double-buffer swap

	// Remember those packets with bad length or bad signature
	uint32_t non_dp_count = 0;
	uint64_t pkt_bad_len_sig = 0;	// one bit per packet of the bulk (bulk <= 64 packets)
#define BIT64_SET(a64, bit)	a64 |= (((uint64_t)1) << (bit & 63))
#define BIT64_CLR(a64, bit)	a64 &= ~(((uint64_t)1) << (bit & 63))
#define BIT64_TEST(a64, bit)	a64 & (((uint64_t)1) << (bit & 63))

	/* Go once through all received packets and read them. If
	   packet has just been modified by another core, the cost of
	   latency will be partially amortized through the bulk size */
	for (uint16_t j = 0; j < n_pkts; ++j) {
		struct rte_mbuf *mbuf = mbufs[j];
		task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);

		// Remember those packets which are too short to hold the values that we expect
		if (unlikely(rte_pktmbuf_pkt_len(mbuf) < task->min_pkt_len)) {
			BIT64_SET(pkt_bad_len_sig, j);
			BIT64_CLR(pkt_bad_len_sig, j);

	// Signature check path: only packets carrying the expected signature are measured
	for (uint16_t j = 0; j < n_pkts; ++j) {
		if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
		// Remember those packets with bad signature
		if (likely(*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig))
			task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
			BIT64_SET(pkt_bad_len_sig, j);
	// No-signature path: read the TX timestamp from every length-valid packet
	for (uint16_t j = 0; j < n_pkts; ++j) {
		if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
		task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);

	uint32_t bytes_total_in_bulk = 0;
	// Find RX time of first packet, for RX accuracy
	// Walk the bulk backwards so each packet knows how many wire bytes follow it
	for (uint16_t j = 0; j < n_pkts; ++j) {
		uint16_t flipped = n_pkts - 1 - j;
		task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
		bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);

	const uint64_t rx_tsc = tbase->aux->tsc_rx.after;

	uint64_t rx_time_err;
	// Extrapolated arrival time of the FIRST packet of the bulk (shifted units)
	uint64_t pkt_rx_time64 = tsc_extrapolate_backward(task, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
	if (unlikely((task->begin >> LATENCY_ACCURACY) > pkt_rx_time64)) {
		// Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
		rx_time_err = pkt_rx_time64 - (task->last_pkts_tsc >> LATENCY_ACCURACY);
		rx_time_err = pkt_rx_time64 - (task->begin >> LATENCY_ACCURACY);

	TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, non_dp_count);
	for (uint16_t j = 0; j < n_pkts; ++j) {
		// Used to display % of packets within accuracy limit vs. total number of packets (used_col)
		task->lat_test->tot_all_pkts++;

		// Skip those packets with bad length or bad signature
		if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))

		struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
		uint8_t *hdr = rx_pkt_meta->hdr;

		// Per-packet RX time, extrapolated back by the bytes still on the wire after it
		uint32_t pkt_rx_time = tsc_extrapolate_backward(task, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
		uint32_t pkt_tx_time = rx_pkt_meta->pkt_tx_time;

		uint8_t generator_id;
		int32_t flow_id = -1;
		if (task->flow_id_pos) {
			flow_id = *(int32_t *)(hdr + task->flow_id_pos);
			if (unlikely(flow_id >= (int32_t)(task->flow_count))) {
				if (!max_flows_printed) {
					plog_info("Too many flows - increase flow count (only printed once)\n");
					max_flows_printed = 1;
		if (task->packet_id_in_flow_pos && (flow_id != -1)) {
			uint32_t packet_id_in_flow;
			struct unique_id *unique_id = (struct unique_id *)(hdr + task->packet_id_in_flow_pos);
			unique_id_get(unique_id, &generator_id, &packet_id_in_flow);
			lat_test_check_flow_ordering(task, task->lat_test, flow_id + generator_id * task->generator_count, packet_id_in_flow);
		if (task->unique_id_pos) {
			struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
			unique_id_get(unique_id, &generator_id, &packet_id);

			if (unlikely(generator_id >= task->generator_count)) {
				/* No need to remember unexpected packet at this stage
				BIT64_SET(pkt_bad_len_sig, j);
				// Skip unexpected packet

			lat_test_check_ordering(task, task->lat_test, packet_id, generator_id);
			lat_test_check_duplicate(task, task->lat_test, packet_id, generator_id);
			uint32_t loss = task_lat_early_loss_detect(task, packet_id, generator_id);
				lat_test_add_lost(task->lat_test, loss);
				if (task->loss_id < task->loss_buffer_size) {
					task->loss_buffer[task->loss_id].packet_id = packet_id;
					task->loss_buffer[task->loss_id++].n = loss;
			// No unique id in packets: fall back to the RX index as packet id
			packet_id = task->rx_packet_index;

		/* If accuracy is enabled, latency is reported with a
		   delay of ACCURACY_WINDOW packets since the generator puts the
		   accuracy for packet N into packet N + ACCURACY_WINDOW. The delay
		   ensures that all reported latencies have both rx
		if (task->accur_pos) {
			uint32_t tx_time_err = *(uint32_t *)(hdr + task->accur_pos);

			// Report the packet received ACCURACY_WINDOW packets ago, now that its TX error is known
			struct delayed_latency_entry *delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_WINDOW);

			if (delayed_latency_entry) {
				task_lat_store_lat(task,
						   delayed_latency_entry->rx_packet_id,
						   delayed_latency_entry->pkt_rx_time,
						   delayed_latency_entry->pkt_tx_time,
						   delayed_latency_entry->rx_time_err,
						   delayed_latency_entry->tx_packet_id,
						   delayed_latency_entry->generator_id);

			// Stash the current packet until its TX error arrives
			delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
			delayed_latency_entry->pkt_rx_time = pkt_rx_time;
			delayed_latency_entry->pkt_tx_time = pkt_tx_time;
			delayed_latency_entry->rx_time_err = rx_time_err;
			delayed_latency_entry->rx_packet_id = task->rx_packet_index;
			delayed_latency_entry->tx_packet_id = packet_id;
			delayed_latency_entry->generator_id = generator_id;
			task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);

		// Bad/unexpected packets do not need to be indexed
		task->rx_packet_index++;

	if (n_pkts < MAX_PKT_BURST)
		task->begin = tbase->aux->tsc_rx.before;
	task->last_pkts_tsc = tbase->aux->tsc_rx.after;

	rc = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
	// non_dp_count should not be drop-handled, as they are all by definition considered as not handled
	// RX = DISCARDED + HANDLED + NON_DP + (TX - TX_NON_DP) + TX_FAIL
	TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, -non_dp_count);
/* Allocate the per-packet latency buffer and open the two dump files
 * (latency.rx_<core>.txt / latency.tx_<core>.txt); also allocate the
 * per-generator prev_tx_packet_index array used when dumping.
 * NOTE(review): sampled chunk — the declaration of `name` is not visible. */
static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
	const int socket_id = rte_lcore_to_socket_id(core_id);
	size_t latency_buffer_mem_size = 0;

	// Clamp so that index arithmetic with MAX_RING_BURST cannot overflow uint32_t
	if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
		task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;
	latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;
	task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
	PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);

	sprintf(name, "latency.rx_%u.txt", core_id);
	task->fp_rx = fopen(name, "w+");
	PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);

	sprintf(name, "latency.tx_%u.txt", core_id);
	task->fp_tx = fopen(name, "w+");
	PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);

	task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
	PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocated prev_tx_packet_index\n");
/* Discover how many generator tasks exist (published via prox_sh by the gen
 * tasks); default to 1 when none is registered. */
static void task_init_generator_count(struct task_lat *task)
	uint8_t *generator_count = prox_sh_find_system("generator_count");

	if (generator_count == NULL) {
		task->generator_count = 1;
		plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
		task->generator_count = *generator_count;
	plog_info("\t\tLatency using %u generators\n", task->generator_count);
/* Allocate one early-loss-detect state per generator on the local socket. */
static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
	eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
	task->eld = prox_zmalloc(eld_mem_size, socket_id);
	PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
/* Convert the configured accuracy limit from nanoseconds to TSC cycles. */
void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
	task->limit = nsec_to_tsc(accuracy_limit_nsec);
/* Start hook.
 * NOTE(review): sampled chunk — the body beyond the cast is not visible. */
static void lat_start(struct task_base *tbase)
	struct task_lat *task = (struct task_lat *)tbase;
/* One-time task initialization from the parsed configuration: copies field
 * offsets, derives the minimum packet length able to hold them, allocates
 * the optional latency buffer / accuracy rings / eld / loss buffer / flow
 * table, and precomputes the bytes->TSC extrapolation table from the port's
 * link speed.
 * NOTE(review): sampled chunk — several closing braces and the declaration of
 * `name` are not visible here. */
static void init_task_lat(struct task_base *tbase, struct task_args *targ)
	struct task_lat *task = (struct task_lat *)tbase;
	const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

	task->lat_pos = targ->lat_pos;
	task->accur_pos = targ->accur_pos;
	task->sig_pos = targ->sig_pos;
	task->sig = targ->sig;
	task->packet_id_in_flow_pos = targ->packet_id_in_flow_pos;
	task->flow_id_pos = targ->flow_id_pos;

	task->unique_id_pos = targ->packet_id_pos;
	task->latency_buffer_size = targ->latency_buffer_size;

	PROX_PANIC(task->lat_pos == 0, "Missing 'lat pos' parameter in config file\n");
	// Minimum packet length = end of the furthest configured field
	uint16_t min_pkt_len = task->lat_pos + sizeof(uint32_t);
	if (task->unique_id_pos && (
		min_pkt_len < task->unique_id_pos + sizeof(struct unique_id)))
		min_pkt_len = task->unique_id_pos + sizeof(struct unique_id);
	if (task->accur_pos && (
		min_pkt_len < task->accur_pos + sizeof(uint32_t)))
		min_pkt_len = task->accur_pos + sizeof(uint32_t);
	if (task->sig_pos && (
		min_pkt_len < task->sig_pos + sizeof(uint32_t)))
		min_pkt_len = task->sig_pos + sizeof(uint32_t);
	task->min_pkt_len = min_pkt_len;

	task_init_generator_count(task);

	if (task->latency_buffer_size) {
		init_task_lat_latency_buffer(task, targ->lconf->id);

	if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
		targ->bucket_size = DEFAULT_BUCKET_SIZE;

	if (task->accur_pos) {
		// One delayed-latency ring per generator (accuracy feature)
		task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count , socket_id);
		PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
		for (uint i = 0; i < task->generator_count; i++) {
			task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
			PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
		if (task->unique_id_pos == 0) {
			/* When using accuracy feature, the accuracy from TX is written ACCURACY_WINDOW packets later
			 * We can only retrieve the good packet if a packet id is written to it.
			 * Otherwise we will use the packet RECEIVED ACCURACY_WINDOW packets ago which is OK if
			 * packets are not re-ordered. If packets are re-ordered, then the matching between
			 * the TX accuracy and the latency is wrong.
			plog_warn("\tWhen accuracy feature is used, a unique id should ideally also be used\n");

	task->lt[0].min_lat = -1;	// UINT64_MAX: first sample always becomes the minimum
	task->lt[1].min_lat = -1;
	task->lt[0].bucket_size = targ->bucket_size;
	task->lt[1].bucket_size = targ->bucket_size;
	if (task->unique_id_pos) {
		task_lat_init_eld(task, socket_id);
		task_lat_reset_eld(task);
		/* NOTE(review): sizeof(task->previous_packet) is the pointer size,
		 * not sizeof(uint64_t) — matches the memset in lat_stop; correct
		 * only on LP64. Confirm before changing. */
		task->previous_packet = prox_zmalloc(sizeof(task->previous_packet) * task->generator_count , socket_id);
		PROX_PANIC(task->previous_packet == NULL, "Failed to allocate array for storing previous packet\n");
	task->lat_test = &task->lt[task->using_lt];

	task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
	task->rx_pkt_meta = prox_zmalloc(MAX_PKT_BURST * sizeof(*task->rx_pkt_meta), socket_id);
	PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");

	uint32_t max_frame_size = MAX_PKT_SIZE;
	uint64_t bytes_per_hz = UINT64_MAX;	// sentinel: link speed unknown, disable extrapolation
	if (targ->nb_rxports) {
		struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
		max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE;

		// port->max_link_speed reports the maximum, non negotiated link speed in Mbps e.g. 40k for a 40 Gbps NIC.
		// It can be UINT32_MAX (virtual devices or not supported by DPDK < 16.04)
		if (port->max_link_speed != UINT32_MAX) {
			bytes_per_hz = port->max_link_speed * 125000L;	// Mbps -> bytes/sec
			plog_info("\t\tPort %u: max link speed is %ld Mbps\n",
				(uint8_t)(port - prox_port_cfg), 8 * bytes_per_hz / 1000000);
	task->loss_buffer_size = targ->loss_buffer_size;
	if (task->loss_buffer_size) {
		sprintf(name, "loss_%u.txt", targ->lconf->id);
		task->fp_loss = fopen(name, "w+");
		PROX_PANIC(task->fp_loss == NULL, "Failed to open %s\n", name);

		task->loss_buffer = prox_zmalloc(task->loss_buffer_size * sizeof(struct loss_buffer), rte_lcore_to_socket_id(targ->lconf->id));
		PROX_PANIC(task->loss_buffer == NULL,
			"Failed to allocate %lu bytes (in huge pages) for loss_buffer\n", task->loss_buffer_size * sizeof(struct loss_buffer));
	task->bytes_to_tsc = prox_zmalloc(max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST, rte_lcore_to_socket_id(targ->lconf->id));
	PROX_PANIC(task->bytes_to_tsc == NULL,
		"Failed to allocate %lu bytes (in huge pages) for bytes_to_tsc\n", max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST);

	// There are cases where hz estimate might be slighly over-estimated
	// This results in too much extrapolation
	// Only account for 99% of extrapolation to handle cases with up to 1% error clocks
	for (unsigned int i = 0; i < max_frame_size * MAX_PKT_BURST ; i++) {
		if (bytes_per_hz == UINT64_MAX)
			task->bytes_to_tsc[i] = 0;	// unknown link speed: no extrapolation
			task->bytes_to_tsc[i] = (rte_get_tsc_hz() * i * 0.99) / bytes_per_hz;
	task->flow_count = targ->flow_count;
	PROX_PANIC(task->flow_id_pos && (task->flow_count == 0), "flow_count must be configured when flow_id_pos is set\n");
	if (task->flow_count) {
		task->flows = prox_zmalloc(task->flow_count * sizeof(struct flows) * task->generator_count, rte_lcore_to_socket_id(targ->lconf->id));
		PROX_PANIC(task->flows == NULL,
			"Failed to allocate %lu bytes (in huge pages) for flows\n", task->flow_count * sizeof(struct flows) * task->generator_count);
/* Task descriptor registered with the PROX framework.
 * NOTE(review): sampled chunk — other initializers (e.g. .mode_str, .start,
 * .stop hooks wired to lat_start/lat_stop) are not visible here. */
static struct task_init task_init_lat = {
	.init = init_task_lat,
	.handle = handle_lat_bulk,
	.flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
	.size = sizeof(struct task_lat)
/* Register the latency task mode at program load time. */
__attribute__((constructor)) static void reg_task_lat(void)
	reg_task(&task_init_lat);