// Copyright (c) 2010-2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <rte_ethdev.h>
#include <rte_version.h>

#include "rx_pkt.h"
#include "tx_pkt.h"
#include "task_base.h"
#include "stats.h"
#include "prox_assert.h"
#include "log.h"
#include "mbuf_utils.h"
#include "handle_master.h"

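/* This file gathers the transmit paths used by PROX tasks: direct
   transmit to a hw queue (txhw_*), enqueue to a sw rte_ring
   (ring_enq_*), and the buffered tx_pkt_* variants that accumulate
   packets per destination in tbase->ws_mbuf and only transmit once
   MAX_PKT_BURST packets are pending or the queues are flushed.
   "no_drop" variants block until everything is sent; the plain
   variants free what could not be sent and count it as TX_FAIL. */
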
static void buf_pkt_single(struct task_base *tbase, struct rte_mbuf *mbuf, const uint8_t out)
{
    const uint16_t prod = tbase->ws_mbuf->idx[out].prod++;
    tbase->ws_mbuf->mbuf[out][prod & WS_MBUF_MASK] = mbuf;
}

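/* out[j] is the destination queue index for packet j. The two special
   values are OUT_HANDLED (the packet was already dealt with; it is
   freed and counted as "handled") and OUT_DISCARD (freed and counted
   as a discard); any smaller value buffers the packet for that queue. */
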
static inline void buf_pkt_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
    for (uint16_t j = 0; j < n_pkts; ++j) {
        if (unlikely(out[j] >= OUT_HANDLED)) {
            rte_pktmbuf_free(mbufs[j]);
            if (out[j] == OUT_HANDLED)
                TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
            else
                TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
        }
        else {
            buf_pkt_single(tbase, mbufs[j], out[j]);
        }
    }
}

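/* L3 sub-mode: write_dst_mac() resolves the destination MAC of each
   packet. Depending on its verdict the packet is sent as-is
   (SEND_MBUF), turned into a MAC request towards the control plane
   (SEND_ARP), sent while a separate MAC request is emitted
   (SEND_MBUF_AND_ARP), or dropped (DROP_MBUF). Runs of consecutive
   SEND_MBUF packets are transmitted in batches through tx_pkt_l2. */
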
int tx_pkt_l3(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
    uint32_t ip_dst;
    int first = 0, ret, ok = 0, rc;
    const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
    struct rte_mbuf *arp_mbuf = NULL; // used when one needs to send both an ARP and an mbuf
    for (int j = 0; j < n_pkts; j++) {
        if ((out) && (out[j] >= OUT_HANDLED))
            continue;
        if (unlikely((rc = write_dst_mac(tbase, mbufs[j], &ip_dst)) != SEND_MBUF)) {
            if (j - first) {
                ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
                ok += ret;
            }
            first = j + 1;
            switch (rc) {
            case SEND_ARP:
                // We re-use the mbuf - no need to create an arp_mbuf and delete the existing mbuf
                mbufs[j]->port = tbase->l3.reachable_port_id;
                tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, REQ_MAC_TO_CTRL, mbufs[j], tbase->l3.core_id, tbase->l3.task_id, ip_dst);
                break;
            case SEND_MBUF_AND_ARP:
                // We send the mbuf and an ARP - we need to allocate another mbuf for the ARP
                ret = rte_mempool_get(tbase->l3.arp_pool, (void **)&arp_mbuf);
                if (likely(ret == 0)) {
                    arp_mbuf->port = tbase->l3.reachable_port_id;
                    tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, REQ_MAC_TO_CTRL, arp_mbuf, tbase->l3.core_id, tbase->l3.task_id, ip_dst);
                } else {
                    plog_err("Failed to get a mbuf from arp mempool\n");
                    // We still send the initial mbuf
                }
                ret = tbase->aux->tx_pkt_l2(tbase, mbufs + j, 1, out);
                break;
            case DROP_MBUF:
                rte_pktmbuf_free(mbufs[j]);
                TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
                break;
            }
        }
    }
    if (n_pkts - first) {
        ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
        ok += ret;
    }
    return ok;
}

/* The following helper functions also report stats. Therefore we need
   to pass the task_base struct. */
static inline int txhw_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
{
    uint16_t ntx;
    int ret;
    /* TX vector mode can't transmit more than 32 packets */
    if (n_pkts > MAX_PMD_TX) {
        ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, MAX_PMD_TX);
        ntx += rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs + ntx, n_pkts - ntx);
    } else {
        ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
    }
    TASK_STATS_ADD_TX(&tbase->aux->stats, ntx);
    ret = n_pkts - ntx;
    if (ntx < n_pkts) {
        plog_dbg("Failed to send %d packets from %p\n", ret, mbufs[0]);
        TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts - ntx);
        if (tbase->tx_pkt == tx_pkt_bw) {
            uint32_t drop_bytes = 0;
            do {
                drop_bytes += mbuf_wire_size(mbufs[ntx]);
                rte_pktmbuf_free(mbufs[ntx++]);
            } while (ntx < n_pkts);
            TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
        } else {
            do {
                rte_pktmbuf_free(mbufs[ntx++]);
            } while (ntx < n_pkts);
        }
    }
    return ret;
}

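/* "no drop" flavor: retry rte_eth_tx_burst() until every packet has
   been accepted by the hw queue. This can spin indefinitely if the
   queue never drains, so it is reserved for setups where that cannot
   happen. */
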
static inline int txhw_no_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
{
    uint16_t ret;

    TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);

    do {
        ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
        mbufs += ret;
        n_pkts -= ret;
    } while (n_pkts);
    return 0;
}

static inline int ring_enq_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
{
    int ret = 0;
    /* Pre-17.05, rte_ring_enqueue_bulk() returns 0 on success and
       -ENOBUFS on failure; from 17.05 on it returns the number of
       objects enqueued, 0 meaning failure. */
    // Rings can be single- or multi-producer (ctrl rings are multi-producer)
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
    if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts))) {
#else
    if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0)) {
#endif
        ret = n_pkts;
        if (tbase->tx_pkt == tx_pkt_bw) {
            uint32_t drop_bytes = 0;
            for (uint16_t i = 0; i < n_pkts; ++i) {
                drop_bytes += mbuf_wire_size(mbufs[i]);
                rte_pktmbuf_free(mbufs[i]);
            }
            TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
            TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
        } else {
            for (uint16_t i = 0; i < n_pkts; ++i)
                rte_pktmbuf_free(mbufs[i]);
            TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
        }
    } else {
        TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
    }
    return ret;
}

static inline int ring_enq_no_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
{
    /* Spin until all packets have been enqueued */
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
    while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts)) {
#else
    while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0) {
#endif
    }
    TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
    return 0;
}

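/* The flush functions transmit whatever is still pending in the
   ws_mbuf buffers. They are typically invoked once a task has been
   idle for a while (FLAG_TX_FLUSH set), so that buffered packets do
   not wait for a full MAX_PKT_BURST to accumulate. */
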
void flush_queues_hw(struct task_base *tbase)
{
    uint16_t prod, cons;

    for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
        prod = tbase->ws_mbuf->idx[i].prod;
        cons = tbase->ws_mbuf->idx[i].cons;
        if (prod != cons) {
            tbase->ws_mbuf->idx[i].prod = 0;
            tbase->ws_mbuf->idx[i].cons = 0;
            txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
        }
    }
    tbase->flags &= ~FLAG_TX_FLUSH;
}

void flush_queues_sw(struct task_base *tbase)
{
    uint16_t prod, cons;

    for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
        prod = tbase->ws_mbuf->idx[i].prod;
        cons = tbase->ws_mbuf->idx[i].cons;
        if (prod != cons) {
            tbase->ws_mbuf->idx[i].prod = 0;
            tbase->ws_mbuf->idx[i].cons = 0;
            ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
        }
    }
    tbase->flags &= ~FLAG_TX_FLUSH;
}

void flush_queues_no_drop_hw(struct task_base *tbase)
{
    uint16_t prod, cons;

    for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
        prod = tbase->ws_mbuf->idx[i].prod;
        cons = tbase->ws_mbuf->idx[i].cons;
        if (prod != cons) {
            tbase->ws_mbuf->idx[i].prod = 0;
            tbase->ws_mbuf->idx[i].cons = 0;
            txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
        }
    }
    tbase->flags &= ~FLAG_TX_FLUSH;
}

void flush_queues_no_drop_sw(struct task_base *tbase)
{
    uint16_t prod, cons;

    for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
        prod = tbase->ws_mbuf->idx[i].prod;
        cons = tbase->ws_mbuf->idx[i].cons;
        if (prod != cons) {
            tbase->ws_mbuf->idx[i].prod = 0;
            tbase->ws_mbuf->idx[i].cons = 0;
            ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
        }
    }
    tbase->flags &= ~FLAG_TX_FLUSH;
}

269 /* "try" functions try to send packets to sw/hw w/o failing or blocking;
270 They return if ring/queue is full and are used by aggregators.
271 "try" functions do not have drop/no drop flavors
272 They are only implemented in never_discard mode (as by default they
273 use only one outgoing ring. */
274 uint16_t tx_try_self(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
277 tx_pkt_never_discard_self(tbase, mbufs, n_pkts, NULL);
280 tx_pkt_never_discard_self(tbase, mbufs, 64, NULL);
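/* bulk_size is a power of two, so n_pkts >> __builtin_ctz(bulk_size)
   is n_pkts / bulk_size and n_pkts & (bulk_size - 1) is the
   remainder: full bulks are sent first, then the tail, stopping at
   the first partial enqueue. */
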
uint16_t tx_try_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
    const int bulk_size = 64;
    uint16_t ret = bulk_size, sent = 0, n_bulks;
    n_bulks = n_pkts >> __builtin_ctz(bulk_size);

    for (int i = 0; i < n_bulks; i++) {
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
        ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size);
#else
        ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size, NULL);
#endif
        mbufs += ret;
        sent += ret;
        if (ret != bulk_size)
            break;
    }
    if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
        ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)));
#else
        ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)), NULL);
#endif
        sent += ret;
    }
    TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
    return sent;
}

uint16_t tx_try_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
    const int bulk_size = 64;
    uint16_t ret = bulk_size, n_bulks, sent = 0;
    n_bulks = n_pkts >> __builtin_ctz(bulk_size);

    const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
    for (int i = 0; i < n_bulks; i++) {
        ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, bulk_size);
        mbufs += ret;
        sent += ret;
        if (ret != bulk_size)
            break;
    }
    if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
        ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, (n_pkts & (bulk_size - 1)));
        sent += ret;
    }
    TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
    return sent;
}

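/* Two optimization flavors for tasks with a single hw tx queue:
   "lat_opt" transmits every burst immediately (lowest latency),
   while "thrpt_opt" only bypasses ws_mbuf buffering for full
   MAX_PKT_BURST bursts, first draining what was already buffered so
   that packet order is preserved. */
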
int tx_pkt_no_drop_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
    return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
}

int tx_pkt_no_drop_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
    static uint8_t fake_out[MAX_PKT_BURST] = {0};
    int ret = 0;
    if (n_pkts == MAX_PKT_BURST) {
        // First xmit what was queued
        uint16_t prod, cons;

        prod = tbase->ws_mbuf->idx[0].prod;
        cons = tbase->ws_mbuf->idx[0].cons;

        if ((uint16_t)(prod - cons)) {
            tbase->flags &= ~FLAG_TX_FLUSH;
            tbase->ws_mbuf->idx[0].prod = 0;
            tbase->ws_mbuf->idx[0].cons = 0;
            ret += txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
        }
        ret += txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
    } else {
        ret += tx_pkt_no_drop_hw(tbase, mbufs, n_pkts, fake_out);
    }
    return ret;
}

int tx_pkt_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
    return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
}

int tx_pkt_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
    static uint8_t fake_out[MAX_PKT_BURST] = {0};
    int ret = 0;
    if (n_pkts == MAX_PKT_BURST) {
        // First xmit what was queued
        uint16_t prod, cons;

        prod = tbase->ws_mbuf->idx[0].prod;
        cons = tbase->ws_mbuf->idx[0].cons;

        if ((uint16_t)(prod - cons)) {
            tbase->flags &= ~FLAG_TX_FLUSH;
            tbase->ws_mbuf->idx[0].prod = 0;
            tbase->ws_mbuf->idx[0].cons = 0;
            ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
        }
        ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
    } else {
        ret += tx_pkt_hw(tbase, mbufs, n_pkts, fake_out);
    }
    return ret;
}

/* Transmit to hw using the tx_params_hw_sw structure.
   This function is used to transmit to hw when tx_params_hw_sw should be
   used, i.e. when the task needs to transmit both to hw and sw. */
int tx_pkt_no_drop_never_discard_hw1_no_pointer(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
    txhw_no_drop(&tbase->tx_params_hw_sw.tx_port_queue, mbufs, n_pkts, tbase);
    return 0;
}

int tx_pkt_no_drop_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
    return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
}

int tx_pkt_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
    return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
}

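/* Scan out[] eight entries at a time by OR-ing them into a uint64_t:
   if every byte is 0 (every packet goes to queue 0 and none is
   OUT_HANDLED/OUT_DISCARD), the mbufs array can be transmitted
   unchanged and the compaction pass below is skipped entirely. */
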
static uint16_t tx_pkt_free_dropped(__attribute__((unused)) struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
    uint64_t v = 0;
    uint16_t i;

    /* The most probable, and most important case to optimize, is
       when no packets should be dropped. */
    for (i = 0; i + 8 < n_pkts; i += 8) {
        v |= *((uint64_t *)(&out[i]));
    }
    for (; i < n_pkts; ++i) {
        v |= out[i];
    }
    if (unlikely(v)) {
        /* At least some packets need to be dropped, so the
           mbufs array needs to be updated. */
        uint16_t n_kept = 0;
        uint16_t n_discard = 0;
        for (uint16_t i = 0; i < n_pkts; ++i) {
            if (unlikely(out[i] >= OUT_HANDLED)) {
                rte_pktmbuf_free(mbufs[i]);
                n_discard += out[i] == OUT_DISCARD;
                continue;
            }
            mbufs[n_kept++] = mbufs[i];
        }
        TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, n_discard);
        TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts - n_kept - n_discard);
        return n_kept;
    }
    return n_pkts;
}

int tx_pkt_no_drop_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
    const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
    int ret = 0;

    if (likely(n_kept))
        ret = txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
    return ret;
}

int tx_pkt_no_drop_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
    const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
    int ret = 0;

    if (likely(n_kept))
        ret = ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
    return ret;
}

int tx_pkt_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
    const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);

    if (likely(n_kept))
        return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
    return 0;
}

int tx_pkt_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
    const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);

    if (likely(n_kept))
        return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
    return 0;
}

int tx_pkt_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
    const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);

    TASK_STATS_ADD_TX(&tbase->aux->stats, n_kept);
    tbase->ws_mbuf->idx[0].nb_rx = n_kept;
    struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
    for (uint16_t i = 0; i < n_kept; ++i) {
        tx_mbuf[i] = mbufs[i];
    }
    return 0;
}

int tx_pkt_never_discard_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
    TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
    tbase->ws_mbuf->idx[0].nb_rx = n_pkts;
    struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
    for (uint16_t i = 0; i < n_pkts; ++i) {
        tx_mbuf[i] = mbufs[i];
    }
    return 0;
}

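/* Buffered transmit: packets are first sorted into per-destination
   ws_mbuf buffers by buf_pkt_all(); a destination is only drained
   once at least MAX_PKT_BURST packets are pending. prod and cons are
   free-running uint16_t counters, so (uint16_t)(prod - cons) gives
   the number of buffered packets even after wrap-around. */
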
int tx_pkt_no_drop_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
    int ret = 0;
    buf_pkt_all(tbase, mbufs, n_pkts, out);

    const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
    uint16_t prod, cons;

    for (uint8_t i = 0; i < nb_bufs; ++i) {
        prod = tbase->ws_mbuf->idx[i].prod;
        cons = tbase->ws_mbuf->idx[i].cons;

        if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
            tbase->flags &= ~FLAG_TX_FLUSH;
            tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
            ret += txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
        }
    }
    return ret;
}

int tx_pkt_no_drop_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
    int ret = 0;
    buf_pkt_all(tbase, mbufs, n_pkts, out);

    const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
    uint16_t prod, cons;

    for (uint8_t i = 0; i < nb_bufs; ++i) {
        prod = tbase->ws_mbuf->idx[i].prod;
        cons = tbase->ws_mbuf->idx[i].cons;

        if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
            tbase->flags &= ~FLAG_TX_FLUSH;
            tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
            ret += ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
        }
    }
    return ret;
}

int tx_pkt_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
    int ret = 0;
    buf_pkt_all(tbase, mbufs, n_pkts, out);

    const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
    uint16_t prod, cons;

    for (uint8_t i = 0; i < nb_bufs; ++i) {
        prod = tbase->ws_mbuf->idx[i].prod;
        cons = tbase->ws_mbuf->idx[i].cons;

        if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
            tbase->flags &= ~FLAG_TX_FLUSH;
            tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
            ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
        }
    }
    return ret;
}

int tx_pkt_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
    int ret = 0;
    buf_pkt_all(tbase, mbufs, n_pkts, out);

    const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
    uint16_t prod, cons;

    for (uint8_t i = 0; i < nb_bufs; ++i) {
        prod = tbase->ws_mbuf->idx[i].prod;
        cons = tbase->ws_mbuf->idx[i].cons;

        if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
            tbase->flags &= ~FLAG_TX_FLUSH;
            tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
            ret += ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
        }
    }
    return ret;
}

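/* Packet tracing: the RX side (rx_pkt_trace(), in rx_pkt.c) stores a
   copy of each traced packet (task_rt_dump.pkt_cpy / pkt_cpy_len)
   together with the original mbuf address (pkt_mbuf_addr). At TX
   time the stored copy is printed next to the outgoing packet so the
   task's modifications can be inspected. */
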
static inline void trace_one_rx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf)
{
    struct rte_mbuf tmp;

    /* For each packet being transmitted, find which buffer
       represents the packet as it was when received, before the
       task modified it. */
    uint32_t j = 0;
    uint32_t len = sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr)/sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr[0]);
    for (; j < len; ++j) {
        if (tbase->aux->task_rt_dump.pkt_mbuf_addr[j] == mbuf)
            break;
    }
    if (j != len) {
#if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
        tmp.data_off = 0;
#endif
        rte_pktmbuf_data_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
        rte_pktmbuf_pkt_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
        tmp.buf_addr = tbase->aux->task_rt_dump.pkt_cpy[j];
        plogdx_info(&tmp, "Trace RX: ");
    }
}

static inline void trace_one_tx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf, uint8_t *out, uint32_t i)
{
    if (out) {
        switch (out[i]) {
        case OUT_HANDLED:
            plogdx_info(mbuf, "Handled: ");
            break;
        case OUT_DISCARD:
            plogdx_info(mbuf, "Dropped: ");
            break;
        default:
            plogdx_info(mbuf, "TX[%d]: ", out[i]);
            break;
        }
    } else if (tbase->aux->tx_pkt_orig == tx_pkt_drop_all) {
        plogdx_info(mbuf, "Dropped: ");
    } else
        plogdx_info(mbuf, "TX[0]: ");
}

static void unset_trace(struct task_base *tbase)
{
    if (0 == tbase->aux->task_rt_dump.n_trace) {
        if (tbase->tx_pkt == tx_pkt_l3) {
            tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
            tbase->aux->tx_pkt_orig = NULL;
        } else {
            tbase->tx_pkt = tbase->aux->tx_pkt_orig;
            tbase->aux->tx_pkt_orig = NULL;
        }
        tbase->aux->task_rt_dump.cur_trace = 0;
        task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
    }
}

int tx_pkt_trace(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
    int ret = 0;
    if (tbase->aux->task_rt_dump.cur_trace == 0) {
        // No packet received since tracing was requested; dump the TX side only
        tbase->aux->task_rt_dump.n_print_tx = tbase->aux->task_rt_dump.n_trace;
        if (tbase->aux->task_rt_dump.n_trace < n_pkts) {
            tbase->aux->task_rt_dump.n_trace = 0;
            tbase->aux->task_rt_dump.cur_trace = 0;
            task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
        } else {
            tbase->aux->task_rt_dump.n_trace -= n_pkts;
        }
        ret = tx_pkt_dump(tbase, mbufs, n_pkts, out);
        tbase->aux->task_rt_dump.n_print_tx = 0;
        return ret;
    }
    plog_info("Tracing %d pkts\n", tbase->aux->task_rt_dump.cur_trace);
    uint32_t cur_trace = (n_pkts < tbase->aux->task_rt_dump.cur_trace) ? n_pkts : tbase->aux->task_rt_dump.cur_trace;
    for (uint32_t i = 0; i < cur_trace; ++i) {
        trace_one_rx_pkt(tbase, mbufs[i]);
        trace_one_tx_pkt(tbase, mbufs[i], out, i);
    }
    ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);

    unset_trace(tbase);
    return ret;
}

int tx_pkt_dump(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
    uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
    int ret = 0;

    n_dump = n_pkts < n_dump ? n_pkts : n_dump;
    for (uint32_t i = 0; i < n_dump; ++i) {
        if (out) {
            switch (out[i]) {
            case OUT_HANDLED:
                plogdx_info(mbufs[i], "Handled: ");
                break;
            case OUT_DISCARD:
                plogdx_info(mbufs[i], "Dropped: ");
                break;
            default:
                plogdx_info(mbufs[i], "TX[%d]: ", out[i]);
                break;
            }
        } else
            plogdx_info(mbufs[i], "TX: ");
    }
    tbase->aux->task_rt_dump.n_print_tx -= n_dump;

    ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);

    if (0 == tbase->aux->task_rt_dump.n_print_tx) {
        if (tbase->tx_pkt == tx_pkt_l3) {
            tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
            tbase->aux->tx_pkt_orig = NULL;
        } else {
            tbase->tx_pkt = tbase->aux->tx_pkt_orig;
            tbase->aux->tx_pkt_orig = NULL;
        }
    }
    return ret;
}

/* Gather the distribution of the number of packets that have been
   xmitted from one TX call. Since the value is only modified by the
   task that xmits the packet, no atomic operation is needed. */
int tx_pkt_distr(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
    if (likely(n_pkts < TX_BUCKET_SIZE))
        tbase->aux->tx_bucket[n_pkts]++;
    else
        tbase->aux->tx_bucket[TX_BUCKET_SIZE - 1]++;
    return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
}

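/* tx_pkt_bw is stacked on top of the regular transmit function (via
   tx_pkt_orig) to account bytes on the wire, both for transmitted
   and for dropped packets, using mbuf_wire_size(). */
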
int tx_pkt_bw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
    uint32_t tx_bytes = 0;
    uint32_t drop_bytes = 0;

    for (uint16_t i = 0; i < n_pkts; ++i) {
        if (!out || out[i] < OUT_HANDLED)
            tx_bytes += mbuf_wire_size(mbufs[i]);
        else
            drop_bytes += mbuf_wire_size(mbufs[i]);
    }
    TASK_STATS_ADD_TX_BYTES(&tbase->aux->stats, tx_bytes);
    TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
    return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
}

int tx_pkt_drop_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
    for (uint16_t j = 0; j < n_pkts; ++j) {
        rte_pktmbuf_free(mbufs[j]);
    }
    if (out == NULL)
        TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts);
    else {
        for (uint16_t j = 0; j < n_pkts; ++j) {
            if (out[j] == OUT_HANDLED)
                TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
            else
                TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
        }
    }
    return n_pkts;
}

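/* Control-plane helpers: the command and its arguments are packed
   into the mbuf itself (udata64 carries the IP in bits 32-63, the
   core id in bits 16-23, the task id in bits 8-15 and the command in
   bits 0-7) before the mbuf is enqueued to the ctrl ring. */
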
static inline int tx_ring_all(struct task_base *tbase, struct rte_ring *ring, uint16_t command, struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
{
    if (tbase->aux->task_rt_dump.cur_trace) {
        trace_one_rx_pkt(tbase, mbuf);
    }
    mbuf->udata64 = ((uint64_t)ip << 32) | (core_id << 16) | (task_id << 8) | command;
    return rte_ring_enqueue(ring, mbuf);
}

void tx_ring_cti(struct task_base *tbase, struct rte_ring *ring, uint16_t command, struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
{
    plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
    int ret = tx_ring_all(tbase, ring, command, mbuf, core_id, task_id, ip);
    if (unlikely(ret != 0)) {
        plogx_dbg("\tFailed to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
        TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
        rte_pktmbuf_free(mbuf);
    }
}

void tx_ring_ip(struct task_base *tbase, struct rte_ring *ring, uint16_t command, struct rte_mbuf *mbuf, uint32_t ip)
{
    plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
    int ret = tx_ring_all(tbase, ring, command, mbuf, 0, 0, ip);
    if (unlikely(ret != 0)) {
        plogx_dbg("\tFailed to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
        TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
        rte_pktmbuf_free(mbuf);
    }
}

void tx_ring(struct task_base *tbase, struct rte_ring *ring, uint16_t command, struct rte_mbuf *mbuf)
{
    plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
    int ret = tx_ring_all(tbase, ring, command, mbuf, 0, 0, 0);
    if (unlikely(ret != 0)) {
        plogx_dbg("\tFailed to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
        TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
        rte_pktmbuf_free(mbuf);
    }
}