// Copyright (c) 2010-2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <rte_cycles.h>
#include <rte_ethdev.h>
#include <rte_version.h>

#include "rx_pkt.h"
#include "task_base.h"
#include "clock.h"
#include "stats.h"
#include "log.h"
#include "mbuf_utils.h"
#include "input.h" /* Needed for callback on dump */
/* The _param versions of the rx_pkt_hw functions are used to create
   two instances of very similar variations of these functions. The
   variations are specified by the "multi" parameter which signifies
   that the rte_eth_rx_burst function should be called multiple times.
   The reason for this is that with the vector PMD, the maximum number
   of packets being returned is 32. If packets have been split into
   multiple mbufs then rte_eth_rx_burst might even return fewer than
   32 packets.
   Some algorithms (like QoS) only work correctly if more than 32
   packets are received per call when the dequeue step involves
   finding 32 packets. */
#define MIN_PMD_RX	32

static uint16_t rx_pkt_hw_port_queue(struct port_queue *pq, struct rte_mbuf **mbufs, int multi)
{
	uint16_t nb_rx, n;

	nb_rx = rte_eth_rx_burst(pq->port, pq->queue, mbufs, MAX_PKT_BURST);
	if (multi) {
		n = nb_rx;
		while (n != 0 && MAX_PKT_BURST - nb_rx >= MIN_PMD_RX) {
			n = rte_eth_rx_burst(pq->port, pq->queue, mbufs + nb_rx, MIN_PMD_RX);
			nb_rx += n;
			PROX_PANIC(nb_rx > 64, "Received %d packets while expecting maximum %d\n", n, MIN_PMD_RX);
		}
	}
	return nb_rx;
}

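/* Advance the hardware RX state to the next receive port, wrapping
   around to the first port after the last one has been read. */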
static void next_port(struct rx_params_hw *rx_params_hw)
{
	++rx_params_hw->last_read_portid;
	if (unlikely(rx_params_hw->last_read_portid == rx_params_hw->nb_rxports)) {
		rx_params_hw->last_read_portid = 0;
	}
}

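/* Same as next_port, but wraps using a bit-mask; only valid when the
   number of receive ports is a power of two. */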
static void next_port_pow2(struct rx_params_hw *rx_params_hw)
{
	rx_params_hw->last_read_portid = (rx_params_hw->last_read_portid + 1) & rx_params_hw->rxport_mask;
}

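/* Common body of the hardware rx functions: receive a burst from the
   port/queue that was read last, advance to the next port through the
   "next" callback and update the task statistics. */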
static uint16_t rx_pkt_hw_param(struct task_base *tbase, struct rte_mbuf ***mbufs, int multi,
				void (*next)(struct rx_params_hw *rx_param_hw))
{
	uint8_t last_read_portid;
	uint16_t nb_rx;

	START_EMPTY_MEASSURE();
	*mbufs = tbase->ws_mbuf->mbuf[0] +
		(RTE_ALIGN_CEIL(tbase->ws_mbuf->idx[0].prod, 2) & WS_MBUF_MASK);

	last_read_portid = tbase->rx_params_hw.last_read_portid;
	struct port_queue *pq = &tbase->rx_params_hw.rx_pq[last_read_portid];

	nb_rx = rx_pkt_hw_port_queue(pq, *mbufs, multi);
	next(&tbase->rx_params_hw);

	if (likely(nb_rx > 0)) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	}
	TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
	return 0;
}

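/* Specialized version of rx_pkt_hw_param for tasks that receive from a
   single port/queue, avoiding the indirection through the port array. */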
static inline uint16_t rx_pkt_hw1_param(struct task_base *tbase, struct rte_mbuf ***mbufs, int multi)
{
	uint16_t nb_rx, n;

	START_EMPTY_MEASSURE();
	*mbufs = tbase->ws_mbuf->mbuf[0] +
		(RTE_ALIGN_CEIL(tbase->ws_mbuf->idx[0].prod, 2) & WS_MBUF_MASK);

	nb_rx = rte_eth_rx_burst(tbase->rx_params_hw1.rx_pq.port,
				 tbase->rx_params_hw1.rx_pq.queue,
				 *mbufs, MAX_PKT_BURST);

	if (multi) {
		n = nb_rx;
		while ((n != 0) && (MAX_PKT_BURST - nb_rx >= MIN_PMD_RX)) {
			n = rte_eth_rx_burst(tbase->rx_params_hw1.rx_pq.port,
					     tbase->rx_params_hw1.rx_pq.queue,
					     *mbufs + nb_rx, MIN_PMD_RX);
			nb_rx += n;
			PROX_PANIC(nb_rx > 64, "Received %d packets while expecting maximum %d\n", n, MIN_PMD_RX);
		}
	}

	if (likely(nb_rx > 0)) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	}
	TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
	return 0;
}

uint16_t rx_pkt_hw(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 0, next_port);
}

uint16_t rx_pkt_hw_pow2(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 0, next_port_pow2);
}

uint16_t rx_pkt_hw1(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw1_param(tbase, mbufs, 0);
}

uint16_t rx_pkt_hw_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 1, next_port);
}

uint16_t rx_pkt_hw_pow2_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw_param(tbase, mbufs, 1, next_port_pow2);
}

uint16_t rx_pkt_hw1_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	return rx_pkt_hw1_param(tbase, mbufs, 1);
}

/* The following functions implement ring access */
static uint16_t ring_deq(struct rte_ring *r, struct rte_mbuf **mbufs)
{
	void **v_mbufs = (void **)mbufs;
#ifdef BRAS_RX_BULK
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
	return rte_ring_sc_dequeue_bulk(r, v_mbufs, MAX_RING_BURST) < 0? 0 : MAX_RING_BURST;
#else
	return rte_ring_sc_dequeue_bulk(r, v_mbufs, MAX_RING_BURST, NULL);
#endif
#else
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
	return rte_ring_sc_dequeue_burst(r, v_mbufs, MAX_RING_BURST);
#else
	return rte_ring_sc_dequeue_burst(r, v_mbufs, MAX_RING_BURST, NULL);
#endif
#endif
}

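/* Receive from the task's software rings in round-robin order,
   starting from the ring after the one that was read last. */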
uint16_t rx_pkt_sw(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	START_EMPTY_MEASSURE();
	*mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	uint8_t lr = tbase->rx_params_sw.last_read_ring;
	uint16_t nb_rx;

	do {
		nb_rx = ring_deq(tbase->rx_params_sw.rx_rings[lr], *mbufs);
		lr = lr + 1 == tbase->rx_params_sw.nb_rxrings? 0 : lr + 1;
	} while(!nb_rx && lr != tbase->rx_params_sw.last_read_ring);

	tbase->rx_params_sw.last_read_ring = lr;

	if (nb_rx != 0) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	}
	TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
	return 0;
}

/* Same as rx_pkt_sw except with a mask for the number of receive
   rings (can only be used if nb_rxrings is a power of 2). */
uint16_t rx_pkt_sw_pow2(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	START_EMPTY_MEASSURE();
	*mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	uint8_t lr = tbase->rx_params_sw.last_read_ring;
	uint16_t nb_rx;

	do {
		nb_rx = ring_deq(tbase->rx_params_sw.rx_rings[lr], *mbufs);
		lr = (lr + 1) & tbase->rx_params_sw.rxrings_mask;
	} while(!nb_rx && lr != tbase->rx_params_sw.last_read_ring);

	tbase->rx_params_sw.last_read_ring = lr;

	if (nb_rx != 0) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	}
	TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
	return 0;
}

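/* Receive packets that the task queued to itself: the packet count and
   the mbuf pointers are picked up directly from the task's own
   worker-set mbuf buffer. */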
uint16_t rx_pkt_self(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	START_EMPTY_MEASSURE();
	uint16_t nb_rx = tbase->ws_mbuf->idx[0].nb_rx;

	if (nb_rx) {
		tbase->ws_mbuf->idx[0].nb_rx = 0;
		*mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	}
	TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
	return 0;
}

/* Used for tasks that do not receive packets (e.g. packet
   generation). Always returns 1 but never returns packets and does not
   increment statistics. This function makes it possible to use the
   same code path as for tasks that actually receive packets. */
uint16_t rx_pkt_dummy(__attribute__((unused)) struct task_base *tbase,
		      __attribute__((unused)) struct rte_mbuf ***mbufs)
{
	return 1;
}

/* After the system has been configured, it is known if there is only
   one RX ring. If this is the case, a more specialized version of
   rx_pkt_sw can be used to save cycles. */
uint16_t rx_pkt_sw1(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	START_EMPTY_MEASSURE();
	*mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
	uint16_t nb_rx = ring_deq(tbase->rx_params_sw1.rx_ring, *mbufs);

	if (nb_rx != 0) {
		TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
		return nb_rx;
	}
	TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
	return 0;
}

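/* Helper for the wrapper rx functions below (dump, trace, distr, bw,
   tsc, all): invoke the rx function that was installed before the
   wrapper, walking down the stack of previously installed functions. */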
static uint16_t call_prev_rx_pkt(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t ret;

	if (tbase->aux->rx_prev_idx + 1 == tbase->aux->rx_prev_count) {
		ret = tbase->aux->rx_pkt_prev[tbase->aux->rx_prev_idx](tbase, mbufs);
	} else {
		tbase->aux->rx_prev_idx++;
		ret = tbase->aux->rx_pkt_prev[tbase->aux->rx_prev_idx](tbase, mbufs);
		tbase->aux->rx_prev_idx--;
	}

	return ret;
}

/* Only used when there are packets to be dumped. This function is
   meant as a debugging tool and is therefore not optimized. When the
   number of packets to dump falls back to 0, the original (optimized)
   rx function is restored. This way, dumping packets has no
   performance impact when the feature is not used. */
uint16_t rx_pkt_dump(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);

	if (ret) {
		uint32_t n_dump = tbase->aux->task_rt_dump.n_print_rx;
		n_dump = ret < n_dump? ret : n_dump;

		if (tbase->aux->task_rt_dump.input->reply == NULL) {
			for (uint32_t i = 0; i < n_dump; ++i) {
				plogd_info((*mbufs)[i], "RX: ");
			}
		} else {
			struct input *input = tbase->aux->task_rt_dump.input;

			for (uint32_t i = 0; i < n_dump; ++i) {
				/* TODO: Execute callback with full
				   data in a single call. */
				char tmp[128];
				int strlen;

#if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
				int port_id = ((*mbufs)[i])->port;
#else
				int port_id = ((*mbufs)[i])->pkt.in_port;
#endif
				strlen = snprintf(tmp, sizeof(tmp), "pktdump,%d,%d\n", port_id,
						  rte_pktmbuf_pkt_len((*mbufs)[i]));

				input->reply(input, tmp, strlen);
				input->reply(input, rte_pktmbuf_mtod((*mbufs)[i], char *), rte_pktmbuf_pkt_len((*mbufs)[i]));
				input->reply(input, "\n", 1);
			}
		}

		tbase->aux->task_rt_dump.n_print_rx -= n_dump;

		if (0 == tbase->aux->task_rt_dump.n_print_rx) {
			task_base_del_rx_pkt_function(tbase, rx_pkt_dump);
		}
	}
	return ret;
}

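/* Like rx_pkt_dump, but instead of printing, copy the start of each
   received packet (together with its length and mbuf address) into the
   task's trace buffer, to be reported later from the TX side. */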
uint16_t rx_pkt_trace(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);

	if (ret) {
		uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;
		n_trace = ret < n_trace? ret : n_trace;
		tbase->aux->task_rt_dump.cur_trace = n_trace;

		for (uint32_t i = 0; i < n_trace; ++i) {
			uint8_t *pkt = rte_pktmbuf_mtod((*mbufs)[i], uint8_t *);
			rte_memcpy(tbase->aux->task_rt_dump.pkt_cpy[i], pkt, sizeof(tbase->aux->task_rt_dump.pkt_cpy[i]));
			tbase->aux->task_rt_dump.pkt_cpy_len[i] = rte_pktmbuf_pkt_len((*mbufs)[i]);
			tbase->aux->task_rt_dump.pkt_mbuf_addr[i] = (*mbufs)[i];
		}

		tbase->aux->task_rt_dump.n_trace -= n_trace;
		/* Unset by TX when n_trace = 0 */
	}
	return ret;
}

/* Gather the distribution of the number of packets that have been
   received from one RX call. Since the value is only modified by the
   task that receives the packet, no atomic operation is needed. */
uint16_t rx_pkt_distr(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);

	tbase->aux->rx_bucket[ret]++;
	return ret;
}

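/* On top of the packet count kept by the underlying rx function, also
   account for the number of bytes received, using the wire size of
   each mbuf. */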
uint16_t rx_pkt_bw(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
	uint32_t tot_bytes = 0;

	for (uint16_t i = 0; i < ret; ++i) {
		tot_bytes += mbuf_wire_size((*mbufs)[i]);
	}

	TASK_STATS_ADD_RX_BYTES(&tbase->aux->stats, tot_bytes);
	return ret;
}

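/* Record the TSC just before and just after the underlying rx call so
   that the cost of receiving packets can be reported per call. */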
uint16_t rx_pkt_tsc(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint64_t before = rte_rdtsc();
	uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
	uint64_t after = rte_rdtsc();

	tbase->aux->tsc_rx.before = before;
	tbase->aux->tsc_rx.after = after;
	return ret;
}

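/* Keep calling the underlying rx function as long as it returns a full
   burst, gathering all the mbuf pointers into one large array so the
   task sees them as a single, bigger burst. */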
uint16_t rx_pkt_all(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
	uint16_t tot = 0;
	uint16_t ret = 0;
	struct rte_mbuf **new_mbufs;
	struct rte_mbuf **dst = tbase->aux->all_mbufs;

	/* In case we receive less than MAX_PKT_BURST packets in one
	   iteration, do not perform any copying of mbuf pointers. Use
	   the buffer itself instead. */
	ret = call_prev_rx_pkt(tbase, &new_mbufs);
	if (ret < MAX_PKT_BURST/2) {
		*mbufs = new_mbufs;
		return ret;
	}

	memcpy(dst + tot, new_mbufs, ret * sizeof(*dst));
	tot += ret;
	*mbufs = dst;

	do {
		ret = call_prev_rx_pkt(tbase, &new_mbufs);
		memcpy(dst + tot, new_mbufs, ret * sizeof(*dst));
		tot += ret;
	} while (ret == MAX_PKT_BURST/2 && tot < MAX_RX_PKT_ALL - MAX_PKT_BURST);

	if (tot >= MAX_RX_PKT_ALL - MAX_PKT_BURST) {
		plog_err("Could not receive all packets - buffer full\n");
	}

	return tot;
}