// Copyright (c) 2010-2020 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <rte_ethdev.h>
#include <rte_version.h>

#include "rx_pkt.h"
#include "tx_pkt.h"
#include "task_base.h"
#include "stats.h"
#include "prox_assert.h"
#include "log.h"
#include "mbuf_utils.h"
#include "handle_master.h"

static void buf_pkt_single(struct task_base *tbase, struct rte_mbuf *mbuf, const uint8_t out)
{
	const uint16_t prod = tbase->ws_mbuf->idx[out].prod++;
	tbase->ws_mbuf->mbuf[out][prod & WS_MBUF_MASK] = mbuf;
}

static inline void buf_pkt_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	for (uint16_t j = 0; j < n_pkts; ++j) {
		if (unlikely(out[j] >= OUT_HANDLED)) {
			rte_pktmbuf_free(mbufs[j]);
			if (out[j] == OUT_HANDLED)
				TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
			else
				TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		} else {
			buf_pkt_single(tbase, mbufs[j], out[j]);
		}
	}
}

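/*
 * Illustrative sketch (not from the PROX sources): how the per-output
 * write buffer indexing above behaves, assuming WS_MBUF_MASK is a
 * power-of-two minus one (0xFF is used here purely as an example).
 * The prod index grows without bound as a uint16_t and is masked on
 * every access, so the buffer acts as a ring and wrap-around is free:
 *
 *	uint16_t prod = 65535;                // about to wrap
 *	uint16_t slot = prod & 0xFF;          // slot 255
 *	prod++;                               // wraps to 0
 *	slot = prod & 0xFF;                   // slot 0: next entry in the ring
 */
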
void store_packet(struct task_base *tbase, struct rte_mbuf *mbuf)
{
	// If buffer is full, drop the first mbuf
	if (tbase->aux->mbuf)
		tx_drop(tbase->aux->mbuf);
	tbase->aux->mbuf = mbuf;
}

int tx_pkt_ndp(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	struct ipv6_addr ip_dst;
	int first = 0, ret, ok = 0, rc;
	const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
	struct rte_mbuf *mbuf = NULL;	// used when we need to send both an ND and the mbuf
	uint16_t vlan;
	uint64_t tsc = rte_rdtsc();

	for (int j = 0; j < n_pkts; j++) {
		if ((out) && (out[j] >= OUT_HANDLED))
			continue;
		if (unlikely((rc = write_ip6_dst_mac(tbase, mbufs[j], &ip_dst, &vlan, tsc)) != SEND_MBUF)) {
			if (j - first) {
				ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
				ok += ret;
			}
			first = j + 1;
			switch (rc) {
			case SEND_ARP_ND:
				// Original mbuf (packet) is stored to be sent later -> need to allocate new mbuf
				ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&mbuf);
				if (likely(ret == 0)) {
					store_packet(tbase, mbufs[j]);
					mbuf->port = tbase->l3.reachable_port_id;
					tx_ring_cti6(tbase, tbase->l3.ctrl_plane_ring, IP6_REQ_MAC_TO_MASTER, mbuf, tbase->l3.core_id, tbase->l3.task_id, &ip_dst, vlan);
				} else {
					plog_err("Failed to get a mbuf from arp/nd mempool\n");
					tx_drop(mbufs[j]);
					TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
				}
				break;
			case SEND_MBUF_AND_ARP_ND:
				// We send the mbuf and an ND - we need to allocate another mbuf for the ND
				ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&mbuf);
				if (likely(ret == 0)) {
					mbuf->port = tbase->l3.reachable_port_id;
					tx_ring_cti6(tbase, tbase->l3.ctrl_plane_ring, IP6_REQ_MAC_TO_MASTER, mbuf, tbase->l3.core_id, tbase->l3.task_id, &ip_dst, vlan);
				} else {
					plog_err("Failed to get a mbuf from arp/nd mempool\n");
					// We still send the initial mbuf
				}
				ret = tbase->aux->tx_pkt_l2(tbase, mbufs + j, 1, out);
				break;
			case DROP_MBUF:
				tx_drop(mbufs[j]);
				TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
				break;
			}
		}
	}
	if (n_pkts - first) {
		ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
		ok += ret;
	}
	return ok;
}

int tx_pkt_l3(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	uint32_t ip_dst;
	uint16_t vlan;
	uint64_t *time;
	int first = 0, ret, ok = 0, rc;
	const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
	struct rte_mbuf *arp_mbuf = NULL;	// used when we need to send both an ARP and the mbuf
	uint64_t tsc = rte_rdtsc();

	for (int j = 0; j < n_pkts; j++) {
		if ((out) && (out[j] >= OUT_HANDLED))
			continue;
		if (unlikely((rc = write_dst_mac(tbase, mbufs[j], &ip_dst, &vlan, &time, tsc)) != SEND_MBUF)) {
			if (j - first) {
				ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
				ok += ret;
			}
			first = j + 1;
			switch (rc) {
			case SEND_ARP_ND:
				// We re-use the mbuf - no need to create an arp_mbuf and delete the existing mbuf
				mbufs[j]->port = tbase->l3.reachable_port_id;
				if (tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, IP4_REQ_MAC_TO_MASTER, mbufs[j], tbase->l3.core_id, tbase->l3.task_id, ip_dst, vlan) == 0)
					update_arp_ndp_retransmit_timeout(&tbase->l3, time, 1000);
				else
					update_arp_ndp_retransmit_timeout(&tbase->l3, time, 100);
				break;
			case SEND_MBUF_AND_ARP_ND:
				// We send the mbuf and an ARP - we need to allocate another mbuf for the ARP
				ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&arp_mbuf);
				if (likely(ret == 0)) {
					arp_mbuf->port = tbase->l3.reachable_port_id;
					if (tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, IP4_REQ_MAC_TO_MASTER, arp_mbuf, tbase->l3.core_id, tbase->l3.task_id, ip_dst, vlan) == 0)
						update_arp_ndp_retransmit_timeout(&tbase->l3, time, 1000);
					else
						update_arp_ndp_retransmit_timeout(&tbase->l3, time, 100);
				} else {
					plog_err("Failed to get a mbuf from arp mempool\n");
					// We still send the initial mbuf
				}
				ret = tbase->aux->tx_pkt_l2(tbase, mbufs + j, 1, out);
				break;
			case DROP_MBUF:
				tx_drop(mbufs[j]);
				TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
				break;
			}
		}
	}
	if (n_pkts - first) {
		ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
		ok += ret;
	}
	return ok;
}

/* The following helper functions also report stats. Therefore we need
   to pass the task_base struct. */
static inline int txhw_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
{
	uint16_t ntx;
	int ret;

	/* TX vector mode can't transmit more than 32 packets */
	if (n_pkts > MAX_PMD_TX) {
		ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, MAX_PMD_TX);
		ntx += rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs + ntx, n_pkts - ntx);
	} else {
		ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
	}

	TASK_STATS_ADD_TX(&tbase->aux->stats, ntx);
	ret = n_pkts - ntx;
	if (ntx < n_pkts) {
		plog_dbg("Failed to send %d packets from %p\n", ret, mbufs[0]);
		TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts - ntx);
		if (tbase->tx_pkt == tx_pkt_bw) {
			uint32_t drop_bytes = 0;
			do {
				drop_bytes += mbuf_wire_size(mbufs[ntx]);
				rte_pktmbuf_free(mbufs[ntx++]);
			} while (ntx < n_pkts);
			TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
		} else {
			do {
				rte_pktmbuf_free(mbufs[ntx++]);
			} while (ntx < n_pkts);
		}
	}
	return ret;
}

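/*
 * Illustrative sketch (not from the PROX sources): why txhw_drop splits
 * large bursts. With MAX_PMD_TX == 32, a 64-packet burst is pushed to
 * the PMD in two calls; the second call starts at the first
 * untransmitted mbuf, so nothing is sent twice:
 *
 *	ntx  = rte_eth_tx_burst(port, queue, mbufs, 32);        // e.g. 32 sent
 *	ntx += rte_eth_tx_burst(port, queue, mbufs + ntx, 64 - ntx);
 *	// any mbufs in [ntx, 64) were refused by the PMD and must be freed
 */
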
static inline int txhw_no_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
{
	uint16_t ret;

	TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);

	/* Retry until the PMD has accepted every mbuf; nothing is dropped. */
	do {
		ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
		mbufs += ret;
		n_pkts -= ret;
	} while (n_pkts);
	return 0;
}

static inline int ring_enq_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
{
	int ret = 0;
	/* Before DPDK 17.05, rte_ring_enqueue_bulk() returns 0 on success and
	   -ENOBUFS on failure; newer versions return the number of objects enqueued. */
	// Rings can be single or multi-producer (ctrl rings are multi-producer)
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
	if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts))) {
#else
	if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0)) {
#endif
		ret = n_pkts;
		if (tbase->tx_pkt == tx_pkt_bw) {
			uint32_t drop_bytes = 0;
			for (uint16_t i = 0; i < n_pkts; ++i) {
				drop_bytes += mbuf_wire_size(mbufs[i]);
				rte_pktmbuf_free(mbufs[i]);
			}
			TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
			TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
		} else {
			for (uint16_t i = 0; i < n_pkts; ++i)
				rte_pktmbuf_free(mbufs[i]);
			TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
		}
	} else {
		TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
	}
	return ret;
}

static inline int ring_enq_no_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
{
	/* Spin until the bulk enqueue succeeds; nothing is dropped. */
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
	while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts)) {
	}
#else
	while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0) {
	}
#endif
	TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
	return 0;
}

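/*
 * Illustrative sketch (not from the PROX sources): the DPDK 17.05 API
 * change that the #if blocks above accommodate.
 *
 *	// DPDK < 17.05: all-or-nothing, 0 on success, -ENOBUFS on failure
 *	int rc = rte_ring_enqueue_bulk(ring, objs, n);
 *
 *	// DPDK >= 17.05: returns the number enqueued (n or 0 for bulk),
 *	// with an extra out-parameter for the remaining free space
 *	unsigned done = rte_ring_enqueue_bulk(ring, objs, n, NULL);
 *
 * Hence "!= 0 means failure" before 17.05 becomes "== 0 means failure"
 * afterwards.
 */
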
void flush_queues_hw(struct task_base *tbase)
{
	uint16_t prod, cons;

	for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (prod != cons) {
			tbase->ws_mbuf->idx[i].prod = 0;
			tbase->ws_mbuf->idx[i].cons = 0;
			txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
		}
	}

	tbase->flags &= ~FLAG_TX_FLUSH;
}

void flush_queues_sw(struct task_base *tbase)
{
	uint16_t prod, cons;

	for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (prod != cons) {
			tbase->ws_mbuf->idx[i].prod = 0;
			tbase->ws_mbuf->idx[i].cons = 0;
			ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
		}
	}

	tbase->flags &= ~FLAG_TX_FLUSH;
}

void flush_queues_no_drop_hw(struct task_base *tbase)
{
	uint16_t prod, cons;

	for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (prod != cons) {
			tbase->ws_mbuf->idx[i].prod = 0;
			tbase->ws_mbuf->idx[i].cons = 0;
			txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
		}
	}

	tbase->flags &= ~FLAG_TX_FLUSH;
}

void flush_queues_no_drop_sw(struct task_base *tbase)
{
	uint16_t prod, cons;

	for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (prod != cons) {
			tbase->ws_mbuf->idx[i].prod = 0;
			tbase->ws_mbuf->idx[i].cons = 0;
			ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
		}
	}

	tbase->flags &= ~FLAG_TX_FLUSH;
}

/* The "try" functions try to send packets to sw/hw without failing or
   blocking: they return as soon as the ring/queue is full and are used
   by aggregators. "try" functions do not have drop/no-drop flavors;
   they are only implemented in never_discard mode (as by default they
   use only one outgoing ring). */
uint16_t tx_try_self(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	if (n_pkts < 64) {
		tx_pkt_never_discard_self(tbase, mbufs, n_pkts, NULL);
		return n_pkts;
	} else {
		tx_pkt_never_discard_self(tbase, mbufs, 64, NULL);
		return 64;
	}
}

uint16_t tx_try_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	const int bulk_size = 64;
	uint16_t ret = bulk_size, sent = 0, n_bulks;
	n_bulks = n_pkts >> __builtin_ctz(bulk_size);

	for (int i = 0; i < n_bulks; i++) {
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
		ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size);
#else
		ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size, NULL);
#endif
		mbufs += ret;
		sent += ret;
		if (ret != bulk_size)
			break;
	}
	if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
		ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)));
#else
		ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)), NULL);
#endif
		sent += ret;
	}
	TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
	return sent;
}

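/*
 * Illustrative arithmetic (not from the PROX sources): with bulk_size
 * a power of two, the "try" functions split n_pkts into full bulks
 * plus a remainder using shifts and masks only. For n_pkts == 150:
 *
 *	n_bulks   = 150 >> __builtin_ctz(64);  // 150 >> 6 == 2 full bulks
 *	remainder = 150 & (64 - 1);            // 150 & 63 == 22 packets
 *
 * so 2 * 64 + 22 == 150 packets are attempted in three ring calls.
 */
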
uint16_t tx_try_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	const int bulk_size = 64;
	uint16_t ret = bulk_size, n_bulks, sent = 0;
	n_bulks = n_pkts >> __builtin_ctz(bulk_size);

	const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
	for (int i = 0; i < n_bulks; i++) {
		ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, bulk_size);
		mbufs += ret;
		sent += ret;
		if (ret != bulk_size)
			break;
	}
	if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
		ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, (n_pkts & (bulk_size - 1)));
		sent += ret;
	}
	TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
	return sent;
}

int tx_pkt_no_drop_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
}

int tx_pkt_no_drop_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	static uint8_t fake_out[MAX_PKT_BURST] = {0};
	int ret = 0;
	if (n_pkts == MAX_PKT_BURST) {
		// First xmit what was queued
		uint16_t prod, cons;

		prod = tbase->ws_mbuf->idx[0].prod;
		cons = tbase->ws_mbuf->idx[0].cons;

		if ((uint16_t)(prod - cons)) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[0].prod = 0;
			tbase->ws_mbuf->idx[0].cons = 0;
			ret += txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
		}
		ret += txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
	} else {
		ret += tx_pkt_no_drop_hw(tbase, mbufs, n_pkts, fake_out);
	}
	return ret;
}

int tx_pkt_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
}

int tx_pkt_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	static uint8_t fake_out[MAX_PKT_BURST] = {0};
	int ret = 0;
	if (n_pkts == MAX_PKT_BURST) {
		// First xmit what was queued
		uint16_t prod, cons;

		prod = tbase->ws_mbuf->idx[0].prod;
		cons = tbase->ws_mbuf->idx[0].cons;

		if ((uint16_t)(prod - cons)) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[0].prod = 0;
			tbase->ws_mbuf->idx[0].cons = 0;
			ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
		}
		ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
	} else {
		ret += tx_pkt_hw(tbase, mbufs, n_pkts, fake_out);
	}
	return ret;
}

/* Transmit to hw using the tx_params_hw_sw structure.
   This function is used to transmit to hw when tx_params_hw_sw should be
   used, i.e. when the task needs to transmit both to hw and sw. */
int tx_pkt_no_drop_never_discard_hw1_no_pointer(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	txhw_no_drop(&tbase->tx_params_hw_sw.tx_port_queue, mbufs, n_pkts, tbase);
	return 0;
}

int tx_pkt_no_drop_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
}

int tx_pkt_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
}

static uint16_t tx_pkt_free_dropped(__attribute__((unused)) struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	uint64_t v = 0;
	uint16_t i;

	/* The most probable and most important case to optimize is when
	   no packets should be dropped. */
	for (i = 0; i + 8 < n_pkts; i += 8) {
		v |= *((uint64_t *)(&out[i]));
	}
	for (; i < n_pkts; ++i) {
		v |= out[i];
	}

	if (unlikely(v)) {
		/* At least some packets need to be dropped, so the
		   mbufs array needs to be updated. */
		uint16_t n_kept = 0;
		uint16_t n_discard = 0;
		for (uint16_t i = 0; i < n_pkts; ++i) {
			if (unlikely(out[i] >= OUT_HANDLED)) {
				rte_pktmbuf_free(mbufs[i]);
				n_discard += out[i] == OUT_DISCARD;
				continue;
			}
			mbufs[n_kept++] = mbufs[i];
		}
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, n_discard);
		TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts - n_kept - n_discard);
		return n_kept;
	}
	return n_pkts;
}

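/*
 * Illustrative sketch (not from the PROX sources): the fast path above
 * OR-combines out[] eight bytes at a time. Reading the uint8_t array
 * through a uint64_t pointer folds 8 flags into one compare:
 *
 *	uint8_t out[8] = {0};                 // all packets go to output 0
 *	uint64_t v = *(uint64_t *)out;        // 0 -> no packet to drop
 *	out[3] = 0xFF;                        // OUT_DISCARD for packet 3
 *	v = *(uint64_t *)out;                 // != 0 -> take the slow path
 *
 * This only works because, on the single-output paths that call
 * tx_pkt_free_dropped(), a non-zero out[] entry always means "drop".
 */
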
int tx_pkt_no_drop_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
	int ret = 0;

	if (likely(n_kept))
		ret = txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
	return ret;
}

int tx_pkt_no_drop_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
	int ret = 0;

	if (likely(n_kept))
		ret = ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
	return ret;
}

int tx_pkt_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);

	if (likely(n_kept))
		return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
	return 0;
}

int tx_pkt_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);

	if (likely(n_kept))
		return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
	return 0;
}

int tx_pkt_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);

	TASK_STATS_ADD_TX(&tbase->aux->stats, n_kept);
	tbase->ws_mbuf->idx[0].nb_rx = n_kept;
	struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	for (uint16_t i = 0; i < n_kept; ++i) {
		tx_mbuf[i] = mbufs[i];
	}
	return 0;
}

int tx_pkt_never_discard_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
	tbase->ws_mbuf->idx[0].nb_rx = n_pkts;
	struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	for (uint16_t i = 0; i < n_pkts; ++i) {
		tx_mbuf[i] = mbufs[i];
	}
	return 0;
}

int tx_pkt_no_drop_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	buf_pkt_all(tbase, mbufs, n_pkts, out);

	const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
	uint16_t prod, cons;

	for (uint8_t i = 0; i < nb_bufs; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
			ret += txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
		}
	}
	return ret;
}

int tx_pkt_no_drop_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	buf_pkt_all(tbase, mbufs, n_pkts, out);

	const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
	uint16_t prod, cons;

	for (uint8_t i = 0; i < nb_bufs; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
			ret += ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
		}
	}
	return ret;
}

int tx_pkt_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	buf_pkt_all(tbase, mbufs, n_pkts, out);

	const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
	uint16_t prod, cons;

	for (uint8_t i = 0; i < nb_bufs; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
			ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
		}
	}
	return ret;
}

int tx_pkt_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	buf_pkt_all(tbase, mbufs, n_pkts, out);

	const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
	uint16_t prod, cons;

	for (uint8_t i = 0; i < nb_bufs; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
			ret += ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
		}
	}
	return ret;
}

static inline void trace_one_rx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf)
{
	struct rte_mbuf tmp;

	/* For each packet being transmitted, find which buffer
	   represents the packet as it was before it was handled. */
	uint32_t j = 0;
	uint32_t len = sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr)/sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr[0]);
	for (; j < len; ++j) {
		if (tbase->aux->task_rt_dump.pkt_mbuf_addr[j] == mbuf)
			break;
	}
	if (j == len)
		return;
#if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
	tmp.data_off = 0;
#endif
	rte_pktmbuf_data_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
	rte_pktmbuf_pkt_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
	tmp.buf_addr = tbase->aux->task_rt_dump.pkt_cpy[j];
	plogdx_info(&tmp, "Trace RX: ");
}

static inline void trace_one_tx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf, uint8_t *out, uint32_t i)
{
	if (out) {
		switch (out[i]) {
		case OUT_HANDLED:
			plogdx_info(mbuf, "Handled: ");
			break;
		case OUT_DISCARD:
			plogdx_info(mbuf, "Dropped: ");
			break;
		default:
			plogdx_info(mbuf, "TX[%d]: ", out[i]);
			break;
		}
	} else if (tbase->aux->tx_pkt_orig == tx_pkt_drop_all) {
		plogdx_info(mbuf, "Dropped: ");
	} else {
		plogdx_info(mbuf, "TX[0]: ");
	}
}

static void unset_trace(struct task_base *tbase)
{
	if (0 == tbase->aux->task_rt_dump.n_trace) {
		if ((tbase->tx_pkt == tx_pkt_l3) || (tbase->tx_pkt == tx_pkt_ndp)) {
			tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
			tbase->aux->tx_pkt_orig = NULL;
		} else {
			tbase->tx_pkt = tbase->aux->tx_pkt_orig;
			tbase->aux->tx_pkt_orig = NULL;
		}
		tbase->aux->task_rt_dump.cur_trace = 0;
		task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
	}
}

int tx_pkt_trace(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	if (tbase->aux->task_rt_dump.cur_trace == 0) {
		// No packet received since dumping...
		tbase->aux->task_rt_dump.n_print_tx = tbase->aux->task_rt_dump.n_trace;
		if (tbase->aux->task_rt_dump.n_trace < n_pkts) {
			tbase->aux->task_rt_dump.n_trace = 0;
			tbase->aux->task_rt_dump.cur_trace = 0;
			task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
		} else {
			tbase->aux->task_rt_dump.n_trace -= n_pkts;
		}
		ret = tx_pkt_dump(tbase, mbufs, n_pkts, out);
		tbase->aux->task_rt_dump.n_print_tx = 0;
		return ret;
	}
	plog_info("Tracing %d pkts\n", tbase->aux->task_rt_dump.cur_trace);
	uint32_t cur_trace = (n_pkts < tbase->aux->task_rt_dump.cur_trace) ? n_pkts : tbase->aux->task_rt_dump.cur_trace;
	for (uint32_t i = 0; i < cur_trace; ++i) {
		trace_one_rx_pkt(tbase, mbufs[i]);
		trace_one_tx_pkt(tbase, mbufs[i], out, i);
	}
	ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);

	unset_trace(tbase);
	return ret;
}

int tx_pkt_dump(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
	int ret = 0;

	n_dump = n_pkts < n_dump ? n_pkts : n_dump;
	for (uint32_t i = 0; i < n_dump; ++i) {
		if (out) {
			switch (out[i]) {
			case OUT_HANDLED:
				plogdx_info(mbufs[i], "Handled: ");
				break;
			case OUT_DISCARD:
				plogdx_info(mbufs[i], "Dropped: ");
				break;
			default:
				plogdx_info(mbufs[i], "TX[%d]: ", out[i]);
				break;
			}
		} else {
			plogdx_info(mbufs[i], "TX: ");
		}
	}
	tbase->aux->task_rt_dump.n_print_tx -= n_dump;

	ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);

	if (0 == tbase->aux->task_rt_dump.n_print_tx) {
		if ((tbase->tx_pkt == tx_pkt_l3) || (tbase->tx_pkt == tx_pkt_ndp)) {
			tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
			tbase->aux->tx_pkt_orig = NULL;
		} else {
			tbase->tx_pkt = tbase->aux->tx_pkt_orig;
			tbase->aux->tx_pkt_orig = NULL;
		}
	}
	return ret;
}

/* Gather the distribution of the number of packets that have been
   xmitted from one TX call. Since the value is only modified by the
   task that xmits the packets, no atomic operation is needed. */
int tx_pkt_distr(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	if (likely(n_pkts < TX_BUCKET_SIZE))
		tbase->aux->tx_bucket[n_pkts]++;
	else
		tbase->aux->tx_bucket[TX_BUCKET_SIZE - 1]++;
	return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
}

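/*
 * Illustrative sketch (not from the PROX sources): tx_pkt_distr is a
 * wrapper installed in place of the real tx function; burst sizes of
 * TX_BUCKET_SIZE or more all land in the last (overflow) bucket.
 * Assuming, purely for the example, TX_BUCKET_SIZE == 256:
 *
 *	tx_bucket[0]   -> number of TX calls with an empty burst
 *	tx_bucket[32]  -> number of TX calls with exactly 32 packets
 *	tx_bucket[255] -> number of TX calls with 255 or more packets
 */
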
int tx_pkt_bw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	uint32_t tx_bytes = 0;
	uint32_t drop_bytes = 0;

	for (uint16_t i = 0; i < n_pkts; ++i) {
		if (!out || out[i] < OUT_HANDLED)
			tx_bytes += mbuf_wire_size(mbufs[i]);
		else
			drop_bytes += mbuf_wire_size(mbufs[i]);
	}

	TASK_STATS_ADD_TX_BYTES(&tbase->aux->stats, tx_bytes);
	TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
	return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
}

int tx_pkt_drop_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	for (uint16_t j = 0; j < n_pkts; ++j) {
		rte_pktmbuf_free(mbufs[j]);
	}
	if (out == NULL) {
		TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts);
	} else {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (out[j] == OUT_HANDLED)
				TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
			else
				TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		}
	}
	return n_pkts;
}

static inline void dump_pkts(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
	uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;

	if (unlikely(n_dump)) {
		n_dump = n_pkts < n_dump ? n_pkts : n_dump;
		for (uint32_t i = 0; i < n_dump; ++i) {
			plogdx_info(mbufs[i], "TX: ");
		}
		tbase->aux->task_rt_dump.n_print_tx -= n_dump;
	} else if (unlikely(n_trace)) {
		n_trace = n_pkts < n_trace ? n_pkts : n_trace;
		for (uint32_t i = 0; i < n_trace; ++i) {
			plogdx_info(mbufs[i], "TX: ");
		}
		tbase->aux->task_rt_dump.n_trace -= n_trace;
	}
}

// Ctrlplane packets are slow path, hence the cost of checking whether a dump
// or trace is needed is not too important; it is easier to have this
// implementation than one similar to the dataplane tx functions.
int tx_ctrlplane_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	dump_pkts(tbase, mbufs, n_pkts);
	return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
}

int tx_ctrlplane_sw(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	dump_pkts(tbase, mbufs, n_pkts);
	return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
}

static inline int tx_ring_all(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
{
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	ctrl_ring_set_command_core_task_ip(mbuf, ((uint64_t)ip << 32) | (core_id << 16) | (task_id << 8) | command);
	return rte_ring_enqueue(ring, mbuf);
}

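/*
 * Illustrative sketch (not from the PROX sources): layout of the 64-bit
 * control word built above, and how a receiver could decode it.
 *
 *	uint64_t w = ((uint64_t)ip << 32) | (core_id << 16) | (task_id << 8) | command;
 *	uint8_t  cmd  = w & 0xFF;            // bits  0..7
 *	uint8_t  task = (w >> 8) & 0xFF;     // bits  8..15
 *	uint8_t  core = (w >> 16) & 0xFF;    // bits 16..23
 *	uint32_t ip4  = (uint32_t)(w >> 32); // bits 32..63
 */
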
int tx_ring_cti(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip, uint16_t vlan)
{
	plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
	ctrl_ring_set_vlan(mbuf, vlan);
	int ret = tx_ring_all(tbase, ring, command, mbuf, core_id, task_id, ip);
	if (unlikely(ret != 0)) {
		plogx_dbg("\tFailed to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
	return ret;
}

void tx_ring_ip(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, uint32_t ip)
{
	plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
	int ret = tx_ring_all(tbase, ring, command, mbuf, 0, 0, ip);
	if (unlikely(ret != 0)) {
		plogx_dbg("\tFailed to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring(struct task_base *tbase, struct rte_ring *ring, uint16_t command, struct rte_mbuf *mbuf)
{
	plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
	int ret = tx_ring_all(tbase, ring, command, mbuf, 0, 0, 0);
	if (unlikely(ret != 0)) {
		plogx_dbg("\tFailed to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring_route(struct task_base *tbase, struct rte_ring *ring, int add, struct rte_mbuf *mbuf, uint32_t ip, uint32_t gateway_ip, uint32_t prefix)
{
	uint8_t command;
	if (add)
		command = ROUTE_ADD_FROM_MASTER;
	else
		command = ROUTE_DEL_FROM_MASTER;

	plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
	ctrl_ring_set_command(mbuf, command);
	ctrl_ring_set_ip(mbuf, ip);
	ctrl_ring_set_gateway_ip(mbuf, gateway_ip);
	ctrl_ring_set_prefix(mbuf, prefix);
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	int ret = rte_ring_enqueue(ring, mbuf);
	if (unlikely(ret != 0)) {
		plogx_dbg("\tFailed to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring_cti6(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, struct ipv6_addr *ip, uint16_t vlan)
{
	int ret;
	plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	ctrl_ring_set_command_core_task_ip(mbuf, (core_id << 16) | (task_id << 8) | command);
	ctrl_ring_set_ipv6_addr(mbuf, ip);
	ctrl_ring_set_vlan(mbuf, vlan);
	ret = rte_ring_enqueue(ring, mbuf);

	if (unlikely(ret != 0)) {
		plogx_dbg("\tFailed to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring_ip6(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, struct ipv6_addr *ip)
{
	int ret;
	plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	ctrl_ring_set_command(mbuf, command);
	ctrl_ring_set_ipv6_addr(mbuf, ip);
	ret = rte_ring_enqueue(ring, mbuf);

	if (unlikely(ret != 0)) {
		plogx_dbg("\tFailed to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring_ip6_data(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, struct ipv6_addr *ip, uint64_t data)
{
	int ret;
	plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	ctrl_ring_set_command(mbuf, command);
	ctrl_ring_set_ipv6_addr(mbuf, ip);
	ctrl_ring_set_data(mbuf, data);
	ret = rte_ring_enqueue(ring, mbuf);

	if (unlikely(ret != 0)) {
		plogx_dbg("\tFailed to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}