2 // Copyright (c) 2010-2017 Intel Corporation
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
19 #include <rte_cycles.h>
23 #include "handle_gen.h"
24 #include "prox_malloc.h"
25 #include "mbuf_utils.h"
26 #include "handle_lat.h"
28 #include "task_init.h"
29 #include "task_base.h"
34 #include "prox_shared.h"
35 #include "prox_port_cfg.h"
37 #define DEFAULT_BUCKET_SIZE 10
// Tail of the per-packet latency record: rx/tx indices and originating queue.
// NOTE(review): the enclosing struct declaration is in elided lines of this file.
uint32_t rx_packet_index;	// order in which this packet was received
uint32_t tx_packet_index;	// packet index stamped by the generator
uint16_t port_queue_id;	// generator (port/queue) the packet came from
// One slot of the 64-deep delayed-latency ring: the generator puts the
// accuracy for packet N into packet N + 64 (see handle_lat_bulk), so results
// are buffered here until that later packet arrives.
struct delayed_latency_entry {
uint32_t rx_packet_idx;	// tag: which rx packet currently owns this slot
// Direct-mapped ring of 64 delayed entries, indexed by rx_packet_idx % 64.
struct delayed_latency {
struct delayed_latency_entry entries[64];
// Look up the slot for rx_packet_idx in the 64-entry direct-mapped ring.
// The entry is returned only when the stored tag matches, i.e. the slot has
// not been recycled for a newer packet; the non-matching (presumably NULL)
// path is in elided lines of this file.
static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency *delayed_latency, uint32_t rx_packet_idx)
if (delayed_latency->entries[rx_packet_idx % 64].rx_packet_idx == rx_packet_idx)
return &delayed_latency->entries[rx_packet_idx % 64];
// Claim the ring slot for rx_packet_idx: stamp its tag and return it so the
// caller can fill in the rx/tx times. Silently overwrites whatever older
// packet previously mapped to the same slot (rx_packet_idx % 64).
static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency *delayed_latency, uint32_t rx_packet_idx)
delayed_latency->entries[rx_packet_idx % 64].rx_packet_idx = rx_packet_idx;
return &delayed_latency->entries[rx_packet_idx % 64];
// Per-packet scratch data collected while walking one received bulk.
struct rx_pkt_meta_data {
// Sum of wire sizes of all packets AFTER this one in the bulk; used to
// extrapolate this packet's arrival time backward from the bulk's rx tsc.
uint32_t bytes_after_in_bulk;
struct task_base base;	// must stay first: handlers cast task_base* to task_lat*
uint64_t rx_packet_index;	// running count of packets received by this task
uint64_t last_pkts_tsc;	// tsc taken right after the previous bulk was received
struct delayed_latency delayed_latency;	// 64-deep ring used in accuracy mode
struct lat_info *latency_buffer;	// optional per-packet dump buffer (may be NULL)
uint32_t latency_buffer_idx;	// next free slot in latency_buffer
uint32_t latency_buffer_size;	// capacity of latency_buffer, in entries
uint16_t unique_id_pos;	// offset of unique id in the packet; 0 = disabled
volatile uint16_t use_lt; /* which lt to use, */
volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements are used */
struct lat_test lt[2];	// double-buffered measurement sets (reader vs writer)
struct lat_test *lat_test;	// currently active measurement set (&lt[using_lt])
uint32_t generator_count;	// number of generators feeding this task
struct early_loss_detect *eld;	// one early-loss detector per generator
struct rx_pkt_meta_data *rx_pkt_meta;	// per-bulk scratch, MAX_RX_PKT_ALL entries
struct prox_port_cfg *port;	// rx port; only used at start to read link speed
// Wraparound-aware difference of two 32-bit timestamps: when a < b the result
// is taken modulo 2^32 (UINT32_MAX - (b - a - 1) == a - b mod 2^32), so a
// tx counter that overflowed between stamping and reception still yields the
// correct small delta.
static uint32_t abs_diff(uint32_t a, uint32_t b)
return a < b? UINT32_MAX - (b - a - 1) : a - b;
// Hand out the lat_test buffer NOT currently written by the fast path, so an
// external reader can consume a stable snapshot. Returns it only once reader
// and writer agree (use_lt == using_lt); the other path is in elided lines.
struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
if (task->use_lt == task->using_lt)
return &task->lt[!task->using_lt];
// Request a buffer flip: the fast path picks this up in
// task_lat_update_lat_test() on its own thread (use_lt/using_lt are volatile).
void task_lat_use_other_latency_meassurement(struct task_lat *task)
task->use_lt = !task->using_lt;
// Called from the fast path: if a flip was requested (use_lt changed),
// switch the active lat_test and re-apply the accuracy limit on the new one.
static void task_lat_update_lat_test(struct task_lat *task)
if (task->use_lt != task->using_lt) {
task->using_lt = task->use_lt;
task->lat_test = &task->lt[task->using_lt];
task->lat_test->accuracy_limit_tsc = task->limit;
// qsort comparator ordering lat_info records by tx_time (ascending).
// NOTE(review): the subtraction of the two tx_time values is truncated to
// int; entries whose tx_time differ by >= 2^31 would compare in the wrong
// order — confirm the expected tx_time range before relying on this.
static int compare_tx_time(const void *val1, const void *val2)
const struct lat_info *ptr1 = val1;
const struct lat_info *ptr2 = val2;
return ptr1->tx_time - ptr2->tx_time;
// qsort comparator used for the "sort by queue_id" pass in
// lat_write_latency_to_file().
// NOTE(review): it delegates to compare_tx_time, i.e. it orders by tx_time,
// not by port_queue_id as the name suggests — verify this is intentional.
static int compare_queue_id(const void *val1, const void *val2)
return compare_tx_time(val1, val2);
// Make tx_time monotonic across the sorted buffer: whenever the 32-bit
// tx_time of consecutive entries from the same queue goes backwards, count a
// wrap and push later entries up by one overflow period. Several interior
// lines (time assignment, overflow increment, queue change handling) are
// elided from this chunk.
// NOTE(review): `UINT32_MAX * n_overflow` is a 32-bit multiply that itself
// wraps for n_overflow > 1 — confirm tx_time's width and intended widening.
static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
uint32_t id, time, old_id = 0, old_time = 0, n_overflow = 0;
for (uint32_t i = 0; i < count; i++) {
id = lat->port_queue_id;
// Same queue id as previous entry; time should always increase
if (time < old_time) {
lat->tx_time += UINT32_MAX * n_overflow;
// Different queue_id, time starts again at 0
// At stop time, fold the not-yet-reported losses of every generator's
// early-loss detector into the active lat_test's lost_packets counter.
static void task_lat_count_remaining_lost_packets(struct task_lat *task)
struct lat_test *lat_test = task->lat_test;
for (uint32_t j = 0; j < task->generator_count; j++) {
struct early_loss_detect *eld = &task->eld[j];
lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
// Reset the early-loss-detect state of every generator feeding this task.
static void task_lat_reset_eld(struct task_lat *task)
for (uint32_t j = 0; j < task->generator_count; j++) {
early_loss_detect_reset(&task->eld[j]);
// Scan the dump buffer for the smallest tx_time and return it scaled up by
// LATENCY_ACCURACY; used as the time origin when writing records to file.
static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
uint64_t min_tsc = UINT64_MAX;
for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
if (min_tsc > task->latency_buffer[i].tx_time)
min_tsc = task->latency_buffer[i].tx_time;
return min_tsc << LATENCY_ACCURACY;
// Latency of one record: wraparound-aware |rx_time - tx_time|, scaled back
// up to full tsc resolution.
static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
uint64_t lat = abs_diff(lat_info->rx_time, lat_info->tx_time);
return lat << LATENCY_ACCURACY;
// TX timestamp error of a record, scaled to full tsc resolution.
static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
// RX timestamp error of a record, scaled to full tsc resolution.
static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
224 static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
226 return ((uint64_t)lat_info) << LATENCY_ACCURACY;
229 static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
231 return ((uint64_t)lat_info) << LATENCY_ACCURACY;
// Dump the per-packet latency buffer to the per-core files: first all records
// in rx order (fp_rx), then — after sorting and wrap-fixing tx_time — in tx
// order with loss markers (fp_tx). Resets latency_buffer_idx when done.
// Several interior lines (declarations of min_tsc/n_loss, loop braces, some
// branch bodies) are elided from this chunk.
static void lat_write_latency_to_file(struct task_lat *task)
min_tsc = lat_latency_buffer_get_min_tsc(task);
// Dumping all packet statistics
fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec);tx time;\n");
for (uint32_t i = 0; i < task->latency_buffer_idx ; i++) {
struct lat_info *lat_info = &task->latency_buffer[i];
uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
// NOTE(review): "%u%d" below lacks the ';' separator the header promises
// between rx index and queue — the first two columns are fused. Confirm
// and fix the format string.
fprintf(task->fp_rx, "%u%d;%d;%ld;%lu;%lu\n",
lat_info->rx_packet_index,
lat_info->port_queue_id,
lat_info->tx_packet_index,
tsc_to_nsec(lat_tsc),
tsc_to_nsec(rx_tsc - min_tsc),
tsc_to_nsec(tx_tsc - min_tsc));
// To detect dropped packets, we need to sort them based on TX
plogx_info("Sorting packets based on queue_id\n");
qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_queue_id);
plogx_info("Adapting tx_time\n");
fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
plogx_info("Sorting packets based on tx_time\n");
qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
plogx_info("Sorted packets based on tx_time\n");
// A packet is marked as dropped if 2 packets received from the same queue are not consecutive
fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");
uint32_t prev_tx_packet_index = -1;
for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
struct lat_info *lat_info = &task->latency_buffer[i];
uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
/* Packet n + 64 delivers the TX error for packet n,
hence the last 64 packets do no have TX error. */
if (i + 64 >= task->latency_buffer_idx) {
// Log dropped packet
n_loss = lat_info->tx_packet_index - prev_tx_packet_index - 1;
fprintf(task->fp_tx, "===> %d;%d;0;0;0;0; lost %d packets <===\n",
lat_info->port_queue_id,
lat_info->tx_packet_index - n_loss, n_loss);
fprintf(task->fp_tx, "%d;%d;%u;%lu;%lu;%lu;%lu;%lu\n",
lat_info->port_queue_id,
lat_info->tx_packet_index,
lat_info->rx_packet_index,
tsc_to_nsec(lat_tsc),
tsc_to_nsec(tx_tsc - min_tsc),
tsc_to_nsec(rx_tsc - min_tsc),
tsc_to_nsec(tx_err_tsc),
tsc_to_nsec(rx_err_tsc));
// Extra per-bulk debug columns (bulk position and rx tsc window).
fprintf(task->fp_tx, ";%d from %d;%lu;%lu;%lu",
lat_info->id_in_bulk,
tsc_to_nsec(lat_info->begin - min_tsc),
tsc_to_nsec(lat_info->before - min_tsc),
tsc_to_nsec(lat_info->after - min_tsc));
fprintf(task->fp_tx, "\n");
prev_tx_packet_index = lat_info->tx_packet_index;
task->latency_buffer_idx = 0;
// Task stop hook: flush remaining loss counts (when per-packet unique ids
// are in use) and dump the latency buffer to file if one was allocated.
static void lat_stop(struct task_base *tbase)
struct task_lat *task = (struct task_lat *)tbase;
if (task->unique_id_pos) {
task_lat_count_remaining_lost_packets(task);
task_lat_reset_eld(task);
if (task->latency_buffer)
lat_write_latency_to_file(task);
// Record per-bulk debug context (bulk size, position within the bulk, and the
// rx tsc window begin/before/after) into the record for this packet.
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];
lat_info->bulk_size = bulk_size;
lat_info->id_in_bulk = id_in_bulk;
lat_info->begin = task->begin;
lat_info->before = task->base.aux->tsc_rx.before;
lat_info->after = task->base.aux->tsc_rx.after;
// Append one record to the per-packet dump buffer. The generator id and
// per-generator packet index are decoded from the packet's unique id and
// stored as port_queue_id / tx_packet_index.
// NOTE(review): the rx_packet_index parameter is unused in the visible lines;
// the stored index is derived from latency_buffer_idx instead — confirm.
static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, struct unique_id *unique_id, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err)
struct lat_info *lat_info;
uint8_t generator_id = 0;
uint32_t packet_index = 0;
unique_id_get(unique_id, &generator_id, &packet_index);
/* If unique_id_pos is specified then latency is stored per
packet being sent. Lost packets are detected runtime, and
latency stored for those packets will be 0 */
lat_info = &task->latency_buffer[task->latency_buffer_idx++];
lat_info->rx_packet_index = task->latency_buffer_idx - 1;
lat_info->tx_packet_index = packet_index;
lat_info->port_queue_id = generator_id;
lat_info->rx_time = rx_time;
lat_info->tx_time = tx_time;
lat_info->rx_err = rx_err;
lat_info->tx_err = tx_err;
// Feed a packet's unique id into its generator's early-loss detector and
// return the number of packets newly detected as lost. The body of the
// out-of-range generator_id guard (presumably an early return) is elided
// from this chunk.
static uint32_t task_lat_early_loss_detect(struct task_lat *task, struct unique_id *unique_id)
struct early_loss_detect *eld;
uint8_t generator_id;
uint32_t packet_index;
unique_id_get(unique_id, &generator_id, &packet_index);
if (generator_id >= task->generator_count)
eld = &task->eld[generator_id];
return early_loss_detect_add(eld, packet_index);
// Estimate when a packet actually hit the wire: walk back from tsc_from by
// the time needed to transmit `bytes` at `link_speed` (bytes/sec), clamped
// below by tsc_minimum (the clamp branch is in elided lines).
// NOTE(review): divides by link_speed — callers must guarantee it is nonzero
// (init_task_lat defaults it to UINT64_MAX; lat_start overwrites from the port).
static uint64_t tsc_extrapolate_backward(uint64_t link_speed, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
uint64_t tsc = tsc_from - (rte_get_tsc_hz()*bytes)/link_speed;
if (likely(tsc > tsc_minimum))
390 static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
392 uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
393 size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);
395 bucket_id = bucket_id < bucket_count? bucket_id : bucket_count;
396 lat_test->buckets[bucket_id]++;
// Accumulate newly detected packet loss into the active measurement set.
static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
lat_test->lost_packets += lost_packets;
// Fold one latency sample (and its measurement error) into the running
// statistics: totals, squared-sum variance terms with error propagation,
// min/max, and (when compiled in) the histogram. Samples whose error exceeds
// accuracy_limit_tsc are counted in tot_all_pkts only; the skip branch body
// is in elided lines of this chunk.
static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
lat_test->tot_all_pkts++;
if (error > lat_test->accuracy_limit_tsc)
lat_test->tot_pkts++;
lat_test->tot_lat += lat_tsc;
lat_test->tot_lat_error += error;
/* (a +- b)^2 = a^2 +- (2ab + b^2) */
lat_test->var_lat += lat_tsc * lat_tsc;
lat_test->var_lat_error += 2 * lat_tsc * error;
lat_test->var_lat_error += error * error;
if (lat_tsc > lat_test->max_lat) {
lat_test->max_lat = lat_tsc;
lat_test->max_lat_error = error;
if (lat_tsc < lat_test->min_lat) {
lat_test->min_lat = lat_tsc;
lat_test->min_lat_error = error;
#ifdef LATENCY_HISTOGRAM
lat_test_histogram_add(lat_test, lat_tsc);
// True while the optional per-packet dump buffer still has free slots.
static int task_lat_can_store_latency(struct task_lat *task)
return task->latency_buffer_idx < task->latency_buffer_size;
// Record one measured latency: always update the aggregate statistics, and
// additionally append a per-packet record while the dump buffer has room.
// NOTE(review): lat_tsc is uint32_t, so abs_diff() << LATENCY_ACCURACY is a
// 32-bit shift that can truncate very large latencies — confirm the range.
static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, struct unique_id *unique_id)
uint32_t lat_tsc = abs_diff(rx_time, tx_time) << LATENCY_ACCURACY;
lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);
if (task_lat_can_store_latency(task)) {
task_lat_store_lat_buf(task, rx_packet_index, unique_id, rx_time, tx_time, rx_error, tx_error);
// Fast-path handler for one bulk of received packets: read tx timestamps
// from the packets, extrapolate per-packet rx times backwards from the bulk
// rx tsc, detect losses via unique ids, and record latencies (delayed by 64
// packets when accuracy mode is on, since packet N+64 carries packet N's tx
// error). Finally forwards the bulk unchanged. Many interior lines (branch
// keywords, loop braces, zero-packet handling, the final store_lat argument
// list and return) are elided from this chunk.
static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
struct task_lat *task = (struct task_lat *)tbase;
uint64_t rx_time_err;
uint32_t pkt_rx_time, pkt_tx_time;
task->begin = tbase->aux->tsc_rx.before;
task_lat_update_lat_test(task);
const uint64_t rx_tsc = tbase->aux->tsc_rx.after;
uint32_t tx_time_err = 0;
/* Go once through all received packets and read them. If
packet has just been modified by another core, the cost of
latency will be partialy amortized though the bulk size */
for (uint16_t j = 0; j < n_pkts; ++j) {
struct rte_mbuf *mbuf = mbufs[j];
task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);
for (uint16_t j = 0; j < n_pkts; ++j) {
// Signature-checked path: only trust the embedded tx timestamp when the
// packet carries the expected signature at sig_pos; otherwise record 0.
for (uint16_t j = 0; j < n_pkts; ++j) {
if (*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig)
task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
task->rx_pkt_meta[j].pkt_tx_time = 0;
// No-signature path: read the tx timestamp unconditionally from lat_pos.
for (uint16_t j = 0; j < n_pkts; ++j) {
task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
uint32_t bytes_total_in_bulk = 0;
// Find RX time of first packet, for RX accuracy
for (uint16_t j = 0; j < n_pkts; ++j) {
uint16_t flipped = n_pkts - 1 - j;
task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
if ((uint32_t)((task->begin >> LATENCY_ACCURACY)) > pkt_rx_time) {
// Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
rx_time_err = pkt_rx_time - (uint32_t)(task->last_pkts_tsc >> LATENCY_ACCURACY);
rx_time_err = pkt_rx_time - (uint32_t)(task->begin >> LATENCY_ACCURACY);
struct unique_id *unique_id = NULL;
struct delayed_latency_entry *delayed_latency_entry;
for (uint16_t j = 0; j < n_pkts; ++j) {
struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
uint8_t *hdr = rx_pkt_meta->hdr;
pkt_rx_time = tsc_extrapolate_backward(task->link_speed, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
pkt_tx_time = rx_pkt_meta->pkt_tx_time;
if (task->unique_id_pos) {
unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
uint32_t n_loss = task_lat_early_loss_detect(task, unique_id);
lat_test_add_lost(task->lat_test, n_loss);
/* If accuracy is enabled, latency is reported with a
delay of 64 packets since the generator puts the
accuracy for packet N into packet N + 64. The delay
ensures that all reported latencies have both rx
if (task->accur_pos) {
tx_time_err = *(uint32_t *)(hdr + task->accur_pos);
delayed_latency_entry = delayed_latency_get(&task->delayed_latency, task->rx_packet_index - 64);
if (delayed_latency_entry) {
task_lat_store_lat(task,
task->rx_packet_index,
delayed_latency_entry->pkt_rx_time,
delayed_latency_entry->pkt_tx_time,
delayed_latency_entry->rx_time_err,
delayed_latency_entry = delayed_latency_create(&task->delayed_latency, task->rx_packet_index);
delayed_latency_entry->pkt_rx_time = pkt_rx_time;
delayed_latency_entry->pkt_tx_time = pkt_tx_time;
delayed_latency_entry->rx_time_err = rx_time_err;
task_lat_store_lat(task,
task->rx_packet_index,
task->rx_packet_index++;
// Forward the bulk unchanged, then remember this bulk's rx tsc window for
// the next extrapolation.
ret = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
task->begin = tbase->aux->tsc_rx.before;
task->last_pkts_tsc = tbase->aux->tsc_rx.after;
// Allocate the per-packet latency dump buffer (size clamped so index
// arithmetic cannot overflow uint32) and open the per-core rx/tx dump files.
// NOTE(review): `name` is used here but its declaration (presumably a stack
// char buffer) and its first assignment before the allocation PROX_PANIC are
// in elided lines — confirm it is initialized before that first use.
static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
const int socket_id = rte_lcore_to_socket_id(core_id);
size_t latency_buffer_mem_size = 0;
if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;
latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;
task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %ld kbytes for %s\n", latency_buffer_mem_size / 1024, name);
sprintf(name, "latency.rx_%d.txt", core_id);
task->fp_rx = fopen(name, "w+");
PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);
sprintf(name, "latency.tx_%d.txt", core_id);
task->fp_tx = fopen(name, "w+");
PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);
// Size and allocate the per-generator early-loss-detect array from the
// generator count published via prox_sh (0 when nothing was published).
// NOTE(review): the else keyword pairing the NULL check with the deref, and
// the declaration of eld_mem_size, are in elided lines of this chunk.
static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
uint8_t *generator_count = prox_sh_find_system("generator_count");
if (generator_count == NULL)
task->generator_count = 0;
task->generator_count = *generator_count;
eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
task->eld = prox_zmalloc(eld_mem_size, socket_id);
// Convert the configured accuracy limit from nanoseconds to tsc ticks; it is
// applied to the active lat_test in task_lat_update_lat_test().
void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
task->limit = nsec_to_tsc(accuracy_limit_nsec);
// Task start hook: read the (now initialized) port's link speed and convert
// it from Mbps to bytes/sec for the rx-time extrapolation.
static void lat_start(struct task_base *tbase)
struct task_lat *task = (struct task_lat *)tbase;
// task->port->link->speed reports the link speed in Mbps e.g. 40k for a 40 Gbps NIC
// task->link_speed reported link speed in Bytes per sec.
task->link_speed = task->port->link_speed * 125000L;
plog_info("\tReceiving at %ld Mbps\n", 8 * task->link_speed / 1000000);
// Task init: copy packet-layout offsets from the config, allocate the
// optional dump buffer, configure histogram bucket sizes, set up per-
// generator loss detection, and stash the rx port for lat_start() to read
// the link speed from (it cannot be queried before the port is initialized).
// Some interior lines (bucket-size sanity check message, port assignment)
// are elided from this chunk.
static void init_task_lat(struct task_base *tbase, struct task_args *targ)
struct task_lat *task = (struct task_lat *)tbase;
const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
task->lat_pos = targ->lat_pos;	// offset of tx timestamp in the packet
task->accur_pos = targ->accur_pos;	// offset of the accuracy field; 0 = off
task->sig_pos = targ->sig_pos;	// offset of the signature; 0 = no signature
task->sig = targ->sig;
task->unique_id_pos = targ->packet_id_pos;
task->latency_buffer_size = targ->latency_buffer_size;
if (task->latency_buffer_size) {
init_task_lat_latency_buffer(task, targ->lconf->id);
if (targ->bucket_size < LATENCY_ACCURACY) {
targ->bucket_size = DEFAULT_BUCKET_SIZE;
task->lt[0].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
task->lt[1].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
if (task->unique_id_pos) {
task_lat_init_eld(task, socket_id);
task_lat_reset_eld(task);
task->lat_test = &task->lt[task->using_lt];
task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
task->rx_pkt_meta = prox_zmalloc(MAX_RX_PKT_ALL * sizeof(*task->rx_pkt_meta), socket_id);
PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");
// Sentinel until lat_start() reads the real speed from the port.
task->link_speed = UINT64_MAX;
if (targ->nb_rxports) {
// task->port structure is only used while starting handle_lat to get the link_speed.
// link_speed can not be quiried at init as the port has not been initialized yet.
struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
// Registration descriptor for the latency task mode (mode string and
// start/stop hooks are in elided lines of this chunk).
static struct task_init task_init_lat = {
.init = init_task_lat,
.handle = handle_lat_bulk,
.flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_RX_ALL | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
.size = sizeof(struct task_lat)
// Register the latency task mode with the PROX framework at program load.
__attribute__((constructor)) static void reg_task_lat(void)
reg_task(&task_init_lat);