// Copyright (c) 2010-2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <rte_cycles.h>
#include <rte_ethdev.h>
#include <rte_version.h>

#include "rx_pkt.h"
#include "task_base.h"
#include "clock.h"
#include "stats.h"
#include "log.h"
#include "mbuf_utils.h"
#include "prefetch.h"
#include "arp.h"
#include "tx_pkt.h"
#include "handle_master.h"
#include "input.h" /* Needed for callback on dump */
/* The _param versions of the rx_pkt_hw functions are used to create
   two instances of very similar variations of these functions. The
   variations are specified by the "multi" parameter which signifies
   that the rte_eth_rx_burst function should be called multiple times.
   The reason for this is that with the vector PMD, the maximum number
   of packets being returned is 32. If packets have been split into
   multiple mbufs then rte_eth_rx_burst might even receive less than
   32 packets.
   Some algorithms (like QoS) only work correctly if more than 32
   packets are received if the dequeue step involves finding 32 packets.
*/

#define MIN_PMD_RX 32
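
/* Illustrative sketch of the "multi" receive pattern described above
   (the real implementation is rx_pkt_hw_port_queue() right below):

       nb_rx = rte_eth_rx_burst(port, queue, mbufs, MAX_PKT_BURST);
       while (last call returned packets && MAX_PKT_BURST - nb_rx >= MIN_PMD_RX)
               nb_rx += rte_eth_rx_burst(port, queue, mbufs + nb_rx, MIN_PMD_RX);

   With a vector PMD a single rte_eth_rx_burst() call returns at most 32
   mbufs, so topping up in MIN_PMD_RX-sized chunks is what allows a full
   MAX_PKT_BURST worth of packets to be handed to the task in one go. */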
static uint16_t rx_pkt_hw_port_queue(struct port_queue *pq, struct rte_mbuf **mbufs, int multi)
{
	uint16_t nb_rx, n;

	nb_rx = rte_eth_rx_burst(pq->port, pq->queue, mbufs, MAX_PKT_BURST);
	if (multi) {
		n = nb_rx;
		while (n != 0 && MAX_PKT_BURST - nb_rx >= MIN_PMD_RX) {
			n = rte_eth_rx_burst(pq->port, pq->queue, mbufs + nb_rx, MIN_PMD_RX);
			nb_rx += n;
			PROX_PANIC(nb_rx > 64, "Received %d packets while expecting maximum %d\n", nb_rx, 64);
		}
	}
	return nb_rx;
}
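
/* Round-robin selection of the next RX port to poll. The _pow2 variant
   below assumes the number of RX ports is a power of two, so the wrap
   can be done with a mask instead of a compare-and-reset. */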
static void next_port(struct rx_params_hw *rx_params_hw)
{
	++rx_params_hw->last_read_portid;
	if (unlikely(rx_params_hw->last_read_portid == rx_params_hw->nb_rxports)) {
		rx_params_hw->last_read_portid = 0;
	}
}
static void next_port_pow2(struct rx_params_hw *rx_params_hw)
{
	rx_params_hw->last_read_portid = (rx_params_hw->last_read_portid + 1) & rx_params_hw->rxport_mask;
}
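
/* Debug helper for the L3 (ARP) path: when the run-time "dump" or "trace"
   commands are active for this task, either log the packet or send it back
   through the command input callback in "pktdump" format before it is
   forwarded to the control plane. */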
static inline void dump_l3(struct task_base *tbase, struct rte_mbuf *mbuf)
{
	if (unlikely(tbase->aux->task_rt_dump.n_print_rx)) {
		if ((tbase->aux->task_rt_dump.input == NULL) || (tbase->aux->task_rt_dump.input->reply == NULL)) {
			plogdx_info(mbuf, "RX: ");
		} else {
			struct input *input = tbase->aux->task_rt_dump.input;
			char tmp[128];
			int strlen;
#if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
			int port_id = mbuf->port;
#else
			int port_id = mbuf->pkt.in_port;
#endif
			strlen = snprintf(tmp, sizeof(tmp), "pktdump,%d,%d\n", port_id,
					  rte_pktmbuf_pkt_len(mbuf));
			input->reply(input, tmp, strlen);
			input->reply(input, rte_pktmbuf_mtod(mbuf, char *), rte_pktmbuf_pkt_len(mbuf));
			input->reply(input, "\n", 1);
		}
		tbase->aux->task_rt_dump.n_print_rx--;
		if (0 == tbase->aux->task_rt_dump.n_print_rx) {
			task_base_del_rx_pkt_function(tbase, rx_pkt_dump);
		}
	}
	if (unlikely(tbase->aux->task_rt_dump.n_trace)) {
		plogdx_info(mbuf, "RX: ");
		tbase->aux->task_rt_dump.n_trace--;
	}
}
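
/* Common body of the hardware receive functions: pick the port/queue that
   the round-robin currently points at, receive a burst into the task's
   ws_mbuf buffer, advance the round-robin via the "next" callback and, in
   L3 mode, strip ARP packets out of the burst and hand them to the
   control-plane ring. Returns the number of packets left for the task. */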
static uint16_t rx_pkt_hw_param(struct task_base *tbase, struct rte_mbuf ***mbufs_ptr, int multi,
				void (*next)(struct rx_params_hw *rx_param_hw), int l3)
{
	uint8_t last_read_portid;
	uint16_t nb_rx;
	int skip = 0;

	START_EMPTY_MEASSURE();
	*mbufs_ptr = tbase->ws_mbuf->mbuf[0] +
		(RTE_ALIGN_CEIL(tbase->ws_mbuf->idx[0].prod, 2) & WS_MBUF_MASK);

	last_read_portid = tbase->rx_params_hw.last_read_portid;
	struct port_queue *pq = &tbase->rx_params_hw.rx_pq[last_read_portid];

	nb_rx = rx_pkt_hw_port_queue(pq, *mbufs_ptr, multi);
	next(&tbase->rx_params_hw);

	if (l3) {
		struct rte_mbuf **mbufs = *mbufs_ptr;
		int i;
		struct ether_hdr_arp *hdr[MAX_PKT_BURST];
		for (i = 0; i < nb_rx; i++) {
			PREFETCH0(mbufs[i]);
		}
		for (i = 0; i < nb_rx; i++) {
			hdr[i] = rte_pktmbuf_mtod(mbufs[i], struct ether_hdr_arp *);
			PREFETCH0(hdr[i]);
		}
		for (i = 0; i < nb_rx; i++) {
			if (unlikely(hdr[i]->ether_hdr.ether_type == ETYPE_ARP)) {
				dump_l3(tbase, mbufs[i]);
				tx_ring(tbase, tbase->l3.ctrl_plane_ring, ARP_TO_CTRL, mbufs[i]);
				skip++;
			} else if (unlikely(skip)) {
				mbufs[i - skip] = mbufs[i];
			}
		}
	}

	if (skip)
		TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, skip);
	if (likely(nb_rx > 0)) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx - skip;
	}
	TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
	return 0;
}
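
/* Specialization of rx_pkt_hw_param() for tasks that receive from exactly
   one port/queue pair: the port round-robin is skipped and the top-up loop
   for the "multi" case is inlined. */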
static inline uint16_t rx_pkt_hw1_param(struct task_base *tbase, struct rte_mbuf ***mbufs_ptr, int multi, int l3)
{
	uint16_t nb_rx, n;
	int skip = 0;

	START_EMPTY_MEASSURE();
	*mbufs_ptr = tbase->ws_mbuf->mbuf[0] +
		(RTE_ALIGN_CEIL(tbase->ws_mbuf->idx[0].prod, 2) & WS_MBUF_MASK);

	nb_rx = rte_eth_rx_burst(tbase->rx_params_hw1.rx_pq.port,
				 tbase->rx_params_hw1.rx_pq.queue,
				 *mbufs_ptr, MAX_PKT_BURST);
	if (multi) {
		n = nb_rx;
		while ((n != 0) && (MAX_PKT_BURST - nb_rx >= MIN_PMD_RX)) {
			n = rte_eth_rx_burst(tbase->rx_params_hw1.rx_pq.port,
					     tbase->rx_params_hw1.rx_pq.queue,
					     *mbufs_ptr + nb_rx, MIN_PMD_RX);
			nb_rx += n;
			PROX_PANIC(nb_rx > 64, "Received %d packets while expecting maximum %d\n", nb_rx, 64);
		}
	}

	if (l3) {
		struct rte_mbuf **mbufs = *mbufs_ptr;
		int i;
		struct ether_hdr_arp *hdr[MAX_PKT_BURST];
		for (i = 0; i < nb_rx; i++) {
			PREFETCH0(mbufs[i]);
		}
		for (i = 0; i < nb_rx; i++) {
			hdr[i] = rte_pktmbuf_mtod(mbufs[i], struct ether_hdr_arp *);
			PREFETCH0(hdr[i]);
		}
		for (i = 0; i < nb_rx; i++) {
			if (unlikely(hdr[i]->ether_hdr.ether_type == ETYPE_ARP)) {
				dump_l3(tbase, mbufs[i]);
				tx_ring(tbase, tbase->l3.ctrl_plane_ring, ARP_TO_CTRL, mbufs[i]);
				skip++;
			} else if (unlikely(skip)) {
				mbufs[i - skip] = mbufs[i];
			}
		}
	}

	if (skip)
		TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, skip);
	if (likely(nb_rx > 0)) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx - skip;
	}
	TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
	return 0;
}
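
/* Public entry points; the naming encodes how the task was configured:
     rx_pkt_hw       - several RX ports, polled round-robin
     rx_pkt_hw_pow2  - as above, but the port count is a power of two
     rx_pkt_hw1      - a single RX port/queue
     *_multi         - top up the burst in MIN_PMD_RX chunks (vector PMDs)
     *_l3            - divert ARP packets to the control-plane ring */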
uint16_t rx_pkt_hw(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 0, next_port, 0);
}

uint16_t rx_pkt_hw_pow2(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 0, next_port_pow2, 0);
}

uint16_t rx_pkt_hw1(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw1_param(tbase, mbufs, 0, 0);
}

uint16_t rx_pkt_hw_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 1, next_port, 0);
}

uint16_t rx_pkt_hw_pow2_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 1, next_port_pow2, 0);
}

uint16_t rx_pkt_hw1_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw1_param(tbase, mbufs, 1, 0);
}

uint16_t rx_pkt_hw_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 0, next_port, 1);
}

uint16_t rx_pkt_hw_pow2_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 0, next_port_pow2, 1);
}

uint16_t rx_pkt_hw1_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw1_param(tbase, mbufs, 0, 1);
}

uint16_t rx_pkt_hw_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 1, next_port, 1);
}

uint16_t rx_pkt_hw_pow2_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 1, next_port_pow2, 1);
}

uint16_t rx_pkt_hw1_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw1_param(tbase, mbufs, 1, 1);
}
/* The following functions implement ring access */
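/* Dequeue up to MAX_RING_BURST mbufs from a single-consumer ring. The
   RTE_VERSION checks cover the DPDK 17.05 rte_ring API change, where the
   dequeue functions gained an extra "available" argument and the bulk
   variant started returning the number of objects instead of 0/-ENOBUFS. */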
uint16_t ring_deq(struct rte_ring *r, struct rte_mbuf **mbufs)
{
	void **v_mbufs = (void **)mbufs;
#ifdef RING_DEQ_BULK /* assumed name: the macro gating the bulk path was not preserved in this excerpt */
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
	return rte_ring_sc_dequeue_bulk(r, v_mbufs, MAX_RING_BURST) < 0? 0 : MAX_RING_BURST;
#else
	return rte_ring_sc_dequeue_bulk(r, v_mbufs, MAX_RING_BURST, NULL);
#endif
#else
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
	return rte_ring_sc_dequeue_burst(r, v_mbufs, MAX_RING_BURST);
#else
	return rte_ring_sc_dequeue_burst(r, v_mbufs, MAX_RING_BURST, NULL);
#endif
#endif
}
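
/* Software (ring based) receive: poll the task's RX rings round-robin,
   starting after the ring that was read last, until one of them returns
   packets or a full cycle has been made. */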
uint16_t rx_pkt_sw(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	START_EMPTY_MEASSURE();
	*mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	uint8_t lr = tbase->rx_params_sw.last_read_ring;
	uint16_t nb_rx;

	do {
		nb_rx = ring_deq(tbase->rx_params_sw.rx_rings[lr], *mbufs);
		lr = lr + 1 == tbase->rx_params_sw.nb_rxrings? 0 : lr + 1;
	} while(!nb_rx && lr != tbase->rx_params_sw.last_read_ring);

	tbase->rx_params_sw.last_read_ring = lr;

	if (nb_rx != 0) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	}
	else {
		TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
		return 0;
	}
}
/* Same as rx_pkt_sw except with a mask for the number of receive
   rings (can only be used if nb_rxring is a power of 2). */
uint16_t rx_pkt_sw_pow2(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	START_EMPTY_MEASSURE();
	*mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	uint8_t lr = tbase->rx_params_sw.last_read_ring;
	uint16_t nb_rx;

	do {
		nb_rx = ring_deq(tbase->rx_params_sw.rx_rings[lr], *mbufs);
		lr = (lr + 1) & tbase->rx_params_sw.rxrings_mask;
	} while(!nb_rx && lr != tbase->rx_params_sw.last_read_ring);

	tbase->rx_params_sw.last_read_ring = lr;

	if (nb_rx != 0) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	}
	else {
		TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
		return 0;
	}
}
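
/* Receive path for tasks that feed themselves: the packets are already
   sitting in the task's own ws_mbuf buffer, so receiving is just a matter
   of claiming what is there and clearing the pending count. */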
uint16_t rx_pkt_self(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	START_EMPTY_MEASSURE();
	uint16_t nb_rx = tbase->ws_mbuf->idx[0].nb_rx;
	if (nb_rx) {
		tbase->ws_mbuf->idx[0].nb_rx = 0;
		*mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	}
	else {
		TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
		return 0;
	}
}
/* Used for tasks that do not receive packets (e.g. packet
   generation). Always returns 1 but never returns packets and does
   not increment statistics. This function allows the same code path
   to be used as for tasks that actually receive packets. */
uint16_t rx_pkt_dummy(__attribute__((unused)) struct task_base *tbase,
		      __attribute__((unused)) struct rte_mbuf ***mbufs)
{
	return 1;
}
/* After the system has been configured, it is known if there is only
   one RX ring. If this is the case, a more specialized version of the
   function above can be used to save cycles. */
uint16_t rx_pkt_sw1(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	START_EMPTY_MEASSURE();
	*mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	uint16_t nb_rx = ring_deq(tbase->rx_params_sw1.rx_ring, *mbufs);

	if (nb_rx != 0) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	}
	else {
		TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
		return 0;
	}
}
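
/* Invoke the receive function that was active before the current wrapper
   (dump, trace, distr, bw, tsc, ...) was layered on top of it. The index
   is bumped around the call so that wrappers can be stacked. */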
static uint16_t call_prev_rx_pkt(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t ret;

	tbase->aux->rx_prev_idx++;
	ret = tbase->aux->rx_pkt_prev[tbase->aux->rx_prev_idx - 1](tbase, mbufs);
	tbase->aux->rx_prev_idx--;

	return ret;
}
/* Only used when there are packets to be dumped. This function is
   meant as a debugging tool and is therefore not optimized. When the
   number of packets to dump falls back to 0, the original (optimized)
   rx function is restored. This way, dumping packets has no
   performance impact when the feature is not used. */
uint16_t rx_pkt_dump(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);

	if (ret) {
		uint32_t n_dump = tbase->aux->task_rt_dump.n_print_rx;
		n_dump = ret < n_dump? ret : n_dump;

		if ((tbase->aux->task_rt_dump.input == NULL) || (tbase->aux->task_rt_dump.input->reply == NULL)) {
			for (uint32_t i = 0; i < n_dump; ++i) {
				plogdx_info((*mbufs)[i], "RX: ");
			}
		}
		else {
			struct input *input = tbase->aux->task_rt_dump.input;

			for (uint32_t i = 0; i < n_dump; ++i) {
				/* TODO: Execute callback with full
				   data in a single call. */
				char tmp[128];
				int strlen;

#if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
				int port_id = ((*mbufs)[i])->port;
#else
				int port_id = ((*mbufs)[i])->pkt.in_port;
#endif
				strlen = snprintf(tmp, sizeof(tmp), "pktdump,%d,%d\n", port_id,
						  rte_pktmbuf_pkt_len((*mbufs)[i]));

				input->reply(input, tmp, strlen);
				input->reply(input, rte_pktmbuf_mtod((*mbufs)[i], char *), rte_pktmbuf_pkt_len((*mbufs)[i]));
				input->reply(input, "\n", 1);
			}
		}

		tbase->aux->task_rt_dump.n_print_rx -= n_dump;

		if (0 == tbase->aux->task_rt_dump.n_print_rx) {
			task_base_del_rx_pkt_function(tbase, rx_pkt_dump);
		}
	}
	return ret;
}
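
/* Trace wrapper: copy the beginning of up to MAX_RING_BURST received
   packets (as much as fits in pkt_cpy[]) together with their lengths and
   mbuf addresses into the per-task trace buffer, so the TX side can later
   match and print them. */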
uint16_t rx_pkt_trace(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	tbase->aux->task_rt_dump.cur_trace = 0;
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);

	if (ret) {
		uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;
		n_trace = ret < n_trace? ret : n_trace;
		n_trace = n_trace <= MAX_RING_BURST ? n_trace : MAX_RING_BURST;

		for (uint32_t i = 0; i < n_trace; ++i) {
			uint8_t *pkt = rte_pktmbuf_mtod((*mbufs)[i], uint8_t *);
			rte_memcpy(tbase->aux->task_rt_dump.pkt_cpy[i], pkt, sizeof(tbase->aux->task_rt_dump.pkt_cpy[i]));
			tbase->aux->task_rt_dump.pkt_cpy_len[i] = rte_pktmbuf_pkt_len((*mbufs)[i]);
			tbase->aux->task_rt_dump.pkt_mbuf_addr[i] = (*mbufs)[i];
		}
		tbase->aux->task_rt_dump.cur_trace += n_trace;

		tbase->aux->task_rt_dump.n_trace -= n_trace;
		/* Unset by TX when n_trace = 0 */
	}
	return ret;
}
/* Gather the distribution of the number of packets that have been
   received from one RX call. Since the value is only modified by the
   task that receives the packet, no atomic operation is needed. */
uint16_t rx_pkt_distr(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);

	if (likely(ret < RX_BUCKET_SIZE))
		tbase->aux->rx_bucket[ret]++;
	else
		tbase->aux->rx_bucket[RX_BUCKET_SIZE - 1]++;
	return ret;
}
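
/* Byte-count wrapper: accounts the wire-level size of every received
   packet, as computed by mbuf_wire_size(), into the RX byte statistics. */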
uint16_t rx_pkt_bw(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
	uint32_t tot_bytes = 0;

	for (uint16_t i = 0; i < ret; ++i) {
		tot_bytes += mbuf_wire_size((*mbufs)[i]);
	}

	TASK_STATS_ADD_RX_BYTES(&tbase->aux->stats, tot_bytes);
	return ret;
}
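
/* TSC wrapper: timestamp the receive call so the cycles spent inside the
   underlying rx function can be reported. */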
uint16_t rx_pkt_tsc(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint64_t before = rte_rdtsc();
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
	uint64_t after = rte_rdtsc();

	tbase->aux->tsc_rx.before = before;
	tbase->aux->tsc_rx.after = after;
	return ret;
}
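
/* Drain wrapper: keep calling the underlying rx function and gather
   everything it returns into the all_mbufs buffer (up to MAX_RX_PKT_ALL),
   so the task sees all currently available packets in a single call. */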
uint16_t rx_pkt_all(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t tot = 0;
	uint16_t ret = 0;
	struct rte_mbuf **new_mbufs;
	struct rte_mbuf **dst = tbase->aux->all_mbufs;

	/* In case we receive less than MAX_PKT_BURST packets in one
	   iteration, do not perform any copying of mbuf pointers. Use
	   the buffer itself instead. */
	ret = call_prev_rx_pkt(tbase, &new_mbufs);
	if (ret < MAX_PKT_BURST/2) {
		*mbufs = new_mbufs;
		return ret;
	}

	memcpy(dst + tot, new_mbufs, ret * sizeof(*dst));
	tot += ret;
	*mbufs = dst;

	do {
		ret = call_prev_rx_pkt(tbase, &new_mbufs);
		memcpy(dst + tot, new_mbufs, ret * sizeof(*dst));
		tot += ret;
	} while (ret == MAX_PKT_BURST/2 && tot < MAX_RX_PKT_ALL - MAX_PKT_BURST);
	if (tot >= MAX_RX_PKT_ALL - MAX_PKT_BURST) {
		plog_err("Could not receive all packets - buffer full\n");