// Copyright (c) 2010-2020 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <rte_ethdev.h>
#include <rte_version.h>

#include "rx_pkt.h"
#include "tx_pkt.h"
#include "task_base.h"
#include "stats.h"
#include "prox_assert.h"
#include "log.h"
#include "mbuf_utils.h"
#include "handle_master.h"

static void buf_pkt_single(struct task_base *tbase, struct rte_mbuf *mbuf, const uint8_t out)
{
	const uint16_t prod = tbase->ws_mbuf->idx[out].prod++;
	tbase->ws_mbuf->mbuf[out][prod & WS_MBUF_MASK] = mbuf;
}

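/* ws_mbuf is a per-output circular staging buffer. idx[out].prod is a
   free-running counter; masking with WS_MBUF_MASK (assumed to be a
   power-of-two buffer size minus one) selects the slot, so indices wrap
   without an explicit modulo. */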
static inline void buf_pkt_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	for (uint16_t j = 0; j < n_pkts; ++j) {
		if (unlikely(out[j] >= OUT_HANDLED)) {
			rte_pktmbuf_free(mbufs[j]);
			if (out[j] == OUT_HANDLED)
				TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
			else
				TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		} else {
			buf_pkt_single(tbase, mbufs[j], out[j]);
		}
	}
}

#define MAX_PMD_TX 32

void store_packet(struct task_base *tbase, struct rte_mbuf *mbuf)
{
	// Only one mbuf can be stored: if one is already pending, drop it
	if (tbase->aux->mbuf)
		tx_drop(tbase->aux->mbuf);
	tbase->aux->mbuf = mbuf;
}

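/* tx_pkt_l3() and tx_pkt_ndp() wrap the regular L2 transmit function
   (tbase->aux->tx_pkt_l2): packets whose destination MAC is already
   resolved are sent in contiguous runs [first, j), while packets needing
   ARP/ND resolution are diverted to the master task over the control
   plane ring. */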
int tx_pkt_ndp(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	struct ipv6_addr ip_dst;
	int first = 0, ret, ok = 0, rc;
	const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
	struct rte_mbuf *mbuf = NULL; // used when we need to send both an ND and the mbuf

	for (int j = 0; j < n_pkts; j++) {
		if ((out) && (out[j] >= OUT_HANDLED))
			continue;
		if (unlikely((rc = write_ip6_dst_mac(tbase, mbufs[j], &ip_dst)) != SEND_MBUF)) {
			if (j - first) {
				ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
				ok += ret;
			}
			first = j + 1;
			switch (rc) {
			case SEND_ARP_ND:
				// Original mbuf (packet) is stored to be sent later -> need to allocate a new mbuf
				ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&mbuf);
				if (likely(ret == 0)) {
					store_packet(tbase, mbufs[j]);
					mbuf->port = tbase->l3.reachable_port_id;
					tx_ring_cti6(tbase, tbase->l3.ctrl_plane_ring, IP6_REQ_MAC_TO_MASTER, mbuf, tbase->l3.core_id, tbase->l3.task_id, &ip_dst);
				} else {
					plog_err("Failed to get a mbuf from arp/nd mempool\n");
					tx_drop(mbufs[j]);
					TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
				}
				break;
			case SEND_MBUF_AND_ARP_ND:
				// We send the mbuf and an ND - we need to allocate another mbuf for the ND
				ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&mbuf);
				if (likely(ret == 0)) {
					mbuf->port = tbase->l3.reachable_port_id;
					tx_ring_cti6(tbase, tbase->l3.ctrl_plane_ring, IP6_REQ_MAC_TO_MASTER, mbuf, tbase->l3.core_id, tbase->l3.task_id, &ip_dst);
				} else {
					plog_err("Failed to get a mbuf from arp/nd mempool\n");
					// We still send the initial mbuf
				}
				ret = tbase->aux->tx_pkt_l2(tbase, mbufs + j, 1, out);
				ok += ret;
				break;
			case DROP_MBUF:
				tx_drop(mbufs[j]);
				TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
				break;
			}
		}
	}
	if (n_pkts - first) {
		ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
		ok += ret;
	}
	return ok;
}

int tx_pkt_l3(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	uint32_t ip_dst;
	uint64_t *time;
	int first = 0, ret, ok = 0, rc;
	const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
	struct rte_mbuf *arp_mbuf = NULL; // used when we need to send both an ARP and the mbuf
	uint64_t tsc = rte_rdtsc();

	for (int j = 0; j < n_pkts; j++) {
		if ((out) && (out[j] >= OUT_HANDLED))
			continue;
		if (unlikely((rc = write_dst_mac(tbase, mbufs[j], &ip_dst, &time, tsc)) != SEND_MBUF)) {
			if (j - first) {
				ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
				ok += ret;
			}
			first = j + 1;
			switch (rc) {
			case SEND_ARP_ND:
				// We re-use the mbuf - no need to create an arp_mbuf and delete the existing mbuf
				mbufs[j]->port = tbase->l3.reachable_port_id;
				if (tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, IP4_REQ_MAC_TO_MASTER, mbufs[j], tbase->l3.core_id, tbase->l3.task_id, ip_dst) == 0)
					update_arp_ndp_retransmit_timeout(&tbase->l3, time, 1000);
				else
					update_arp_ndp_retransmit_timeout(&tbase->l3, time, 100);
				break;
			case SEND_MBUF_AND_ARP_ND:
				// We send the mbuf and an ARP - we need to allocate another mbuf for the ARP
				ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&arp_mbuf);
				if (likely(ret == 0)) {
					arp_mbuf->port = tbase->l3.reachable_port_id;
					if (tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, IP4_REQ_MAC_TO_MASTER, arp_mbuf, tbase->l3.core_id, tbase->l3.task_id, ip_dst) == 0)
						update_arp_ndp_retransmit_timeout(&tbase->l3, time, 1000);
					else
						update_arp_ndp_retransmit_timeout(&tbase->l3, time, 100);
				} else {
					plog_err("Failed to get a mbuf from arp mempool\n");
					// We still send the initial mbuf
				}
				ret = tbase->aux->tx_pkt_l2(tbase, mbufs + j, 1, out);
				ok += ret;
				break;
			case DROP_MBUF:
				tx_drop(mbufs[j]);
				TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
				break;
			}
		}
	}
	if (n_pkts - first) {
		ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
		ok += ret;
	}
	return ok;
}

/* The following helper functions also report stats. Therefore we need
   to pass the task_base struct. */
static inline int txhw_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
{
	uint16_t ntx;
	int ret;

	/* TX vector mode can't transmit more than 32 packets */
	if (n_pkts > MAX_PMD_TX) {
		ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, MAX_PMD_TX);
		ntx += rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs + ntx, n_pkts - ntx);
	} else {
		ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
	}
	TASK_STATS_ADD_TX(&tbase->aux->stats, ntx);

	ret = n_pkts - ntx;
	if (ntx < n_pkts) {
		plog_dbg("Failed to send %d packets from %p\n", ret, mbufs[0]);
		TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts - ntx);
		if (tbase->tx_pkt == tx_pkt_bw) {
			uint32_t drop_bytes = 0;
			do {
				drop_bytes += mbuf_wire_size(mbufs[ntx]);
				rte_pktmbuf_free(mbufs[ntx++]);
			} while (ntx < n_pkts);
			TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
		} else {
			do {
				rte_pktmbuf_free(mbufs[ntx++]);
			} while (ntx < n_pkts);
		}
	}
	return ret;
}

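/* The no_drop variant below retries rte_eth_tx_burst() until the PMD has
   accepted every mbuf, so it can spin while the TX queue stays full; the
   drop variant above frees whatever the PMD refused instead. */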
static inline int txhw_no_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
{
	uint16_t ret;

	TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
	do {
		ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
		mbufs += ret;
		n_pkts -= ret;
	} while (n_pkts);
	return 0;
}

static inline int ring_enq_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
{
	int ret = 0;
	/* return 0 on success, -ENOBUFS on failure */
	// Rings can be single or multiproducer (ctrl rings are multi producer)
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
	if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts))) {
#else
	if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0)) {
#endif
		ret = n_pkts;
		if (tbase->tx_pkt == tx_pkt_bw) {
			uint32_t drop_bytes = 0;
			for (uint16_t i = 0; i < n_pkts; ++i) {
				drop_bytes += mbuf_wire_size(mbufs[i]);
				rte_pktmbuf_free(mbufs[i]);
			}
			TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
			TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
		} else {
			for (uint16_t i = 0; i < n_pkts; ++i)
				rte_pktmbuf_free(mbufs[i]);
			TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
		}
	} else {
		TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
	}
	return ret;
}

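/* The RTE_VERSION checks here and below cover the DPDK 17.05 rte_ring API
   change: the old rte_ring_enqueue_bulk() returned 0 on success and
   -ENOBUFS on failure, while the new one takes an extra free_space
   argument and returns the number of objects enqueued (0 meaning none). */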
static inline int ring_enq_no_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
{
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
	while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts)) {
#else
	while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0) {
#endif
	}
	TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
	return 0;
}

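/* The flush functions below drain whatever the buffered TX paths (see
   buf_pkt_all() and the tx_pkt_hw/sw() family further down) left queued
   in ws_mbuf when less than a full burst was available. */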
void flush_queues_hw(struct task_base *tbase)
{
	uint16_t prod, cons;

	for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (prod != cons) {
			tbase->ws_mbuf->idx[i].prod = 0;
			tbase->ws_mbuf->idx[i].cons = 0;
			txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
		}
	}

	tbase->flags &= ~FLAG_TX_FLUSH;
}

void flush_queues_sw(struct task_base *tbase)
{
	uint16_t prod, cons;

	for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (prod != cons) {
			tbase->ws_mbuf->idx[i].prod = 0;
			tbase->ws_mbuf->idx[i].cons = 0;
			ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
		}
	}

	tbase->flags &= ~FLAG_TX_FLUSH;
}

void flush_queues_no_drop_hw(struct task_base *tbase)
{
	uint16_t prod, cons;

	for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (prod != cons) {
			tbase->ws_mbuf->idx[i].prod = 0;
			tbase->ws_mbuf->idx[i].cons = 0;
			txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
		}
	}

	tbase->flags &= ~FLAG_TX_FLUSH;
}

void flush_queues_no_drop_sw(struct task_base *tbase)
{
	uint16_t prod, cons;

	for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (prod != cons) {
			tbase->ws_mbuf->idx[i].prod = 0;
			tbase->ws_mbuf->idx[i].cons = 0;
			ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
		}
	}

	tbase->flags &= ~FLAG_TX_FLUSH;
}

342 /* "try" functions try to send packets to sw/hw w/o failing or blocking;
343 They return if ring/queue is full and are used by aggregators.
344 "try" functions do not have drop/no drop flavors
345 They are only implemented in never_discard mode (as by default they
346 use only one outgoing ring. */
347 uint16_t tx_try_self(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
350 tx_pkt_never_discard_self(tbase, mbufs, n_pkts, NULL);
353 tx_pkt_never_discard_self(tbase, mbufs, 64, NULL);
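/* Since bulk_size is a power of two, n_pkts >> __builtin_ctz(bulk_size)
   is n_pkts / 64 (the number of full bulks) and n_pkts & (bulk_size - 1)
   is the size of the final partial burst. */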
uint16_t tx_try_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	const int bulk_size = 64;
	uint16_t ret = bulk_size, sent = 0, n_bulks;
	n_bulks = n_pkts >> __builtin_ctz(bulk_size);

	for (int i = 0; i < n_bulks; i++) {
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
		ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size);
#else
		ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size, NULL);
#endif
		mbufs += ret;
		sent += ret;
		if (ret != bulk_size)
			break;
	}
	if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
		ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)));
#else
		ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)), NULL);
#endif
		sent += ret;
	}
	TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
	return sent;
}

uint16_t tx_try_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	const int bulk_size = 64;
	uint16_t ret = bulk_size, n_bulks, sent = 0;
	n_bulks = n_pkts >> __builtin_ctz(bulk_size);

	const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
	for (int i = 0; i < n_bulks; i++) {
		ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, bulk_size);
		mbufs += ret;
		sent += ret;
		if (ret != bulk_size)
			break;
	}
	if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
		ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, (n_pkts & (bulk_size - 1)));
		sent += ret;
	}
	TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
	return sent;
}

int tx_pkt_no_drop_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
}

int tx_pkt_no_drop_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	static uint8_t fake_out[MAX_PKT_BURST] = {0};
	int ret = 0;
	if (n_pkts == MAX_PKT_BURST) {
		// First xmit what was queued
		uint16_t prod, cons;

		prod = tbase->ws_mbuf->idx[0].prod;
		cons = tbase->ws_mbuf->idx[0].cons;

		if ((uint16_t)(prod - cons)) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[0].prod = 0;
			tbase->ws_mbuf->idx[0].cons = 0;
			ret += txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
		}
		ret += txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
	} else {
		ret += tx_pkt_no_drop_hw(tbase, mbufs, n_pkts, fake_out);
	}
	return ret;
}

int tx_pkt_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
}

int tx_pkt_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	static uint8_t fake_out[MAX_PKT_BURST] = {0};
	int ret = 0;
	if (n_pkts == MAX_PKT_BURST) {
		// First xmit what was queued
		uint16_t prod, cons;

		prod = tbase->ws_mbuf->idx[0].prod;
		cons = tbase->ws_mbuf->idx[0].cons;

		if ((uint16_t)(prod - cons)) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[0].prod = 0;
			tbase->ws_mbuf->idx[0].cons = 0;
			ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
		}
		ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
	} else {
		ret += tx_pkt_hw(tbase, mbufs, n_pkts, fake_out);
	}
	return ret;
}

/* Transmit to hw using the tx_params_hw_sw structure.
   This function is used when the task needs to transmit both to hw and sw,
   i.e. when tx_params_hw_sw should be used. */
int tx_pkt_no_drop_never_discard_hw1_no_pointer(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	txhw_no_drop(&tbase->tx_params_hw_sw.tx_port_queue, mbufs, n_pkts, tbase);
	return 0;
}

int tx_pkt_no_drop_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
}

int tx_pkt_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
}

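/* tx_pkt_free_dropped() appears to rely on the single-output convention
   that out[] is 0 for packets to transmit: OR-ing the out bytes eight at
   a time is then a cheap "anything to drop?" test before the per-packet
   pass. */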
static uint16_t tx_pkt_free_dropped(__attribute__((unused)) struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	uint64_t v = 0;
	uint16_t i;

	/* The most probable and most important case to optimize is when
	   no packets should be dropped. */
	for (i = 0; i + 8 < n_pkts; i += 8) {
		v |= *((uint64_t*)(&out[i]));
	}
	for (; i < n_pkts; ++i) {
		v |= out[i];
	}

	if (unlikely(v)) {
		/* At least some packets need to be dropped, so the
		   mbufs array needs to be updated. */
		uint16_t n_kept = 0;
		uint16_t n_discard = 0;
		for (uint16_t i = 0; i < n_pkts; ++i) {
			if (unlikely(out[i] >= OUT_HANDLED)) {
				rte_pktmbuf_free(mbufs[i]);
				n_discard += out[i] == OUT_DISCARD;
				continue;
			}
			mbufs[n_kept++] = mbufs[i];
		}
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, n_discard);
		TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts - n_kept - n_discard);
		return n_kept;
	}
	return n_pkts;
}

int tx_pkt_no_drop_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
	int ret = 0;

	if (likely(n_kept))
		ret = txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
	return ret;
}

int tx_pkt_no_drop_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
	int ret = 0;

	if (likely(n_kept))
		ret = ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
	return ret;
}

int tx_pkt_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);

	if (likely(n_kept))
		return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
	return 0;
}

int tx_pkt_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);

	if (likely(n_kept))
		return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
	return 0;
}

int tx_pkt_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);

	TASK_STATS_ADD_TX(&tbase->aux->stats, n_kept);
	tbase->ws_mbuf->idx[0].nb_rx = n_kept;
	struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	for (uint16_t i = 0; i < n_kept; ++i) {
		tx_mbuf[i] = mbufs[i];
	}
	return 0;
}

int tx_pkt_never_discard_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
	tbase->ws_mbuf->idx[0].nb_rx = n_pkts;
	struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	for (uint16_t i = 0; i < n_pkts; ++i) {
		tx_mbuf[i] = mbufs[i];
	}
	return 0;
}

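/* The dataplane TX functions below first stage packets per output with
   buf_pkt_all(), then drain full MAX_PKT_BURST chunks; a partial burst
   stays queued in ws_mbuf until the flush functions run. */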
int tx_pkt_no_drop_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	buf_pkt_all(tbase, mbufs, n_pkts, out);

	const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
	uint16_t prod, cons;

	for (uint8_t i = 0; i < nb_bufs; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
			ret += txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
		}
	}
	return ret;
}

int tx_pkt_no_drop_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	buf_pkt_all(tbase, mbufs, n_pkts, out);

	const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
	uint16_t prod, cons;

	for (uint8_t i = 0; i < nb_bufs; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
			ret += ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
		}
	}
	return ret;
}

int tx_pkt_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	buf_pkt_all(tbase, mbufs, n_pkts, out);

	const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
	uint16_t prod, cons;

	for (uint8_t i = 0; i < nb_bufs; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
			ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
		}
	}
	return ret;
}

int tx_pkt_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	buf_pkt_all(tbase, mbufs, n_pkts, out);

	const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
	uint16_t prod, cons;

	for (uint8_t i = 0; i < nb_bufs; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
			ret += ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
		}
	}
	return ret;
}

static inline void trace_one_rx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf)
{
	struct rte_mbuf tmp;

	/* For each packet being transmitted, find which buffer
	   represents the packet as it was before handling. */
	uint32_t j = 0;
	uint32_t len = sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr)/sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr[0]);
	for (; j < len; ++j) {
		if (tbase->aux->task_rt_dump.pkt_mbuf_addr[j] == mbuf)
			break;
	}
	if (j != len) {
#if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
		tmp.data_off = 0;
#endif
		rte_pktmbuf_data_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
		rte_pktmbuf_pkt_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
		tmp.buf_addr = tbase->aux->task_rt_dump.pkt_cpy[j];
		plogdx_info(&tmp, "Trace RX: ");
	}
}

static inline void trace_one_tx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf, uint8_t *out, uint32_t i)
{
	if (out) {
		switch (out[i]) {
		case OUT_HANDLED:
			plogdx_info(mbuf, "Handled: ");
			break;
		case OUT_DISCARD:
			plogdx_info(mbuf, "Dropped: ");
			break;
		default:
			plogdx_info(mbuf, "TX[%d]: ", out[i]);
			break;
		}
	} else if (tbase->aux->tx_pkt_orig == tx_pkt_drop_all) {
		plogdx_info(mbuf, "Dropped: ");
	} else
		plogdx_info(mbuf, "TX[0]: ");
}

static void unset_trace(struct task_base *tbase)
{
	if (0 == tbase->aux->task_rt_dump.n_trace) {
		if ((tbase->tx_pkt == tx_pkt_l3) || (tbase->tx_pkt == tx_pkt_ndp)) {
			tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
			tbase->aux->tx_pkt_orig = NULL;
		} else {
			tbase->tx_pkt = tbase->aux->tx_pkt_orig;
			tbase->aux->tx_pkt_orig = NULL;
		}
		tbase->aux->task_rt_dump.cur_trace = 0;
		task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
	}
}

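/* tx_pkt_trace() and tx_pkt_dump() are installed in place of the regular
   tx_pkt function pointer while a trace/dump command is active;
   tbase->aux->tx_pkt_orig holds the real transmit function, which is
   restored once the requested number of packets has been printed. */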
int tx_pkt_trace(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	if (tbase->aux->task_rt_dump.cur_trace == 0) {
		// No packet received since dumping...
		tbase->aux->task_rt_dump.n_print_tx = tbase->aux->task_rt_dump.n_trace;
		if (tbase->aux->task_rt_dump.n_trace < n_pkts) {
			tbase->aux->task_rt_dump.n_trace = 0;
			tbase->aux->task_rt_dump.cur_trace = 0;
			task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
		} else {
			tbase->aux->task_rt_dump.n_trace -= n_pkts;
		}
		ret = tx_pkt_dump(tbase, mbufs, n_pkts, out);
		tbase->aux->task_rt_dump.n_print_tx = 0;
		return ret;
	}
	plog_info("Tracing %d pkts\n", tbase->aux->task_rt_dump.cur_trace);
	uint32_t cur_trace = (n_pkts < tbase->aux->task_rt_dump.cur_trace) ? n_pkts : tbase->aux->task_rt_dump.cur_trace;
	for (uint32_t i = 0; i < cur_trace; ++i) {
		trace_one_rx_pkt(tbase, mbufs[i]);
		trace_one_tx_pkt(tbase, mbufs[i], out, i);
	}
	ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);

	unset_trace(tbase);
	return ret;
}

int tx_pkt_dump(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
	int ret = 0;

	n_dump = n_pkts < n_dump ? n_pkts : n_dump;
	for (uint32_t i = 0; i < n_dump; ++i) {
		if (out) {
			switch (out[i]) {
			case OUT_HANDLED:
				plogdx_info(mbufs[i], "Handled: ");
				break;
			case OUT_DISCARD:
				plogdx_info(mbufs[i], "Dropped: ");
				break;
			default:
				plogdx_info(mbufs[i], "TX[%d]: ", out[i]);
				break;
			}
		} else
			plogdx_info(mbufs[i], "TX: ");
	}
	tbase->aux->task_rt_dump.n_print_tx -= n_dump;

	ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);

	if (0 == tbase->aux->task_rt_dump.n_print_tx) {
		if ((tbase->tx_pkt == tx_pkt_l3) || (tbase->tx_pkt == tx_pkt_ndp)) {
			tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
			tbase->aux->tx_pkt_orig = NULL;
		} else {
			tbase->tx_pkt = tbase->aux->tx_pkt_orig;
			tbase->aux->tx_pkt_orig = NULL;
		}
	}
	return ret;
}

/* Gather the distribution of the number of packets that have been
   xmitted from one TX call. Since the value is only modified by the
   task that xmits the packet, no atomic operation is needed. */
int tx_pkt_distr(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	if (likely(n_pkts < TX_BUCKET_SIZE))
		tbase->aux->tx_bucket[n_pkts]++;
	else
		tbase->aux->tx_bucket[TX_BUCKET_SIZE - 1]++;
	return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
}

int tx_pkt_bw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	uint32_t tx_bytes = 0;
	uint32_t drop_bytes = 0;

	for (uint16_t i = 0; i < n_pkts; ++i) {
		if (!out || out[i] < OUT_HANDLED)
			tx_bytes += mbuf_wire_size(mbufs[i]);
		else
			drop_bytes += mbuf_wire_size(mbufs[i]);
	}

	TASK_STATS_ADD_TX_BYTES(&tbase->aux->stats, tx_bytes);
	TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
	return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
}

int tx_pkt_drop_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	for (uint16_t j = 0; j < n_pkts; ++j) {
		rte_pktmbuf_free(mbufs[j]);
	}
	if (out == NULL)
		TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts);
	else {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (out[j] == OUT_HANDLED)
				TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
			else
				TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		}
	}
	return n_pkts;
}

static inline void dump_pkts(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
	uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;

	if (unlikely(n_dump)) {
		n_dump = n_pkts < n_dump ? n_pkts : n_dump;
		for (uint32_t i = 0; i < n_dump; ++i) {
			plogdx_info(mbufs[i], "TX: ");
		}
		tbase->aux->task_rt_dump.n_print_tx -= n_dump;
	} else if (unlikely(n_trace)) {
		n_trace = n_pkts < n_trace ? n_pkts : n_trace;
		for (uint32_t i = 0; i < n_trace; ++i) {
			plogdx_info(mbufs[i], "TX: ");
		}
		tbase->aux->task_rt_dump.n_trace -= n_trace;
	}
}

// Ctrlplane packets are slow path, hence the cost of checking whether a dump
// or trace is needed is not too important; it is easier to have this
// implementation than one similar to the dataplane tx.
int tx_ctrlplane_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	dump_pkts(tbase, mbufs, n_pkts);
	return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
}

int tx_ctrlplane_sw(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	dump_pkts(tbase, mbufs, n_pkts);
	return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
}

static inline int tx_ring_all(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
{
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	ctrl_ring_set_command_core_task_ip(mbuf, ((uint64_t)ip << 32) | (core_id << 16) | (task_id << 8) | command);
	return rte_ring_enqueue(ring, mbuf);
}

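/* tx_ring_all() packs the request into a single 64-bit word carried by
   the control mbuf: command in bits 0-7, task id in bits 8-15, core id in
   bits 16-23 and the IPv4 address in the upper 32 bits; the master task
   is expected to unpack the same layout (see handle_master.h). */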
int tx_ring_cti(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
{
	plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
	int ret = tx_ring_all(tbase, ring, command, mbuf, core_id, task_id, ip);
	if (unlikely(ret != 0)) {
		plogx_dbg("\tFailed to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
	return ret;
}

void tx_ring_ip(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, uint32_t ip)
{
	plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
	int ret = tx_ring_all(tbase, ring, command, mbuf, 0, 0, ip);
	if (unlikely(ret != 0)) {
		plogx_dbg("\tFailed to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring(struct task_base *tbase, struct rte_ring *ring, uint16_t command, struct rte_mbuf *mbuf)
{
	plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
	int ret = tx_ring_all(tbase, ring, command, mbuf, 0, 0, 0);
	if (unlikely(ret != 0)) {
		plogx_dbg("\tFailed to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring_route(struct task_base *tbase, struct rte_ring *ring, int add, struct rte_mbuf *mbuf, uint32_t ip, uint32_t gateway_ip, uint32_t prefix)
{
	uint8_t command;
	if (add)
		command = ROUTE_ADD_FROM_MASTER;
	else
		command = ROUTE_DEL_FROM_MASTER;

	plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
	ctrl_ring_set_command(mbuf, command);
	ctrl_ring_set_ip(mbuf, ip);
	ctrl_ring_set_gateway_ip(mbuf, gateway_ip);
	ctrl_ring_set_prefix(mbuf, prefix);
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	int ret = rte_ring_enqueue(ring, mbuf);
	if (unlikely(ret != 0)) {
		plogx_dbg("\tFailed to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring_cti6(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, struct ipv6_addr *ip)
{
	int ret;
	plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	ctrl_ring_set_command_core_task_ip(mbuf, (core_id << 16) | (task_id << 8) | command);
	ctrl_ring_set_ipv6_addr(mbuf, ip);
	ret = rte_ring_enqueue(ring, mbuf);

	if (unlikely(ret != 0)) {
		plogx_dbg("\tFailed to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring_ip6(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, struct ipv6_addr *ip)
{
	int ret;
	plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	ctrl_ring_set_command(mbuf, command);
	ctrl_ring_set_ipv6_addr(mbuf, ip);
	ret = rte_ring_enqueue(ring, mbuf);

	if (unlikely(ret != 0)) {
		plogx_dbg("\tFailed to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring_ip6_data(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, struct ipv6_addr *ip, uint64_t data)
{
	int ret;
	plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	ctrl_ring_set_command(mbuf, command);
	ctrl_ring_set_ipv6_addr(mbuf, ip);
	ctrl_ring_set_data(mbuf, data);
	ret = rte_ring_enqueue(ring, mbuf);

	if (unlikely(ret != 0)) {
		plogx_dbg("\tFailed to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}