// Copyright (c) 2010-2020 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <rte_cycles.h>
#include <rte_ethdev.h>
#include <rte_version.h>

#include "rx_pkt.h"
#include "task_base.h"
#include "stats.h"
#include "log.h"
#include "mbuf_utils.h"
#include "prefetch.h"
#include "tx_pkt.h"
#include "handle_master.h"
#include "input.h" /* Needed for callback on dump */

/* The _param versions of the rx_pkt_hw functions are used to create
   two instances of very similar variations of these functions. The
   variations are specified by the "multi" parameter which signifies
   that the rte_eth_rx_burst function should be called multiple times.
   The reason for this is that with the vector PMD, the maximum number
   of packets being returned is 32. If packets have been split into
   multiple mbufs then rte_eth_rx_burst might even receive fewer than
   32 packets.
   Some algorithms (like QoS) only work correctly if more than 32
   packets can be received per call, e.g. when the dequeue step
   involves finding 32 packets. */

/* With the vector PMD, a single rte_eth_rx_burst call returns at most
   32 packets. */
#define MIN_PMD_RX	32

static uint16_t rx_pkt_hw_port_queue(struct port_queue *pq, struct rte_mbuf **mbufs, int multi)
{
	uint16_t nb_rx, n;

	nb_rx = rte_eth_rx_burst(pq->port, pq->queue, mbufs, MAX_PKT_BURST);

	if (multi) {
		n = nb_rx;
		while (n != 0 && MAX_PKT_BURST - nb_rx >= MIN_PMD_RX) {
			n = rte_eth_rx_burst(pq->port, pq->queue, mbufs + nb_rx, MIN_PMD_RX);
			nb_rx += n;
			PROX_PANIC(nb_rx > MAX_PKT_BURST, "Received %d packets while expecting maximum %d\n", nb_rx, MAX_PKT_BURST);
		}
	}
	return nb_rx;
}

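/* Round-robin advance to the next RX port. The pow2 variant below
   replaces the wrap-around branch with a mask, which only works when
   the number of RX ports is a power of two. */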
static void next_port(struct rx_params_hw *rx_params_hw)
{
	++rx_params_hw->last_read_portid;
	if (unlikely(rx_params_hw->last_read_portid == rx_params_hw->nb_rxports)) {
		rx_params_hw->last_read_portid = 0;
	}
}

static void next_port_pow2(struct rx_params_hw *rx_params_hw)
{
	rx_params_hw->last_read_portid = (rx_params_hw->last_read_portid + 1) & rx_params_hw->rxport_mask;
}

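/* Dump or trace a packet that is about to be diverted to the control
   plane. If a dump was requested through the input callback, the
   packet is formatted and sent back through that callback; otherwise
   it is logged with plogdx_info(). */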
static inline void dump_l3(struct task_base *tbase, struct rte_mbuf *mbuf)
{
	if (unlikely(tbase->aux->task_rt_dump.n_print_rx)) {
		if ((tbase->aux->task_rt_dump.input == NULL) || (tbase->aux->task_rt_dump.input->reply == NULL)) {
			plogdx_info(mbuf, "RX: ");
		} else {
			struct input *input = tbase->aux->task_rt_dump.input;
			char tmp[128];
			int len;
#if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
			int port_id = mbuf->port;
#else
			int port_id = mbuf->pkt.in_port;
#endif
			len = snprintf(tmp, sizeof(tmp), "pktdump,%d,%d\n", port_id,
				       rte_pktmbuf_pkt_len(mbuf));
			input->reply(input, tmp, len);
			input->reply(input, rte_pktmbuf_mtod(mbuf, char *), rte_pktmbuf_pkt_len(mbuf));
			input->reply(input, "\n", 1);
		}
		tbase->aux->task_rt_dump.n_print_rx--;
		if (0 == tbase->aux->task_rt_dump.n_print_rx) {
			task_base_del_rx_pkt_function(tbase, rx_pkt_dump);
		}
	}
	if (unlikely(tbase->aux->task_rt_dump.n_trace)) {
		plogdx_info(mbuf, "RX: ");
		tbase->aux->task_rt_dump.n_trace--;
	}
}

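/* Generic hardware receive path. Reads up to MAX_PKT_BURST packets
   from the current port/queue, advances to the next RX port through
   the next() callback, and, in l3 mode, diverts ARP and ICMP packets
   to the control plane ring before they reach the data path. */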
static uint16_t rx_pkt_hw_param(struct task_base *tbase, struct rte_mbuf ***mbufs_ptr, int multi,
				void (*next)(struct rx_params_hw *rx_param_hw), int l3)
{
	uint8_t last_read_portid;
	uint16_t nb_rx;
	int skip = 0;

	START_EMPTY_MEASSURE();
	*mbufs_ptr = tbase->ws_mbuf->mbuf[0] +
		(RTE_ALIGN_CEIL(tbase->ws_mbuf->idx[0].prod, 2) & WS_MBUF_MASK);

	last_read_portid = tbase->rx_params_hw.last_read_portid;
	struct port_queue *pq = &tbase->rx_params_hw.rx_pq[last_read_portid];

	nb_rx = rx_pkt_hw_port_queue(pq, *mbufs_ptr, multi);
	next(&tbase->rx_params_hw);

	if (l3) {
		struct rte_mbuf **mbufs = *mbufs_ptr;
		int i;
		struct ether_hdr_arp *hdr_arp[MAX_PKT_BURST];
		prox_rte_ether_hdr *hdr;
		for (i = 0; i < nb_rx; i++) {
			PREFETCH0(mbufs[i]);
		}
		for (i = 0; i < nb_rx; i++) {
			hdr_arp[i] = rte_pktmbuf_mtod(mbufs[i], struct ether_hdr_arp *);
			PREFETCH0(hdr_arp[i]);
		}
		for (i = 0; i < nb_rx; i++) {
			if (likely(hdr_arp[i]->ether_hdr.ether_type == ETYPE_IPv4)) {
				hdr = (prox_rte_ether_hdr *)hdr_arp[i];
				prox_rte_ipv4_hdr *pip = (prox_rte_ipv4_hdr *)(hdr + 1);
				if (pip->next_proto_id == IPPROTO_ICMP) {
					dump_l3(tbase, mbufs[i]);
					tx_ring(tbase, tbase->l3.ctrl_plane_ring, ICMP_TO_CTRL, mbufs[i]);
					skip++;
				} else if (unlikely(skip)) {
					mbufs[i - skip] = mbufs[i];
				}
			} else if (unlikely(hdr_arp[i]->ether_hdr.ether_type == ETYPE_ARP)) {
				dump_l3(tbase, mbufs[i]);
				tx_ring(tbase, tbase->l3.ctrl_plane_ring, ARP_TO_CTRL, mbufs[i]);
				skip++;
			} else if (unlikely(skip)) {
				mbufs[i - skip] = mbufs[i];
			}
		}
		nb_rx -= skip;
	}

	TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, skip);
	if (likely(nb_rx > 0)) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	}
	TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
	return 0;
}

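/* Specialized variant for tasks with a single RX port/queue: the
   port round-robin is skipped and rte_eth_rx_burst is called on the
   one configured queue directly. */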
static inline uint16_t rx_pkt_hw1_param(struct task_base *tbase, struct rte_mbuf ***mbufs_ptr, int multi, int l3)
{
	uint16_t nb_rx, n;
	int skip = 0;

	START_EMPTY_MEASSURE();
	*mbufs_ptr = tbase->ws_mbuf->mbuf[0] +
		(RTE_ALIGN_CEIL(tbase->ws_mbuf->idx[0].prod, 2) & WS_MBUF_MASK);

	nb_rx = rte_eth_rx_burst(tbase->rx_params_hw1.rx_pq.port,
				 tbase->rx_params_hw1.rx_pq.queue,
				 *mbufs_ptr, MAX_PKT_BURST);

	if (multi) {
		n = nb_rx;
		while ((n != 0) && (MAX_PKT_BURST - nb_rx >= MIN_PMD_RX)) {
			n = rte_eth_rx_burst(tbase->rx_params_hw1.rx_pq.port,
					     tbase->rx_params_hw1.rx_pq.queue,
					     *mbufs_ptr + nb_rx, MIN_PMD_RX);
			nb_rx += n;
			PROX_PANIC(nb_rx > MAX_PKT_BURST, "Received %d packets while expecting maximum %d\n", nb_rx, MAX_PKT_BURST);
		}
	}

	if (l3) {
		struct rte_mbuf **mbufs = *mbufs_ptr;
		int i;
		struct ether_hdr_arp *hdr_arp[MAX_PKT_BURST];
		prox_rte_ether_hdr *hdr;
		for (i = 0; i < nb_rx; i++) {
			PREFETCH0(mbufs[i]);
		}
		for (i = 0; i < nb_rx; i++) {
			hdr_arp[i] = rte_pktmbuf_mtod(mbufs[i], struct ether_hdr_arp *);
			PREFETCH0(hdr_arp[i]);
		}
		for (i = 0; i < nb_rx; i++) {
			if (likely(hdr_arp[i]->ether_hdr.ether_type == ETYPE_IPv4)) {
				hdr = (prox_rte_ether_hdr *)hdr_arp[i];
				prox_rte_ipv4_hdr *pip = (prox_rte_ipv4_hdr *)(hdr + 1);
				if (pip->next_proto_id == IPPROTO_ICMP) {
					dump_l3(tbase, mbufs[i]);
					tx_ring(tbase, tbase->l3.ctrl_plane_ring, ICMP_TO_CTRL, mbufs[i]);
					skip++;
				} else if (unlikely(skip)) {
					mbufs[i - skip] = mbufs[i];
				}
			} else if (unlikely(hdr_arp[i]->ether_hdr.ether_type == ETYPE_ARP)) {
				dump_l3(tbase, mbufs[i]);
				tx_ring(tbase, tbase->l3.ctrl_plane_ring, ARP_TO_CTRL, mbufs[i]);
				skip++;
			} else if (unlikely(skip)) {
				mbufs[i - skip] = mbufs[i];
			}
		}
		nb_rx -= skip;
	}

	TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, skip);
	if (likely(nb_rx > 0)) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	}
	TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
	return 0;
}

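/* Exported rx functions: thin wrappers that instantiate the _param
   helpers for every combination of port selection (round-robin,
   power-of-two mask, single port), single vs. multi burst, and l3
   (control plane) filtering. */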
uint16_t rx_pkt_hw(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 0, next_port, 0);
}

uint16_t rx_pkt_hw_pow2(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 0, next_port_pow2, 0);
}

uint16_t rx_pkt_hw1(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw1_param(tbase, mbufs, 0, 0);
}

uint16_t rx_pkt_hw_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 1, next_port, 0);
}

uint16_t rx_pkt_hw_pow2_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 1, next_port_pow2, 0);
}

uint16_t rx_pkt_hw1_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw1_param(tbase, mbufs, 1, 0);
}

uint16_t rx_pkt_hw_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 0, next_port, 1);
}

uint16_t rx_pkt_hw_pow2_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 0, next_port_pow2, 1);
}

uint16_t rx_pkt_hw1_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw1_param(tbase, mbufs, 0, 1);
}

uint16_t rx_pkt_hw_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 1, next_port, 1);
}

uint16_t rx_pkt_hw_pow2_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 1, next_port_pow2, 1);
}

uint16_t rx_pkt_hw1_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw1_param(tbase, mbufs, 1, 1);
}

/* The following functions implement ring access */
uint16_t ring_deq(struct rte_ring *r, struct rte_mbuf **mbufs)
{
	void **v_mbufs = (void **)mbufs;
#ifdef BRAS_RX_BULK
	/* Bulk dequeue is all-or-nothing: either MAX_RING_BURST packets
	   are dequeued, or none at all. */
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
	return rte_ring_sc_dequeue_bulk(r, v_mbufs, MAX_RING_BURST) < 0? 0 : MAX_RING_BURST;
#else
	return rte_ring_sc_dequeue_bulk(r, v_mbufs, MAX_RING_BURST, NULL);
#endif
#else
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
	return rte_ring_sc_dequeue_burst(r, v_mbufs, MAX_RING_BURST);
#else
	return rte_ring_sc_dequeue_burst(r, v_mbufs, MAX_RING_BURST, NULL);
#endif
#endif
}

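/* Software receive path: dequeue from the task's RX rings in
   round-robin order, stopping after one full pass if every ring
   turned out to be empty. */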
uint16_t rx_pkt_sw(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	START_EMPTY_MEASSURE();
	*mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	uint8_t lr = tbase->rx_params_sw.last_read_ring;
	uint16_t nb_rx;

	do {
		nb_rx = ring_deq(tbase->rx_params_sw.rx_rings[lr], *mbufs);
		lr = lr + 1 == tbase->rx_params_sw.nb_rxrings? 0 : lr + 1;
	} while (!nb_rx && lr != tbase->rx_params_sw.last_read_ring);

	tbase->rx_params_sw.last_read_ring = lr;

	if (nb_rx != 0) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	} else {
		TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
		return 0;
	}
}

/* Same as rx_pkt_sw except with a mask for the number of receive
   rings (can only be used if nb_rxrings is a power of 2). */
uint16_t rx_pkt_sw_pow2(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	START_EMPTY_MEASSURE();
	*mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	uint8_t lr = tbase->rx_params_sw.last_read_ring;
	uint16_t nb_rx;

	do {
		nb_rx = ring_deq(tbase->rx_params_sw.rx_rings[lr], *mbufs);
		lr = (lr + 1) & tbase->rx_params_sw.rxrings_mask;
	} while (!nb_rx && lr != tbase->rx_params_sw.last_read_ring);

	tbase->rx_params_sw.last_read_ring = lr;

	if (nb_rx != 0) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	} else {
		TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
		return 0;
	}
}

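/* Receive packets that the task handed to itself through its own
   ws_mbuf buffer (used when a task's transmit path is wired back to
   its own receive side), so no ring or port access is needed. */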
uint16_t rx_pkt_self(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	START_EMPTY_MEASSURE();
	uint16_t nb_rx = tbase->ws_mbuf->idx[0].nb_rx;

	if (nb_rx) {
		tbase->ws_mbuf->idx[0].nb_rx = 0;
		*mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	} else {
		TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
		return 0;
	}
}

/* Used for tasks that do not receive packets (i.e. packet
   generation). Always returns 1 but never returns packets and does
   not increment statistics. This makes it possible to use the same
   code path as for tasks that actually receive packets. */
uint16_t rx_pkt_dummy(__attribute__((unused)) struct task_base *tbase,
		      __attribute__((unused)) struct rte_mbuf ***mbufs)
{
	return 1;
}

/* After the system has been configured, it is known if there is only
   one RX ring. If this is the case, a more specialized version of the
   function above can be used to save cycles. */
uint16_t rx_pkt_sw1(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	START_EMPTY_MEASSURE();
	*mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	uint16_t nb_rx = ring_deq(tbase->rx_params_sw1.rx_ring, *mbufs);

	if (nb_rx != 0) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	} else {
		TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
		return 0;
	}
}

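/* Debugging hooks such as rx_pkt_dump and rx_pkt_trace are stacked
   on top of the task's real rx function. This helper invokes the
   previous function on that stack, keeping rx_prev_idx consistent so
   that hooks can nest. */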
static uint16_t call_prev_rx_pkt(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t ret;

	tbase->aux->rx_prev_idx++;
	ret = tbase->aux->rx_pkt_prev[tbase->aux->rx_prev_idx - 1](tbase, mbufs);
	tbase->aux->rx_prev_idx--;

	return ret;
}

/* Only used when there are packets to be dumped. This function is
   meant as a debugging tool and is therefore not optimized. When the
   number of packets to dump falls back to 0, the original (optimized)
   rx function is restored. This makes it possible to support packet
   dumping without any performance impact when the feature is not
   used. */
uint16_t rx_pkt_dump(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);

	if (ret) {
		uint32_t n_dump = tbase->aux->task_rt_dump.n_print_rx;
		n_dump = ret < n_dump? ret : n_dump;

		if ((tbase->aux->task_rt_dump.input == NULL) || (tbase->aux->task_rt_dump.input->reply == NULL)) {
			for (uint32_t i = 0; i < n_dump; ++i) {
				plogdx_info((*mbufs)[i], "RX: ");
			}
		} else {
			struct input *input = tbase->aux->task_rt_dump.input;

			for (uint32_t i = 0; i < n_dump; ++i) {
				/* TODO: Execute callback with full
				   data in a single call. */
				char tmp[128];
				int len;
#if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
				int port_id = ((*mbufs)[i])->port;
#else
				int port_id = ((*mbufs)[i])->pkt.in_port;
#endif
				len = snprintf(tmp, sizeof(tmp), "pktdump,%d,%d\n", port_id,
					       rte_pktmbuf_pkt_len((*mbufs)[i]));

				input->reply(input, tmp, len);
				input->reply(input, rte_pktmbuf_mtod((*mbufs)[i], char *), rte_pktmbuf_pkt_len((*mbufs)[i]));
				input->reply(input, "\n", 1);
			}
		}

		tbase->aux->task_rt_dump.n_print_rx -= n_dump;

		if (0 == tbase->aux->task_rt_dump.n_print_rx) {
			task_base_del_rx_pkt_function(tbase, rx_pkt_dump);
		}
	}
	return ret;
}

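/* Trace hook: keeps a bounded copy of each received packet (plus its
   length and mbuf address) in the task's dump state, so the transmit
   side can later correlate RX and TX and print the full trace. */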
uint16_t rx_pkt_trace(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	tbase->aux->task_rt_dump.cur_trace = 0;
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);

	if (ret) {
		uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;
		n_trace = ret < n_trace? ret : n_trace;
		n_trace = n_trace <= MAX_RING_BURST? n_trace : MAX_RING_BURST;

		for (uint32_t i = 0; i < n_trace; ++i) {
			uint8_t *pkt = rte_pktmbuf_mtod((*mbufs)[i], uint8_t *);
			rte_memcpy(tbase->aux->task_rt_dump.pkt_cpy[i], pkt, sizeof(tbase->aux->task_rt_dump.pkt_cpy[i]));
			tbase->aux->task_rt_dump.pkt_cpy_len[i] = rte_pktmbuf_pkt_len((*mbufs)[i]);
			tbase->aux->task_rt_dump.pkt_mbuf_addr[i] = (*mbufs)[i];
		}
		tbase->aux->task_rt_dump.cur_trace += n_trace;

		tbase->aux->task_rt_dump.n_trace -= n_trace;
		/* Unset by TX when n_trace = 0 */
	}
	return ret;
}

/* Gather the distribution of the number of packets that have been
   received from one RX call. Since the value is only modified by the
   task that receives the packet, no atomic operation is needed. */
uint16_t rx_pkt_distr(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);

	if (likely(ret < RX_BUCKET_SIZE))
		tbase->aux->rx_bucket[ret]++;
	else
		tbase->aux->rx_bucket[RX_BUCKET_SIZE - 1]++;
	return ret;
}

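/* Account the wire size of every received packet into the task's RX
   byte statistics. */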
uint16_t rx_pkt_bw(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
	uint32_t tot_bytes = 0;

	for (uint16_t i = 0; i < ret; ++i) {
		tot_bytes += mbuf_wire_size((*mbufs)[i]);
	}

	TASK_STATS_ADD_RX_BYTES(&tbase->aux->stats, tot_bytes);

	return ret;
}

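/* Record the TSC timestamps taken just before and after the
   underlying rx call, so the cost of receiving can be measured. */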
uint16_t rx_pkt_tsc(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint64_t before = rte_rdtsc();
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
	uint64_t after = rte_rdtsc();

	tbase->aux->tsc_rx.before = before;
	tbase->aux->tsc_rx.after = after;

	return ret;
}