// Copyright (c) 2010-2020 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <rte_ethdev.h>
#include <rte_version.h>

#include "rx_pkt.h"
#include "tx_pkt.h"
#include "task_base.h"
#include "prox_assert.h"
#include "log.h"
#include "mbuf_utils.h"
#include "handle_master.h"

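/* Note on the buffering scheme used below: each task keeps per-output circular
 * write buffers in tbase->ws_mbuf. buf_pkt_single()/buf_pkt_all() fill these
 * buffers, while the tx_pkt_* and flush_queues_* functions drain them towards
 * ports (hw) or rings (sw), usually in bursts of MAX_PKT_BURST. */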
static void buf_pkt_single(struct task_base *tbase, struct rte_mbuf *mbuf, const uint8_t out)
{
	const uint16_t prod = tbase->ws_mbuf->idx[out].prod++;
	tbase->ws_mbuf->mbuf[out][prod & WS_MBUF_MASK] = mbuf;
}

static inline void buf_pkt_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	for (uint16_t j = 0; j < n_pkts; ++j) {
		if (unlikely(out[j] >= OUT_HANDLED)) {
			rte_pktmbuf_free(mbufs[j]);
			if (out[j] == OUT_HANDLED)
				TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
			else
				TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		} else {
			buf_pkt_single(tbase, mbufs[j], out[j]);
		}
	}
}

void store_packet(struct task_base *tbase, struct rte_mbuf *mbuf)
{
	// If the buffer is already occupied, drop the stored mbuf first
	if (tbase->aux->mbuf)
		tx_drop(tbase->aux->mbuf);
	tbase->aux->mbuf = mbuf;
}

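/* IPv6 (NDP) L3 transmit path: write_ip6_dst_mac() resolves the destination MAC.
 * Runs of packets whose MAC is already known are forwarded through tx_pkt_l2();
 * packets needing resolution trigger an IP6_REQ_MAC_TO_MASTER message to the
 * master task over the control plane ring, using an mbuf from the arp/nd mempool. */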
int tx_pkt_ndp(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	struct ipv6_addr ip_dst;
	int first = 0, ret, ok = 0, rc;
	const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
	struct rte_mbuf *mbuf = NULL; // used when one needs to send both an ND request and the mbuf
	uint16_t vlan;

	for (int j = 0; j < n_pkts; j++) {
		if ((out) && (out[j] >= OUT_HANDLED))
			continue;
		if (unlikely((rc = write_ip6_dst_mac(tbase, mbufs[j], &ip_dst, &vlan)) != SEND_MBUF)) {
			if (j - first) {
				ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
				ok += ret;
			}
			first = j + 1;
			switch (rc) {
			case SEND_ARP_ND:
				// Original mbuf (packet) is stored to be sent later -> need to allocate new mbuf
				ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&mbuf);
				if (likely(ret == 0)) {
					store_packet(tbase, mbufs[j]);
					mbuf->port = tbase->l3.reachable_port_id;
					tx_ring_cti6(tbase, tbase->l3.ctrl_plane_ring, IP6_REQ_MAC_TO_MASTER, mbuf, tbase->l3.core_id, tbase->l3.task_id, &ip_dst, vlan);
				} else {
					plog_err("Failed to get a mbuf from arp/nd mempool\n");
					tx_drop(mbufs[j]);
					TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
				}
				break;
			case SEND_MBUF_AND_ARP_ND:
				// We send the mbuf and an ND - we need to allocate another mbuf for ND
				ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&mbuf);
				if (likely(ret == 0)) {
					mbuf->port = tbase->l3.reachable_port_id;
					tx_ring_cti6(tbase, tbase->l3.ctrl_plane_ring, IP6_REQ_MAC_TO_MASTER, mbuf, tbase->l3.core_id, tbase->l3.task_id, &ip_dst, vlan);
				} else {
					plog_err("Failed to get a mbuf from arp/nd mempool\n");
					// We still send the initial mbuf
				}
				ret = tbase->aux->tx_pkt_l2(tbase, mbufs + j, 1, out);
				ok += ret;
				break;
			case DROP_MBUF:
				tx_drop(mbufs[j]);
				TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
				break;
			}
		}
	}
	if (n_pkts - first) {
		ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
		ok += ret;
	}
	return ok;
}

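/* IPv4 L3 transmit path: same structure as tx_pkt_ndp() above, but an ARP request
 * either re-uses the data mbuf (SEND_ARP_ND) or allocates a separate mbuf from the
 * arp/nd mempool (SEND_MBUF_AND_ARP_ND), and the ARP/NDP retransmit timeout is
 * updated depending on whether the request could be queued to the master task. */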
int tx_pkt_l3(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	uint32_t ip_dst;
	uint16_t vlan;
	uint64_t *time;
	int first = 0, ret, ok = 0, rc;
	const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
	struct rte_mbuf *arp_mbuf = NULL; // used when one needs to send both an ARP and the mbuf
	uint64_t tsc = rte_rdtsc();

	for (int j = 0; j < n_pkts; j++) {
		if ((out) && (out[j] >= OUT_HANDLED))
			continue;
		if (unlikely((rc = write_dst_mac(tbase, mbufs[j], &ip_dst, &vlan, &time, tsc)) != SEND_MBUF)) {
			if (j - first) {
				ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, j - first, out);
				ok += ret;
			}
			first = j + 1;
			switch (rc) {
			case SEND_ARP_ND:
				// We re-use the mbuf - no need to create an arp_mbuf and delete the existing mbuf
				mbufs[j]->port = tbase->l3.reachable_port_id;
				if (tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, IP4_REQ_MAC_TO_MASTER, mbufs[j], tbase->l3.core_id, tbase->l3.task_id, ip_dst, vlan) == 0)
					update_arp_ndp_retransmit_timeout(&tbase->l3, time, 1000);
				else
					update_arp_ndp_retransmit_timeout(&tbase->l3, time, 100);
				break;
			case SEND_MBUF_AND_ARP_ND:
				// We send the mbuf and an ARP - we need to allocate another mbuf for ARP
				ret = rte_mempool_get(tbase->l3.arp_nd_pool, (void **)&arp_mbuf);
				if (likely(ret == 0)) {
					arp_mbuf->port = tbase->l3.reachable_port_id;
					if (tx_ring_cti(tbase, tbase->l3.ctrl_plane_ring, IP4_REQ_MAC_TO_MASTER, arp_mbuf, tbase->l3.core_id, tbase->l3.task_id, ip_dst, vlan) == 0)
						update_arp_ndp_retransmit_timeout(&tbase->l3, time, 1000);
					else
						update_arp_ndp_retransmit_timeout(&tbase->l3, time, 100);
				} else {
					plog_err("Failed to get a mbuf from arp mempool\n");
					// We still send the initial mbuf
				}
				ret = tbase->aux->tx_pkt_l2(tbase, mbufs + j, 1, out);
				ok += ret;
				break;
			case DROP_MBUF:
				tx_drop(mbufs[j]);
				TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
				break;
			}
		}
	}
	if (n_pkts - first) {
		ret = tbase->aux->tx_pkt_l2(tbase, mbufs + first, n_pkts - first, out);
		ok += ret;
	}
	return ok;
}

/* The following helper functions also report stats. Therefore we need
   to pass the task_base struct. */
static inline int txhw_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
{
	uint16_t ntx;
	int ret;

	/* TX vector mode can't transmit more than 32 packets */
	if (n_pkts > MAX_PMD_TX) {
		ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, MAX_PMD_TX);
		ntx += rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs + ntx, n_pkts - ntx);
	} else {
		ntx = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
	}
	TASK_STATS_ADD_TX(&tbase->aux->stats, ntx);

	ret = n_pkts - ntx;
	if (ntx < n_pkts) {
		plog_dbg("Failed to send %d packets from %p\n", ret, mbufs[0]);
		TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts - ntx);
		if (tbase->tx_pkt == tx_pkt_bw) {
			uint32_t drop_bytes = 0;
			do {
				drop_bytes += mbuf_wire_size(mbufs[ntx]);
				rte_pktmbuf_free(mbufs[ntx++]);
			} while (ntx < n_pkts);
			TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
		} else {
			do {
				rte_pktmbuf_free(mbufs[ntx++]);
			} while (ntx < n_pkts);
		}
	}
	return ret;
}

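/* "no drop" variant: keep calling rte_eth_tx_burst() until the port has accepted
 * the whole burst, so no packet is ever freed here. */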
static inline int txhw_no_drop(const struct port_queue *port_queue, struct rte_mbuf **mbufs, uint16_t n_pkts, struct task_base *tbase)
{
	uint16_t ret;

	TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);

	do {
		ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, n_pkts);
		mbufs += ret;
		n_pkts -= ret;
	} while (n_pkts);
	return 0;
}

static inline int ring_enq_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
{
	int ret = 0;
	/* return 0 on success, -ENOBUFS on failure */
	// Rings can be single or multi producer (ctrl rings are multi producer)
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
	if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts))) {
#else
	if (unlikely(rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0)) {
#endif
		if (tbase->tx_pkt == tx_pkt_bw) {
			uint32_t drop_bytes = 0;
			for (uint16_t i = 0; i < n_pkts; ++i) {
				drop_bytes += mbuf_wire_size(mbufs[i]);
				rte_pktmbuf_free(mbufs[i]);
			}
			TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
			TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
		} else {
			for (uint16_t i = 0; i < n_pkts; ++i)
				rte_pktmbuf_free(mbufs[i]);
			TASK_STATS_ADD_DROP_TX_FAIL(&tbase->aux->stats, n_pkts);
		}
		ret = n_pkts;
	} else {
		TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
	}
	return ret;
}

static inline int ring_enq_no_drop(struct rte_ring *ring, struct rte_mbuf *const *mbufs, uint16_t n_pkts, __attribute__((unused)) struct task_base *tbase)
{
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
	while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts)) {
#else
	while (rte_ring_enqueue_bulk(ring, (void *const *)mbufs, n_pkts, NULL) == 0) {
#endif
		/* Retry until the ring accepts the whole bulk */
	}
	TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
	return 0;
}

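/* The flush_queues_* functions drain whatever is pending in the per-output
 * write buffers (ws_mbuf) towards the ports or rings, and clear FLAG_TX_FLUSH
 * once done. */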
void flush_queues_hw(struct task_base *tbase)
{
	uint16_t prod, cons;

	for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (prod != cons) {
			tbase->ws_mbuf->idx[i].prod = 0;
			tbase->ws_mbuf->idx[i].cons = 0;
			txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
		}
	}

	tbase->flags &= ~FLAG_TX_FLUSH;
}

void flush_queues_sw(struct task_base *tbase)
{
	uint16_t prod, cons;

	for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (prod != cons) {
			tbase->ws_mbuf->idx[i].prod = 0;
			tbase->ws_mbuf->idx[i].cons = 0;
			ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
		}
	}

	tbase->flags &= ~FLAG_TX_FLUSH;
}

void flush_queues_no_drop_hw(struct task_base *tbase)
{
	uint16_t prod, cons;

	for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (prod != cons) {
			tbase->ws_mbuf->idx[i].prod = 0;
			tbase->ws_mbuf->idx[i].cons = 0;
			txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
		}
	}

	tbase->flags &= ~FLAG_TX_FLUSH;
}

void flush_queues_no_drop_sw(struct task_base *tbase)
{
	uint16_t prod, cons;

	for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (prod != cons) {
			tbase->ws_mbuf->idx[i].prod = 0;
			tbase->ws_mbuf->idx[i].cons = 0;
			ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), prod - cons, tbase);
		}
	}

	tbase->flags &= ~FLAG_TX_FLUSH;
}

/* The "try" functions try to send packets to sw/hw without failing or blocking:
   they return as soon as the ring/queue is full and are used by aggregators.
   "try" functions do not have drop/no-drop flavors; they are only implemented
   in never_discard mode (as by default they use only one outgoing ring). */
uint16_t tx_try_self(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	if (n_pkts < 64) {
		tx_pkt_never_discard_self(tbase, mbufs, n_pkts, NULL);
		return n_pkts;
	} else {
		tx_pkt_never_discard_self(tbase, mbufs, 64, NULL);
		return 64;
	}
}

uint16_t tx_try_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	const int bulk_size = 64;
	uint16_t ret = bulk_size, sent = 0, n_bulks;
	n_bulks = n_pkts >> __builtin_ctz(bulk_size);

	for (int i = 0; i < n_bulks; i++) {
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
		ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size);
#else
		ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, bulk_size, NULL);
#endif
		mbufs += ret;
		sent += ret;
		if (ret != bulk_size)
			break;
	}
	if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
		ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)));
#else
		ret = rte_ring_enqueue_burst(tbase->tx_params_sw.tx_rings[0], (void *const *)mbufs, (n_pkts & (bulk_size - 1)), NULL);
#endif
		sent += ret;
	}
	TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
	return sent;
}

uint16_t tx_try_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	const int bulk_size = 64;
	uint16_t ret = bulk_size, n_bulks, sent = 0;
	n_bulks = n_pkts >> __builtin_ctz(bulk_size);

	const struct port_queue *port_queue = &tbase->tx_params_hw.tx_port_queue[0];
	for (int i = 0; i < n_bulks; i++) {
		ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, bulk_size);
		mbufs += ret;
		sent += ret;
		if (ret != bulk_size)
			break;
	}
	if ((ret == bulk_size) && (n_pkts & (bulk_size - 1))) {
		ret = rte_eth_tx_burst(port_queue->port, port_queue->queue, mbufs, (n_pkts & (bulk_size - 1)));
		sent += ret;
	}
	TASK_STATS_ADD_TX(&tbase->aux->stats, sent);
	return sent;
}

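/* The hw1 variants below transmit to a single port/queue. The _lat_opt versions
 * send each burst immediately (optimized for latency); the _thrpt_opt versions
 * only bypass the write buffer for full MAX_PKT_BURST bursts and buffer smaller
 * ones (optimized for throughput). */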
int tx_pkt_no_drop_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
}

int tx_pkt_no_drop_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	static uint8_t fake_out[MAX_PKT_BURST] = {0};
	int ret = 0;
	if (n_pkts == MAX_PKT_BURST) {
		// First xmit what was queued
		uint16_t prod, cons;

		prod = tbase->ws_mbuf->idx[0].prod;
		cons = tbase->ws_mbuf->idx[0].cons;

		if ((uint16_t)(prod - cons)) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[0].prod = 0;
			tbase->ws_mbuf->idx[0].cons = 0;
			ret += txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
		}
		ret += txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
	} else {
		ret += tx_pkt_no_drop_hw(tbase, mbufs, n_pkts, fake_out);
	}
	return ret;
}

int tx_pkt_never_discard_hw1_lat_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
}

int tx_pkt_never_discard_hw1_thrpt_opt(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	static uint8_t fake_out[MAX_PKT_BURST] = {0};
	int ret = 0;
	if (n_pkts == MAX_PKT_BURST) {
		// First xmit what was queued
		uint16_t prod, cons;

		prod = tbase->ws_mbuf->idx[0].prod;
		cons = tbase->ws_mbuf->idx[0].cons;

		if ((uint16_t)(prod - cons)) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[0].prod = 0;
			tbase->ws_mbuf->idx[0].cons = 0;
			ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], tbase->ws_mbuf->mbuf[0] + (cons & WS_MBUF_MASK), (uint16_t)(prod - cons), tbase);
		}
		ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
	} else {
		ret += tx_pkt_hw(tbase, mbufs, n_pkts, fake_out);
	}
	return ret;
}

/* Transmit to hw using the tx_params_hw_sw structure.
   This function is used to transmit to hw when tx_params_hw_sw should be used,
   i.e. when the task needs to transmit both to hw and sw. */
int tx_pkt_no_drop_never_discard_hw1_no_pointer(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	txhw_no_drop(&tbase->tx_params_hw_sw.tx_port_queue, mbufs, n_pkts, tbase);
	return 0;
}

int tx_pkt_no_drop_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
}

int tx_pkt_never_discard_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
}

static uint16_t tx_pkt_free_dropped(__attribute__((unused)) struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	uint64_t v = 0;
	uint16_t i;

	/* The most probable and most important case to optimize for is when
	   no packets should be dropped. */
	for (i = 0; i + 8 < n_pkts; i += 8) {
		v |= *((uint64_t*)(&out[i]));
	}
	for (; i < n_pkts; ++i) {
		v |= out[i];
	}

	if (v) {
		/* At least some packets need to be dropped, so the
		   mbufs array needs to be updated. */
		uint16_t n_kept = 0;
		uint16_t n_discard = 0;
		for (uint16_t i = 0; i < n_pkts; ++i) {
			if (unlikely(out[i] >= OUT_HANDLED)) {
				rte_pktmbuf_free(mbufs[i]);
				n_discard += out[i] == OUT_DISCARD;
				continue;
			}
			mbufs[n_kept++] = mbufs[i];
		}
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, n_discard);
		TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts - n_kept - n_discard);
		return n_kept;
	}
	return n_pkts;
}

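/* The *1 variants below first free the packets marked OUT_HANDLED/OUT_DISCARD
 * via tx_pkt_free_dropped(), then send the remaining packets to the single
 * configured port or ring. */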
int tx_pkt_no_drop_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
	int ret = 0;

	if (likely(n_kept))
		ret = txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
	return ret;
}

int tx_pkt_no_drop_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);
	int ret = 0;

	if (likely(n_kept))
		ret = ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
	return ret;
}

int tx_pkt_hw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);

	if (likely(n_kept))
		return txhw_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_kept, tbase);
	return 0;
}

int tx_pkt_sw1(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);

	if (likely(n_kept))
		return ring_enq_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_kept, tbase);
	return 0;
}

int tx_pkt_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, uint8_t *out)
{
	const uint16_t n_kept = tx_pkt_free_dropped(tbase, mbufs, n_pkts, out);

	TASK_STATS_ADD_TX(&tbase->aux->stats, n_kept);
	tbase->ws_mbuf->idx[0].nb_rx = n_kept;
	struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	for (uint16_t i = 0; i < n_kept; ++i) {
		tx_mbuf[i] = mbufs[i];
	}
	return 0;
}

int tx_pkt_never_discard_self(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	TASK_STATS_ADD_TX(&tbase->aux->stats, n_pkts);
	tbase->ws_mbuf->idx[0].nb_rx = n_pkts;
	struct rte_mbuf **tx_mbuf = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	for (uint16_t i = 0; i < n_pkts; ++i) {
		tx_mbuf[i] = mbufs[i];
	}
	return 0;
}

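/* Multi-output variants: packets are first sorted into the per-output write
 * buffers by buf_pkt_all(); each buffer is then drained one MAX_PKT_BURST chunk
 * at a time once enough packets have accumulated, and the remainder is left for
 * the flush_queues_* functions. */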
int tx_pkt_no_drop_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	buf_pkt_all(tbase, mbufs, n_pkts, out);

	const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
	uint16_t prod, cons;

	for (uint8_t i = 0; i < nb_bufs; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
			ret += txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
		}
	}
	return ret;
}

int tx_pkt_no_drop_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	buf_pkt_all(tbase, mbufs, n_pkts, out);

	const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
	uint16_t prod, cons;

	for (uint8_t i = 0; i < nb_bufs; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
			ret += ring_enq_no_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
		}
	}
	return ret;
}

int tx_pkt_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	buf_pkt_all(tbase, mbufs, n_pkts, out);

	const uint8_t nb_bufs = tbase->tx_params_hw.nb_txports;
	uint16_t prod, cons;

	for (uint8_t i = 0; i < nb_bufs; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
			ret += txhw_drop(&tbase->tx_params_hw.tx_port_queue[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
		}
	}
	return ret;
}

int tx_pkt_sw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	buf_pkt_all(tbase, mbufs, n_pkts, out);

	const uint8_t nb_bufs = tbase->tx_params_sw.nb_txrings;
	uint16_t prod, cons;

	for (uint8_t i = 0; i < nb_bufs; ++i) {
		prod = tbase->ws_mbuf->idx[i].prod;
		cons = tbase->ws_mbuf->idx[i].cons;

		if (((uint16_t)(prod - cons)) >= MAX_PKT_BURST) {
			tbase->flags &= ~FLAG_TX_FLUSH;
			tbase->ws_mbuf->idx[i].cons = cons + MAX_PKT_BURST;
			ret += ring_enq_drop(tbase->tx_params_sw.tx_rings[i], tbase->ws_mbuf->mbuf[i] + (cons & WS_MBUF_MASK), MAX_PKT_BURST, tbase);
		}
	}
	return ret;
}

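/* Tracing support: when tracing is enabled, the rx path (rx_pkt_trace) keeps a
 * copy of each received packet in task_rt_dump (pkt_cpy/pkt_mbuf_addr);
 * trace_one_rx_pkt() looks that copy up by mbuf address so the packet can be
 * printed as it looked before the task modified it. */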
static inline void trace_one_rx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf)
{
	struct rte_mbuf tmp;
	/* For each packet being transmitted, find which
	   buffer represents the packet as it was before
	   processing. */
	uint32_t j = 0;
	uint32_t len = sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr)/sizeof(tbase->aux->task_rt_dump.pkt_mbuf_addr[0]);
	for (;j < len; ++j) {
		if (tbase->aux->task_rt_dump.pkt_mbuf_addr[j] == mbuf)
			break;
	}
	if (j == len)	// no stored copy found for this mbuf
		return;
#if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
	tmp.data_off = 0;
#endif
	rte_pktmbuf_data_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
	rte_pktmbuf_pkt_len(&tmp) = tbase->aux->task_rt_dump.pkt_cpy_len[j];
	tmp.buf_addr = tbase->aux->task_rt_dump.pkt_cpy[j];
	plogdx_info(&tmp, "Trace RX: ");
}

static inline void trace_one_tx_pkt(struct task_base *tbase, struct rte_mbuf *mbuf, uint8_t *out, uint32_t i)
{
	if (out) {
		switch (out[i]) {
		case OUT_HANDLED:
			plogdx_info(mbuf, "Handled: ");
			break;
		case OUT_DISCARD:
			plogdx_info(mbuf, "Dropped: ");
			break;
		default:
			plogdx_info(mbuf, "TX[%d]: ", out[i]);
			break;
		}
	} else if (tbase->aux->tx_pkt_orig == tx_pkt_drop_all) {
		plogdx_info(mbuf, "Dropped: ");
	} else
		plogdx_info(mbuf, "TX[0]: ");
}

static void unset_trace(struct task_base *tbase)
{
	if (0 == tbase->aux->task_rt_dump.n_trace) {
		if ((tbase->tx_pkt == tx_pkt_l3) || (tbase->tx_pkt == tx_pkt_ndp)) {
			tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
			tbase->aux->tx_pkt_orig = NULL;
		} else {
			tbase->tx_pkt = tbase->aux->tx_pkt_orig;
			tbase->aux->tx_pkt_orig = NULL;
		}
		tbase->aux->task_rt_dump.cur_trace = 0;
		task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
	}
}

int tx_pkt_trace(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	int ret = 0;
	if (tbase->aux->task_rt_dump.cur_trace == 0) {
		// No packet received since dumping...
		tbase->aux->task_rt_dump.n_print_tx = tbase->aux->task_rt_dump.n_trace;
		if (tbase->aux->task_rt_dump.n_trace < n_pkts) {
			tbase->aux->task_rt_dump.n_trace = 0;
			tbase->aux->task_rt_dump.cur_trace = 0;
			task_base_del_rx_pkt_function(tbase, rx_pkt_trace);
		} else {
			tbase->aux->task_rt_dump.n_trace -= n_pkts;
		}
		ret = tx_pkt_dump(tbase, mbufs, n_pkts, out);
		tbase->aux->task_rt_dump.n_print_tx = 0;
		return ret;
	}
	plog_info("Tracing %d pkts\n", tbase->aux->task_rt_dump.cur_trace);
	uint32_t cur_trace = (n_pkts < tbase->aux->task_rt_dump.cur_trace) ? n_pkts : tbase->aux->task_rt_dump.cur_trace;
	for (uint32_t i = 0; i < cur_trace; ++i) {
		trace_one_rx_pkt(tbase, mbufs[i]);
		trace_one_tx_pkt(tbase, mbufs[i], out, i);
	}
	ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);

	unset_trace(tbase);
	return ret;
}

int tx_pkt_dump(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
	int ret = 0;

	n_dump = n_pkts < n_dump ? n_pkts : n_dump;
	for (uint32_t i = 0; i < n_dump; ++i) {
		if (out) {
			switch (out[i]) {
			case OUT_HANDLED:
				plogdx_info(mbufs[i], "Handled: ");
				break;
			case OUT_DISCARD:
				plogdx_info(mbufs[i], "Dropped: ");
				break;
			default:
				plogdx_info(mbufs[i], "TX[%d]: ", out[i]);
				break;
			}
		} else
			plogdx_info(mbufs[i], "TX: ");
	}
	tbase->aux->task_rt_dump.n_print_tx -= n_dump;

	ret = tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);

	if (0 == tbase->aux->task_rt_dump.n_print_tx) {
		if ((tbase->tx_pkt == tx_pkt_l3) || (tbase->tx_pkt == tx_pkt_ndp)) {
			tbase->aux->tx_pkt_l2 = tbase->aux->tx_pkt_orig;
			tbase->aux->tx_pkt_orig = NULL;
		} else {
			tbase->tx_pkt = tbase->aux->tx_pkt_orig;
			tbase->aux->tx_pkt_orig = NULL;
		}
	}
	return ret;
}

/* Gather the distribution of the number of packets that have been
   xmitted from one TX call. Since the value is only modified by the
   task that xmits the packet, no atomic operation is needed. */
int tx_pkt_distr(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	if (likely(n_pkts < TX_BUCKET_SIZE))
		tbase->aux->tx_bucket[n_pkts]++;
	else
		tbase->aux->tx_bucket[TX_BUCKET_SIZE - 1]++;
	return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
}

int tx_pkt_bw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	uint32_t tx_bytes = 0;
	uint32_t drop_bytes = 0;

	for (uint16_t i = 0; i < n_pkts; ++i) {
		if (!out || out[i] < OUT_HANDLED)
			tx_bytes += mbuf_wire_size(mbufs[i]);
		else
			drop_bytes += mbuf_wire_size(mbufs[i]);
	}

	TASK_STATS_ADD_TX_BYTES(&tbase->aux->stats, tx_bytes);
	TASK_STATS_ADD_DROP_BYTES(&tbase->aux->stats, drop_bytes);
	return tbase->aux->tx_pkt_orig(tbase, mbufs, n_pkts, out);
}

int tx_pkt_drop_all(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, uint8_t *out)
{
	for (uint16_t j = 0; j < n_pkts; ++j) {
		rte_pktmbuf_free(mbufs[j]);
	}
	if (!out)
		TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, n_pkts);
	else {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (out[j] == OUT_HANDLED)
				TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, 1);
			else
				TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		}
	}
	return 0;
}

static inline void dump_pkts(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	uint32_t n_dump = tbase->aux->task_rt_dump.n_print_tx;
	uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;

	if (unlikely(n_dump)) {
		n_dump = n_pkts < n_dump ? n_pkts : n_dump;
		for (uint32_t i = 0; i < n_dump; ++i) {
			plogdx_info(mbufs[i], "TX: ");
		}
		tbase->aux->task_rt_dump.n_print_tx -= n_dump;
	} else if (unlikely(n_trace)) {
		n_trace = n_pkts < n_trace ? n_pkts : n_trace;
		for (uint32_t i = 0; i < n_trace; ++i) {
			plogdx_info(mbufs[i], "TX: ");
		}
		tbase->aux->task_rt_dump.n_trace -= n_trace;
	}
}

// ctrlplane packets are slow path, hence the cost of checking whether dump or trace is needed is not too important
// easier to have this implementation than an implementation similar to dataplane tx
int tx_ctrlplane_hw(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	dump_pkts(tbase, mbufs, n_pkts);
	return txhw_no_drop(&tbase->tx_params_hw.tx_port_queue[0], mbufs, n_pkts, tbase);
}

int tx_ctrlplane_sw(struct task_base *tbase, struct rte_mbuf **mbufs, const uint16_t n_pkts, __attribute__((unused)) uint8_t *out)
{
	dump_pkts(tbase, mbufs, n_pkts);
	return ring_enq_no_drop(tbase->tx_params_sw.tx_rings[0], mbufs, n_pkts, tbase);
}

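/* The tx_ring_* helpers below send control messages (ARP/NDP requests, routes,
 * ...) to another task over an rte_ring. The command, core id, task id and IP
 * address are stored in the mbuf through the ctrl_ring_set_* accessors before
 * enqueueing; on enqueue failure the mbuf is freed and counted as a discard. */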
static inline int tx_ring_all(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip)
{
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	ctrl_ring_set_command_core_task_ip(mbuf, ((uint64_t)ip << 32) | (core_id << 16) | (task_id << 8) | command);
	return rte_ring_enqueue(ring, mbuf);
}

int tx_ring_cti(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, uint32_t ip, uint16_t vlan)
{
	plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
	ctrl_ring_set_vlan(mbuf, vlan);
	int ret = tx_ring_all(tbase, ring, command, mbuf, core_id, task_id, ip);
	if (unlikely(ret != 0)) {
		plogx_dbg("\tFail to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
	return ret;
}

void tx_ring_ip(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, uint32_t ip)
{
	plogx_dbg("\tSending command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
	int ret = tx_ring_all(tbase, ring, command, mbuf, 0, 0, ip);
	if (unlikely(ret != 0)) {
		plogx_dbg("\tFail to send command %s with ip %d.%d.%d.%d to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IP4(ip), ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring(struct task_base *tbase, struct rte_ring *ring, uint16_t command, struct rte_mbuf *mbuf)
{
	plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
	int ret = tx_ring_all(tbase, ring, command, mbuf, 0, 0, 0);
	if (unlikely(ret != 0)) {
		plogx_dbg("\tFail to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring_route(struct task_base *tbase, struct rte_ring *ring, int add, struct rte_mbuf *mbuf, uint32_t ip, uint32_t gateway_ip, uint32_t prefix)
{
	uint8_t command;
	if (add)
		command = ROUTE_ADD_FROM_MASTER;
	else
		command = ROUTE_DEL_FROM_MASTER;

	plogx_dbg("\tSending command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
	ctrl_ring_set_command(mbuf, command);
	ctrl_ring_set_ip(mbuf, ip);
	ctrl_ring_set_gateway_ip(mbuf, gateway_ip);
	ctrl_ring_set_prefix(mbuf, prefix);
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	int ret = rte_ring_enqueue(ring, mbuf);
	if (unlikely(ret != 0)) {
		plogx_dbg("\tFail to send command %s to ring %p using mbuf %p - ring size now %d\n", actions_string[command], ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring_cti6(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, uint8_t core_id, uint8_t task_id, struct ipv6_addr *ip, uint16_t vlan)
{
	int ret;
	plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	ctrl_ring_set_command_core_task_ip(mbuf, (core_id << 16) | (task_id << 8) | command);
	ctrl_ring_set_ipv6_addr(mbuf, ip);
	ctrl_ring_set_vlan(mbuf, vlan);
	ret = rte_ring_enqueue(ring, mbuf);

	if (unlikely(ret != 0)) {
		plogx_dbg("\tFail to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p, core %d and task %d - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, core_id, task_id, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring_ip6(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, struct ipv6_addr *ip)
{
	int ret;
	plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	ctrl_ring_set_command(mbuf, command);
	ctrl_ring_set_ipv6_addr(mbuf, ip);
	ret = rte_ring_enqueue(ring, mbuf);

	if (unlikely(ret != 0)) {
		plogx_dbg("\tFail to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}

void tx_ring_ip6_data(struct task_base *tbase, struct rte_ring *ring, uint8_t command, struct rte_mbuf *mbuf, struct ipv6_addr *ip, uint64_t data)
{
	int ret;
	plogx_dbg("\tSending command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
	if (tbase->aux->task_rt_dump.cur_trace) {
		trace_one_rx_pkt(tbase, mbuf);
	}
	ctrl_ring_set_command(mbuf, command);
	ctrl_ring_set_ipv6_addr(mbuf, ip);
	ctrl_ring_set_data(mbuf, data);
	ret = rte_ring_enqueue(ring, mbuf);

	if (unlikely(ret != 0)) {
		plogx_dbg("\tFail to send command %s with ip "IPv6_BYTES_FMT" to ring %p using mbuf %p - ring size now %d\n", actions_string[command], IPv6_BYTES(ip->bytes), ring, mbuf, rte_ring_free_count(ring));
		TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, 1);
		rte_pktmbuf_free(mbuf);
	}
}