// Copyright (c) 2010-2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <rte_cycles.h>
#include <stdio.h>
#include <math.h>

#include "handle_gen.h"
#include "prox_malloc.h"
#include "mbuf_utils.h"
#include "handle_lat.h"
#include "log.h"
#include "task_init.h"
#include "task_base.h"
#include "stats.h"
#include "lconf.h"
#include "quit.h"
#include "eld.h"
#include "prox_shared.h"

#define DEFAULT_BUCKET_SIZE	10
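
/* One record per received packet when the latency buffer is enabled; the
 * buffer is dumped to latency.rx_<core>.txt and latency.tx_<core>.txt when
 * the task stops. */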
struct lat_info {
	uint32_t rx_packet_index;
	uint32_t tx_packet_index;
	uint32_t tx_err;
	uint32_t rx_err;
	uint64_t rx_time;
	uint64_t tx_time;
	uint16_t port_queue_id;
#ifdef LAT_DEBUG
	uint16_t id_in_bulk;
	uint16_t bulk_size;
	uint64_t begin;
	uint64_t after;
	uint64_t before;
#endif
};

struct delayed_latency_entry {
	uint32_t rx_packet_idx;
	uint32_t pkt_rx_time;
	uint32_t pkt_tx_time;
	uint64_t rx_time_err;
};

struct delayed_latency {
	struct delayed_latency_entry entries[64];
};
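
/* 64-entry ring indexed by rx_packet_idx % 64. An entry is parked here for 64
 * packets because the generator sends the accuracy (TX error) of packet N in
 * packet N + 64; delayed_latency_get() only returns the entry if it has not
 * been overwritten in the meantime. */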
static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency *delayed_latency, uint32_t rx_packet_idx)
{
	if (delayed_latency->entries[rx_packet_idx % 64].rx_packet_idx == rx_packet_idx)
		return &delayed_latency->entries[rx_packet_idx % 64];
	else
		return NULL;
}

static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency *delayed_latency, uint32_t rx_packet_idx)
{
	delayed_latency->entries[rx_packet_idx % 64].rx_packet_idx = rx_packet_idx;
	return &delayed_latency->entries[rx_packet_idx % 64];
}

struct rx_pkt_meta_data {
	uint8_t  *hdr;
	uint32_t pkt_tx_time;
	uint32_t bytes_after_in_bulk;
};

struct task_lat {
	struct task_base base;
	uint64_t limit;
	uint64_t rx_packet_index;
	uint64_t last_pkts_tsc;
	struct delayed_latency delayed_latency;
	struct lat_info *latency_buffer;
	uint32_t latency_buffer_idx;
	uint32_t latency_buffer_size;
	uint64_t begin;
	uint16_t lat_pos;
	uint16_t unique_id_pos;
	uint16_t accur_pos;
	uint16_t sig_pos;
	uint32_t sig;
	volatile uint16_t use_lt; /* which lt to use next */
	volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements are used */
	struct lat_test lt[2];
	struct lat_test *lat_test;
	uint32_t generator_count;
	struct early_loss_detect *eld;
	struct rx_pkt_meta_data *rx_pkt_meta;
	FILE *fp_rx;
	FILE *fp_tx;
};
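
/* Wrap-aware difference between two 32-bit timestamps, i.e. (a - b) mod 2^32. */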
static uint32_t abs_diff(uint32_t a, uint32_t b)
{
	return a < b? UINT32_MAX - (b - a - 1) : a - b;
}
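
/* Latency statistics are double-buffered: the fast path fills lt[using_lt]
 * while another core reads the idle instance. The reader requests a switch by
 * flipping use_lt; the fast path applies it in task_lat_update_lat_test() at
 * the start of the next bulk. */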
struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
{
	if (task->use_lt == task->using_lt)
		return &task->lt[!task->using_lt];
	return NULL;
}

void task_lat_use_other_latency_meassurement(struct task_lat *task)
{
	task->use_lt = !task->using_lt;
}

static void task_lat_update_lat_test(struct task_lat *task)
{
	if (task->use_lt != task->using_lt) {
		task->using_lt = task->use_lt;
		task->lat_test = &task->lt[task->using_lt];
		task->lat_test->accuracy_limit_tsc = task->limit;
	}
}

static int compare_tx_time(const void *val1, const void *val2)
{
	const struct lat_info *ptr1 = val1;
	const struct lat_info *ptr2 = val2;

	return ptr1->tx_time - ptr2->tx_time;
}

static int compare_queue_id(const void *val1, const void *val2)
{
	return compare_tx_time(val1, val2);
}
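
/* tx_time is logged as a truncated 32-bit TSC. Within one queue it must be
 * monotonic, so every apparent decrease marks a 32-bit wrap that is undone
 * here before the final sort by tx_time. */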
static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
{
	uint32_t id, time, old_id = 0, old_time = 0, n_overflow = 0;

	for (uint32_t i = 0; i < count; i++) {
		id = lat->port_queue_id;
		time = lat->tx_time;
		if (id == old_id) {
			// Same queue id as previous entry; time should always increase
			if (time < old_time) {
				n_overflow++;
			}
			/* Each 32-bit wrap adds 2^32 ticks; keep the multiplication in 64 bits */
			lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
			old_time = time;
		} else {
			// Different queue_id, time starts again at 0
			old_id = id;
			old_time = 0;
			n_overflow = 0;
		}
		lat++;
	}
}

static void task_lat_count_remaining_lost_packets(struct task_lat *task)
{
	struct lat_test *lat_test = task->lat_test;

	for (uint32_t j = 0; j < task->generator_count; j++) {
		struct early_loss_detect *eld = &task->eld[j];

		lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
	}
}

static void task_lat_reset_eld(struct task_lat *task)
{
	for (uint32_t j = 0; j < task->generator_count; j++) {
		early_loss_detect_reset(&task->eld[j]);
	}
}

static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
{
	uint64_t min_tsc = UINT64_MAX;

	for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
		if (min_tsc > task->latency_buffer[i].tx_time)
			min_tsc = task->latency_buffer[i].tx_time;
	}

	return min_tsc << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
{
	uint64_t lat = abs_diff(lat_info->rx_time, lat_info->tx_time);

	return lat << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
{
	return lat_info->rx_time << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
{
	return lat_info->tx_time << LATENCY_ACCURACY;
}
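
/* Dump the latency buffer to the two files opened at init time:
 * latency.rx_<core>.txt lists packets in RX order, latency.tx_<core>.txt
 * re-sorts them by queue and tx_time so that gaps in tx_packet_index reveal
 * dropped packets. */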
static void lat_write_latency_to_file(struct task_lat *task)
{
	uint64_t min_tsc;
	uint32_t n_loss;

	min_tsc = lat_latency_buffer_get_min_tsc(task);

	// Dump all packet statistics, in RX order
	fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
	fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec); rx time; tx time;\n");
	for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
		struct lat_info *lat_info = &task->latency_buffer[i];
		uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
		uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
		uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

		fprintf(task->fp_rx, "%u;%u;%u;%lu;%lu;%lu\n",
			lat_info->rx_packet_index,
			lat_info->port_queue_id,
			lat_info->tx_packet_index,
			tsc_to_nsec(lat_tsc),
			tsc_to_nsec(rx_tsc - min_tsc),
			tsc_to_nsec(tx_tsc - min_tsc));
	}

	// To detect dropped packets, we need to sort them based on TX
	plogx_info("Sorting packets based on queue_id\n");
	qsort(task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_queue_id);
	plogx_info("Adapting tx_time\n");
	fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
	plogx_info("Sorting packets based on tx_time\n");
	qsort(task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
	plogx_info("Sorted packets based on tx_time\n");

	// A packet is marked as dropped if 2 packets received from the same queue are not consecutive
	fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
	fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");

	uint32_t prev_tx_packet_index = -1;
	for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
		struct lat_info *lat_info = &task->latency_buffer[i];
		uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
		uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
		uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
		uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
		uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

		/* Packet n + 64 delivers the TX error for packet n,
		   hence the last 64 packets do not have a TX error. */
		if (i + 64 >= task->latency_buffer_idx) {
			tx_err_tsc = 0;
		}
		// Log dropped packets
		n_loss = lat_info->tx_packet_index - prev_tx_packet_index - 1;
		if (n_loss)
			fprintf(task->fp_tx, "===> %u;%u;0;0;0;0; lost %u packets <===\n",
				lat_info->port_queue_id,
				lat_info->tx_packet_index - n_loss, n_loss);
		fprintf(task->fp_tx, "%u;%u;%u;%lu;%lu;%lu;%lu;%lu",
			lat_info->port_queue_id,
			lat_info->tx_packet_index,
			lat_info->rx_packet_index,
			tsc_to_nsec(lat_tsc),
			tsc_to_nsec(tx_tsc - min_tsc),
			tsc_to_nsec(rx_tsc - min_tsc),
			tsc_to_nsec(tx_err_tsc),
			tsc_to_nsec(rx_err_tsc));
#ifdef LAT_DEBUG
		fprintf(task->fp_tx, ";%d from %d;%lu;%lu;%lu",
			lat_info->id_in_bulk,
			lat_info->bulk_size,
			tsc_to_nsec(lat_info->begin - min_tsc),
			tsc_to_nsec(lat_info->before - min_tsc),
			tsc_to_nsec(lat_info->after - min_tsc));
#endif
		fprintf(task->fp_tx, "\n");
		prev_tx_packet_index = lat_info->tx_packet_index;
	}

	task->latency_buffer_idx = 0;
}

static void lat_stop(struct task_base *tbase)
{
	struct task_lat *task = (struct task_lat *)tbase;

	if (task->unique_id_pos) {
		task_lat_count_remaining_lost_packets(task);
		task_lat_reset_eld(task);
	}
	if (task->latency_buffer)
		lat_write_latency_to_file(task);
}

#ifdef LAT_DEBUG
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
	struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];

	lat_info->bulk_size = bulk_size;
	lat_info->id_in_bulk = id_in_bulk;
	lat_info->begin = task->begin;
	lat_info->before = task->base.aux->tsc_rx.before;
	lat_info->after = task->base.aux->tsc_rx.after;
}
#endif

static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, struct unique_id *unique_id, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err)
{
	struct lat_info *lat_info;
	uint8_t generator_id = 0;
	uint32_t packet_index = 0;

	if (unique_id)
		unique_id_get(unique_id, &generator_id, &packet_index);

	/* If unique_id_pos is specified then latency is stored per
	   packet being sent. Lost packets are detected at runtime, and
	   the latency stored for those packets will be 0 */
	lat_info = &task->latency_buffer[task->latency_buffer_idx++];
	lat_info->rx_packet_index = task->latency_buffer_idx - 1;
	lat_info->tx_packet_index = packet_index;
	lat_info->port_queue_id = generator_id;
	lat_info->rx_time = rx_time;
	lat_info->tx_time = tx_time;
	lat_info->rx_err = rx_err;
	lat_info->tx_err = tx_err;
}

static uint32_t task_lat_early_loss_detect(struct task_lat *task, struct unique_id *unique_id)
{
	struct early_loss_detect *eld;
	uint8_t generator_id;
	uint32_t packet_index;

	unique_id_get(unique_id, &generator_id, &packet_index);

	if (generator_id >= task->generator_count)
		return 0;

	eld = &task->eld[generator_id];

	return early_loss_detect_add(eld, packet_index);
}
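
/* Move the RX timestamp backwards by the wire time of 'bytes' preceding the
 * packet in the bulk; 1250000000 bytes/sec is the wire rate of a 10 Gbps
 * link, so the extrapolation assumes 10 Gbps wire speed. */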
static uint64_t tsc_extrapolate_backward(uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
	uint64_t tsc = tsc_from - rte_get_tsc_hz()*bytes/1250000000;
	if (likely(tsc > tsc_minimum))
		return tsc;
	else
		return tsc_minimum;
}

static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
{
	uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
	size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);

	/* Clamp to the last bucket; indexing with bucket_count itself would
	   overflow the array. */
	bucket_id = bucket_id < bucket_count? bucket_id : bucket_count - 1;
	lat_test->buckets[bucket_id]++;
}

static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
{
	lat_test->lost_packets += lost_packets;
}
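
/* Besides the latency itself, an error bound ('error' = RX error + TX error)
 * is accumulated: tot_lat_error bounds the error on the sum and, using
 * (a +- b)^2 = a^2 +- (2ab + b^2), var_lat_error bounds the error on the sum
 * of squares used for the variance. Samples whose error bound exceeds
 * accuracy_limit_tsc are counted in tot_all_pkts but otherwise ignored. */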
static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
{
	lat_test->tot_all_pkts++;

	if (error > lat_test->accuracy_limit_tsc)
		return;
	lat_test->tot_pkts++;

	lat_test->tot_lat += lat_tsc;
	lat_test->tot_lat_error += error;

	/* (a +- b)^2 = a^2 +- (2ab + b^2) */
	lat_test->var_lat += lat_tsc * lat_tsc;
	lat_test->var_lat_error += 2 * lat_tsc * error;
	lat_test->var_lat_error += error * error;

	if (lat_tsc > lat_test->max_lat) {
		lat_test->max_lat = lat_tsc;
		lat_test->max_lat_error = error;
	}
	if (lat_tsc < lat_test->min_lat) {
		lat_test->min_lat = lat_tsc;
		lat_test->min_lat_error = error;
	}

#ifdef LATENCY_HISTOGRAM
	lat_test_histogram_add(lat_test, lat_tsc);
#endif
}

static int task_lat_can_store_latency(struct task_lat *task)
{
	return task->latency_buffer_idx < task->latency_buffer_size;
}

static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, struct unique_id *unique_id)
{
	uint32_t lat_tsc = abs_diff(rx_time, tx_time) << LATENCY_ACCURACY;

	lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);

	if (task_lat_can_store_latency(task)) {
		task_lat_store_lat_buf(task, rx_packet_index, unique_id, rx_time, tx_time, rx_error, tx_error);
	}
}
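
/* Fast-path handler. Per bulk: record each packet's header, extract the TX
 * timestamp (only from packets carrying the expected signature when one is
 * configured), compute the wire bytes following each packet so its RX time
 * can be extrapolated backwards from the bulk RX TSC, run early loss
 * detection on the per-generator packet ids, and store the latency, delayed
 * by 64 packets when accuracy information is enabled. */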
static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_lat *task = (struct task_lat *)tbase;
	uint64_t rx_time_err;
	int ret;
	uint32_t pkt_rx_time, pkt_tx_time;

	if (n_pkts == 0) {
		task->begin = tbase->aux->tsc_rx.before;
		return 0;
	}

	task_lat_update_lat_test(task);

	const uint64_t rx_tsc = tbase->aux->tsc_rx.after;
	uint32_t tx_time_err = 0;

	/* Go once through all received packets and read them. If a
	   packet has just been modified by another core, the cost of
	   reading it will be partially amortized through the bulk size */
	for (uint16_t j = 0; j < n_pkts; ++j) {
		struct rte_mbuf *mbuf = mbufs[j];
		task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);
	}
	for (uint16_t j = 0; j < n_pkts; ++j) {
	}

	if (task->sig) {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig)
				task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
			else
				task->rx_pkt_meta[j].pkt_tx_time = 0;
		}
	} else {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
		}
	}

	uint32_t bytes_total_in_bulk = 0;
	// Find RX time of first packet, for RX accuracy
	for (uint16_t j = 0; j < n_pkts; ++j) {
		uint16_t flipped = n_pkts - 1 - j;

		task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
		bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
	}

	pkt_rx_time = tsc_extrapolate_backward(rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
	if ((uint32_t)((task->begin >> LATENCY_ACCURACY)) > pkt_rx_time) {
		// Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
		rx_time_err = pkt_rx_time - (uint32_t)(task->last_pkts_tsc >> LATENCY_ACCURACY);
	} else {
		rx_time_err = pkt_rx_time - (uint32_t)(task->begin >> LATENCY_ACCURACY);
	}

	struct unique_id *unique_id = NULL;
	struct delayed_latency_entry *delayed_latency_entry;

	for (uint16_t j = 0; j < n_pkts; ++j) {
		struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
		uint8_t *hdr = rx_pkt_meta->hdr;

		pkt_rx_time = tsc_extrapolate_backward(rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
		pkt_tx_time = rx_pkt_meta->pkt_tx_time;

		if (task->unique_id_pos) {
			unique_id = (struct unique_id *)(hdr + task->unique_id_pos);

			uint32_t n_loss = task_lat_early_loss_detect(task, unique_id);
			lat_test_add_lost(task->lat_test, n_loss);
		}

		/* If accuracy is enabled, latency is reported with a
		   delay of 64 packets since the generator puts the
		   accuracy for packet N into packet N + 64. The delay
		   ensures that all reported latencies have both rx
		   and tx error. */
		if (task->accur_pos) {
			tx_time_err = *(uint32_t *)(hdr + task->accur_pos);

			delayed_latency_entry = delayed_latency_get(&task->delayed_latency, task->rx_packet_index - 64);

			if (delayed_latency_entry) {
				task_lat_store_lat(task,
						   task->rx_packet_index,
						   delayed_latency_entry->pkt_rx_time,
						   delayed_latency_entry->pkt_tx_time,
						   delayed_latency_entry->rx_time_err,
						   tx_time_err,
						   unique_id);
			}

			delayed_latency_entry = delayed_latency_create(&task->delayed_latency, task->rx_packet_index);
			delayed_latency_entry->pkt_rx_time = pkt_rx_time;
			delayed_latency_entry->pkt_tx_time = pkt_tx_time;
			delayed_latency_entry->rx_time_err = rx_time_err;
		} else {
			task_lat_store_lat(task,
					   task->rx_packet_index,
					   pkt_rx_time,
					   pkt_tx_time,
					   0,
					   0,
					   unique_id);
		}
		task->rx_packet_index++;
	}

	ret = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
	task->begin = tbase->aux->tsc_rx.before;
	task->last_pkts_tsc = tbase->aux->tsc_rx.after;
	return ret;
}

static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
{
	const int socket_id = rte_lcore_to_socket_id(core_id);
	char name[256];
	size_t latency_buffer_mem_size = 0;

	if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
		task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;

	latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;

	task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
	PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for the latency buffer\n", latency_buffer_mem_size / 1024);

	sprintf(name, "latency.rx_%u.txt", core_id);
	task->fp_rx = fopen(name, "w+");
	PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);

	sprintf(name, "latency.tx_%u.txt", core_id);
	task->fp_tx = fopen(name, "w+");
	PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);
}
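
/* The generator tasks publish their count through
 * prox_sh_find_system("generator_count"); one early-loss-detect state is
 * allocated per generator. */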
static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
{
	uint8_t *generator_count = prox_sh_find_system("generator_count");
	size_t eld_mem_size;

	if (generator_count == NULL)
		task->generator_count = 0;
	else
		task->generator_count = *generator_count;

	eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
	task->eld = prox_zmalloc(eld_mem_size, socket_id);
}

void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
{
	task->limit = nsec_to_tsc(accuracy_limit_nsec);
}

static void init_task_lat(struct task_base *tbase, struct task_args *targ)
{
	struct task_lat *task = (struct task_lat *)tbase;
	const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

	task->lat_pos = targ->lat_pos;
	task->accur_pos = targ->accur_pos;
	task->sig_pos = targ->sig_pos;
	task->sig = targ->sig;
	task->unique_id_pos = targ->packet_id_pos;
	task->latency_buffer_size = targ->latency_buffer_size;

	if (task->latency_buffer_size) {
		init_task_lat_latency_buffer(task, targ->lconf->id);
	}

	if (targ->bucket_size < LATENCY_ACCURACY) {
		targ->bucket_size = DEFAULT_BUCKET_SIZE;
	}
	task->lt[0].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
	task->lt[1].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
	if (task->unique_id_pos) {
		task_lat_init_eld(task, socket_id);
		task_lat_reset_eld(task);
	}
	task->lat_test = &task->lt[task->using_lt];

	task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
	task->rx_pkt_meta = prox_zmalloc(MAX_RX_PKT_ALL * sizeof(*task->rx_pkt_meta), socket_id);
	PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data\n");
}
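
/* Registration of the "lat" task. TASK_FEATURE_ZERO_RX makes the handler run
 * even for empty receive bulks, which keeps task->begin up to date for the RX
 * time extrapolation. */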
static struct task_init task_init_lat = {
	.mode_str = "lat",
	.init = init_task_lat,
	.handle = handle_lat_bulk,
	.stop = lat_stop,
	.flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_RX_ALL | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
	.size = sizeof(struct task_lat)
};

__attribute__((constructor)) static void reg_task_lat(void)
{
	reg_task(&task_init_lat);
}