/*
// Copyright (c) 2010-2020 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

#include <rte_cycles.h>
#include <rte_ethdev.h>
#include <rte_version.h>

#include "rx_pkt.h"
#include "task_base.h"
#include "clock.h"
#include "stats.h"
#include "log.h"
#include "mbuf_utils.h"
#include "prefetch.h"
#include "arp.h"
#include "tx_pkt.h"
#include "handle_master.h"
#include "input.h" /* Needed for callback on dump */

#define TCP_PORT_BGP    rte_cpu_to_be_16(179)

/* The _param versions of the rx_pkt_hw functions are used to create
   two very similar variations of these functions. The variations are
   specified by the "multi" parameter, which signifies that the
   rte_eth_rx_burst function should be called multiple times. The
   reason for this is that with the vector PMD, the maximum number of
   packets returned per call is 32. If packets have been split into
   multiple mbufs, rte_eth_rx_burst might even return fewer than 32
   packets.
   Some algorithms (like QoS) only work correctly when more than 32
   packets can be received per call, because their dequeue step
   expects to find batches of 32 packets.
*/

#define MIN_PMD_RX 32

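/* Receive up to MAX_PKT_BURST packets from one port/queue pair. When
   "multi" is set, keep calling rte_eth_rx_burst (in chunks of
   MIN_PMD_RX) for as long as the previous call returned packets and
   there is still room in the burst, to work around PMDs that return
   at most 32 packets per call. */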
static uint16_t rx_pkt_hw_port_queue(struct port_queue *pq, struct rte_mbuf **mbufs, int multi)
{
        uint16_t nb_rx, n;

        nb_rx = rte_eth_rx_burst(pq->port, pq->queue, mbufs, MAX_PKT_BURST);

        if (multi) {
                n = nb_rx;
                while (n != 0 && MAX_PKT_BURST - nb_rx >= MIN_PMD_RX) {
                        n = rte_eth_rx_burst(pq->port, pq->queue, mbufs + nb_rx, MIN_PMD_RX);
                        nb_rx += n;
                        PROX_PANIC(nb_rx > 64, "Received %d packets while expecting maximum %d\n", nb_rx, 64);
                }
        }
        return nb_rx;
}

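/* Advance to the next RX port in round-robin fashion; the _pow2
   variant avoids the branch by masking and can only be used when the
   number of RX ports is a power of two. */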
static void next_port(struct rx_params_hw *rx_params_hw)
{
        ++rx_params_hw->last_read_portid;
        if (unlikely(rx_params_hw->last_read_portid == rx_params_hw->nb_rxports)) {
                rx_params_hw->last_read_portid = 0;
        }
}

static void next_port_pow2(struct rx_params_hw *rx_params_hw)
{
        rx_params_hw->last_read_portid = (rx_params_hw->last_read_portid + 1) & rx_params_hw->rxport_mask;
}

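/* Dump or trace a packet received on the L3 (control plane) path.
   Depending on whether an input callback is registered, the packet is
   either logged or sent back through the input's reply callback. */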
static inline void dump_l3(struct task_base *tbase, struct rte_mbuf *mbuf)
{
        if (unlikely(tbase->aux->task_rt_dump.n_print_rx)) {
                if ((tbase->aux->task_rt_dump.input == NULL) || (tbase->aux->task_rt_dump.input->reply == NULL)) {
                        plogdx_info(mbuf, "RX: ");
                } else {
                        struct input *input = tbase->aux->task_rt_dump.input;
                        char tmp[128];
                        int strlen;
#if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
                        int port_id = mbuf->port;
#else
                        int port_id = mbuf->pkt.in_port;
#endif
                        strlen = snprintf(tmp, sizeof(tmp), "pktdump,%d,%d\n", port_id,
                              rte_pktmbuf_pkt_len(mbuf));
                        input->reply(input, tmp, strlen);
                        input->reply(input, rte_pktmbuf_mtod(mbuf, char *), rte_pktmbuf_pkt_len(mbuf));
                        input->reply(input, "\n", 1);
                }
                tbase->aux->task_rt_dump.n_print_rx--;
                if (0 == tbase->aux->task_rt_dump.n_print_rx) {
                        task_base_del_rx_pkt_function(tbase, rx_pkt_dump);
                }
        }
        if (unlikely(tbase->aux->task_rt_dump.n_trace)) {
                plogdx_info(mbuf, "RX: ");
                tbase->aux->task_rt_dump.n_trace--;
        }
}

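/* Core RX path for hardware (NIC) reception. Reads a burst from the
   current port/queue, then advances to the next port using the
   provided "next" callback. When "l3" is set, ARP, ICMP and BGP
   packets are diverted to the control plane ring and compacted out of
   the burst; only data-plane packets are returned to the caller. */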
static uint16_t rx_pkt_hw_param(struct task_base *tbase, struct rte_mbuf ***mbufs_ptr, int multi,
                                void (*next)(struct rx_params_hw *rx_param_hw), int l3)
{
        uint8_t last_read_portid;
        uint16_t nb_rx;
        int skip = 0;

        START_EMPTY_MEASSURE();
        *mbufs_ptr = tbase->ws_mbuf->mbuf[0] +
                (RTE_ALIGN_CEIL(tbase->ws_mbuf->idx[0].prod, 2) & WS_MBUF_MASK);

        last_read_portid = tbase->rx_params_hw.last_read_portid;
        struct port_queue *pq = &tbase->rx_params_hw.rx_pq[last_read_portid];

        nb_rx = rx_pkt_hw_port_queue(pq, *mbufs_ptr, multi);
        next(&tbase->rx_params_hw);

        if (l3) {
                struct rte_mbuf **mbufs = *mbufs_ptr;
                int i;
                struct ether_hdr_arp *hdr_arp[MAX_PKT_BURST];
                prox_rte_ether_hdr *hdr;
                for (i = 0; i < nb_rx; i++) {
                        PREFETCH0(mbufs[i]);
                }
                for (i = 0; i < nb_rx; i++) {
                        hdr_arp[i] = rte_pktmbuf_mtod(mbufs[i], struct ether_hdr_arp *);
                        PREFETCH0(hdr_arp[i]);
                }
                for (i = 0; i < nb_rx; i++) {
                        if (likely(hdr_arp[i]->ether_hdr.ether_type == ETYPE_IPv4)) {
                                hdr = (prox_rte_ether_hdr *)hdr_arp[i];
                                prox_rte_ipv4_hdr *pip = (prox_rte_ipv4_hdr *)(hdr + 1);
                                prox_rte_tcp_hdr *tcp = (prox_rte_tcp_hdr *)(pip + 1);
                                if (pip->next_proto_id == IPPROTO_ICMP) {
                                        dump_l3(tbase, mbufs[i]);
                                        tx_ring(tbase, tbase->l3.ctrl_plane_ring, ICMP_TO_CTRL, mbufs[i]);
                                        skip++;
                                } else if ((tcp->src_port == TCP_PORT_BGP) || (tcp->dst_port == TCP_PORT_BGP)) {
                                        dump_l3(tbase, mbufs[i]);
                                        tx_ring(tbase, tbase->l3.ctrl_plane_ring, BGP_TO_CTRL, mbufs[i]);
                                        skip++;
                                } else if (unlikely(skip)) {
                                        mbufs[i - skip] = mbufs[i];
                                }
                        } else if (unlikely(hdr_arp[i]->ether_hdr.ether_type == ETYPE_ARP)) {
                                dump_l3(tbase, mbufs[i]);
                                tx_ring(tbase, tbase->l3.ctrl_plane_ring, ARP_TO_CTRL, mbufs[i]);
                                skip++;
                        } else if (unlikely(skip)) {
                                mbufs[i - skip] = mbufs[i];
                        }
                }
        }

        if (skip)
                TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, skip);
        if (likely(nb_rx > 0)) {
                TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
                return nb_rx - skip;
        }
        TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
        return 0;
}

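/* Same as rx_pkt_hw_param, but specialized for tasks that receive
   from a single port/queue pair (rx_params_hw1), so no port
   round-robin is needed. */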
static inline uint16_t rx_pkt_hw1_param(struct task_base *tbase, struct rte_mbuf ***mbufs_ptr, int multi, int l3)
{
        uint16_t nb_rx, n;
        int skip = 0;

        START_EMPTY_MEASSURE();
        *mbufs_ptr = tbase->ws_mbuf->mbuf[0] +
                (RTE_ALIGN_CEIL(tbase->ws_mbuf->idx[0].prod, 2) & WS_MBUF_MASK);

        nb_rx = rte_eth_rx_burst(tbase->rx_params_hw1.rx_pq.port,
                                 tbase->rx_params_hw1.rx_pq.queue,
                                 *mbufs_ptr, MAX_PKT_BURST);

        if (multi) {
                n = nb_rx;
                while ((n != 0) && (MAX_PKT_BURST - nb_rx >= MIN_PMD_RX)) {
                        n = rte_eth_rx_burst(tbase->rx_params_hw1.rx_pq.port,
                                 tbase->rx_params_hw1.rx_pq.queue,
                                 *mbufs_ptr + nb_rx, MIN_PMD_RX);
                        nb_rx += n;
                        PROX_PANIC(nb_rx > 64, "Received %d packets while expecting maximum %d\n", nb_rx, 64);
                }
        }

        if (l3) {
                struct rte_mbuf **mbufs = *mbufs_ptr;
                int i;
                struct ether_hdr_arp *hdr_arp[MAX_PKT_BURST];
                prox_rte_ether_hdr *hdr;
                for (i = 0; i < nb_rx; i++) {
                        PREFETCH0(mbufs[i]);
                }
                for (i = 0; i < nb_rx; i++) {
                        hdr_arp[i] = rte_pktmbuf_mtod(mbufs[i], struct ether_hdr_arp *);
                        PREFETCH0(hdr_arp[i]);
                }
                for (i = 0; i < nb_rx; i++) {
                        // plog_info("ether_type = %x\n", hdr_arp[i]->ether_hdr.ether_type);
                        if (likely(hdr_arp[i]->ether_hdr.ether_type == ETYPE_IPv4)) {
                                hdr = (prox_rte_ether_hdr *)hdr_arp[i];
                                prox_rte_ipv4_hdr *pip = (prox_rte_ipv4_hdr *)(hdr + 1);
                                prox_rte_tcp_hdr *tcp = (prox_rte_tcp_hdr *)(pip + 1);
                                if (pip->next_proto_id == IPPROTO_ICMP) {
                                        dump_l3(tbase, mbufs[i]);
                                        tx_ring(tbase, tbase->l3.ctrl_plane_ring, ICMP_TO_CTRL, mbufs[i]);
                                        skip++;
                                } else if ((tcp->src_port == TCP_PORT_BGP) || (tcp->dst_port == TCP_PORT_BGP)) {
                                        dump_l3(tbase, mbufs[i]);
                                        tx_ring(tbase, tbase->l3.ctrl_plane_ring, BGP_TO_CTRL, mbufs[i]);
                                        skip++;
                                } else if (unlikely(skip)) {
                                        mbufs[i - skip] = mbufs[i];
                                }
                        } else if (unlikely(hdr_arp[i]->ether_hdr.ether_type == ETYPE_ARP)) {
                                dump_l3(tbase, mbufs[i]);
                                tx_ring(tbase, tbase->l3.ctrl_plane_ring, ARP_TO_CTRL, mbufs[i]);
                                skip++;
                        } else if (unlikely(skip)) {
                                mbufs[i - skip] = mbufs[i];
                        }
                }
        }

        if (skip)
                TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, skip);
        if (likely(nb_rx > 0)) {
                TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
                return nb_rx - skip;
        }
        TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
        return 0;
}

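/* Thin wrappers exposing the different hardware RX flavours (single
   port, multiple ports, power-of-two number of ports, "multi" burst
   mode and L3 control-plane filtering) through the rx_pkt function
   pointer interface. */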
uint16_t rx_pkt_hw(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        return rx_pkt_hw_param(tbase, mbufs, 0, next_port, 0);
}

uint16_t rx_pkt_hw_pow2(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        return rx_pkt_hw_param(tbase, mbufs, 0, next_port_pow2, 0);
}

uint16_t rx_pkt_hw1(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        return rx_pkt_hw1_param(tbase, mbufs, 0, 0);
}

uint16_t rx_pkt_hw_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        return rx_pkt_hw_param(tbase, mbufs, 1, next_port, 0);
}

uint16_t rx_pkt_hw_pow2_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        return rx_pkt_hw_param(tbase, mbufs, 1, next_port_pow2, 0);
}

uint16_t rx_pkt_hw1_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        return rx_pkt_hw1_param(tbase, mbufs, 1, 0);
}

uint16_t rx_pkt_hw_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        return rx_pkt_hw_param(tbase, mbufs, 0, next_port, 1);
}

uint16_t rx_pkt_hw_pow2_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        return rx_pkt_hw_param(tbase, mbufs, 0, next_port_pow2, 1);
}

uint16_t rx_pkt_hw1_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        return rx_pkt_hw1_param(tbase, mbufs, 0, 1);
}

uint16_t rx_pkt_hw_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        return rx_pkt_hw_param(tbase, mbufs, 1, next_port, 1);
}

uint16_t rx_pkt_hw_pow2_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        return rx_pkt_hw_param(tbase, mbufs, 1, next_port_pow2, 1);
}

uint16_t rx_pkt_hw1_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        return rx_pkt_hw1_param(tbase, mbufs, 1, 1);
}

/* The following functions implement ring access */
uint16_t ring_deq(struct rte_ring *r, struct rte_mbuf **mbufs)
{
        void **v_mbufs = (void **)mbufs;
#ifdef BRAS_RX_BULK
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
        return rte_ring_sc_dequeue_bulk(r, v_mbufs, MAX_RING_BURST) < 0? 0 : MAX_RING_BURST;
#else
        return rte_ring_sc_dequeue_bulk(r, v_mbufs, MAX_RING_BURST, NULL);
#endif
#else
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
        return rte_ring_sc_dequeue_burst(r, v_mbufs, MAX_RING_BURST);
#else
        return rte_ring_sc_dequeue_burst(r, v_mbufs, MAX_RING_BURST, NULL);
#endif
#endif
}

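/* Receive from software rings, polling each RX ring in round-robin
   order until one returns packets or all rings have been tried once. */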
uint16_t rx_pkt_sw(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        START_EMPTY_MEASSURE();
        *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
        uint8_t lr = tbase->rx_params_sw.last_read_ring;
        uint16_t nb_rx;

        do {
                nb_rx = ring_deq(tbase->rx_params_sw.rx_rings[lr], *mbufs);
                lr = lr + 1 == tbase->rx_params_sw.nb_rxrings? 0 : lr + 1;
        } while(!nb_rx && lr != tbase->rx_params_sw.last_read_ring);

        tbase->rx_params_sw.last_read_ring = lr;

        if (nb_rx != 0) {
                TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
                return nb_rx;
        }
        else {
                TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
                return 0;
        }
}

/* Same as rx_pkt_sw except with a mask for the number of receive
   rings (can only be used if nb_rxring is a power of 2). */
uint16_t rx_pkt_sw_pow2(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        START_EMPTY_MEASSURE();
        *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
        uint8_t lr = tbase->rx_params_sw.last_read_ring;
        uint16_t nb_rx;

        do {
                nb_rx = ring_deq(tbase->rx_params_sw.rx_rings[lr], *mbufs);
                lr = (lr + 1) & tbase->rx_params_sw.rxrings_mask;
        } while(!nb_rx && lr != tbase->rx_params_sw.last_read_ring);

        tbase->rx_params_sw.last_read_ring = lr;

        if (nb_rx != 0) {
                TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
                return nb_rx;
        }
        else {
                TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
                return 0;
        }
}

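/* Receive packets that the task queued to itself: the burst was
   already written into the task's ws_mbuf buffers, so this only
   returns the pending count and resets it. */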
uint16_t rx_pkt_self(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        START_EMPTY_MEASSURE();
        uint16_t nb_rx = tbase->ws_mbuf->idx[0].nb_rx;
        if (nb_rx) {
                tbase->ws_mbuf->idx[0].nb_rx = 0;
                *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
                TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
                return nb_rx;
        }
        else {
                TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
                return 0;
        }
}

/* Used for tasks that do not receive packets (e.g. packet
   generation). Always returns 1 but never returns packets and does
   not increment statistics. This allows the same code path to be used
   as for tasks that actually receive packets. */
uint16_t rx_pkt_dummy(__attribute__((unused)) struct task_base *tbase,
                      __attribute__((unused)) struct rte_mbuf ***mbufs)
{
        return 1;
}

/* After the system has been configured, it is known if there is only
   one RX ring. If this is the case, a more specialized version of
   rx_pkt_sw can be used to save cycles. */
uint16_t rx_pkt_sw1(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        START_EMPTY_MEASSURE();
        *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
        uint16_t nb_rx = ring_deq(tbase->rx_params_sw1.rx_ring, *mbufs);

        if (nb_rx != 0) {
                TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
                return nb_rx;
        }
        else {
                TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
                return 0;
        }
}

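/* Invoke the RX function that was active before the current one was
   installed; used by the dump/trace/distr/bw/tsc wrappers below to
   chain onto the task's real RX function. */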
static uint16_t call_prev_rx_pkt(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        uint16_t ret;

        tbase->aux->rx_prev_idx++;
        ret = tbase->aux->rx_pkt_prev[tbase->aux->rx_prev_idx - 1](tbase, mbufs);
        tbase->aux->rx_prev_idx--;

        return ret;
}

/* Only used when there are packets to be dumped. This function is
   meant as a debugging tool and is therefore not optimized. When the
   number of packets to dump falls back to 0, the original (optimized)
   rx function is restored. This makes it possible to support dumping
   packets without any performance impact when the feature is not
   used. */
uint16_t rx_pkt_dump(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        uint16_t ret = call_prev_rx_pkt(tbase, mbufs);

        if (ret) {
                uint32_t n_dump = tbase->aux->task_rt_dump.n_print_rx;
                n_dump = ret < n_dump? ret : n_dump;

                if ((tbase->aux->task_rt_dump.input == NULL) || (tbase->aux->task_rt_dump.input->reply == NULL)) {
                        for (uint32_t i = 0; i < n_dump; ++i) {
                                plogdx_info((*mbufs)[i], "RX: ");
                        }
                }
                else {
                        struct input *input = tbase->aux->task_rt_dump.input;

                        for (uint32_t i = 0; i < n_dump; ++i) {
                                /* TODO: Execute callback with full
                                   data in a single call. */
                                char tmp[128];
                                int strlen;

#if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
                                int port_id = ((*mbufs)[i])->port;
#else
                                int port_id = ((*mbufs)[i])->pkt.in_port;
#endif
                                strlen = snprintf(tmp, sizeof(tmp), "pktdump,%d,%d\n", port_id,
                                                      rte_pktmbuf_pkt_len((*mbufs)[i]));

                                input->reply(input, tmp, strlen);
                                input->reply(input, rte_pktmbuf_mtod((*mbufs)[i], char *), rte_pktmbuf_pkt_len((*mbufs)[i]));
                                input->reply(input, "\n", 1);
                        }
                }

                tbase->aux->task_rt_dump.n_print_rx -= n_dump;

                if (0 == tbase->aux->task_rt_dump.n_print_rx) {
                        task_base_del_rx_pkt_function(tbase, rx_pkt_dump);
                }
        }
        return ret;
}

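/* Like rx_pkt_dump, but copies up to MAX_RING_BURST packets (data,
   length and mbuf address) into the task's trace buffers so the TX
   side can correlate them later. */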
uint16_t rx_pkt_trace(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        tbase->aux->task_rt_dump.cur_trace = 0;
        uint16_t ret = call_prev_rx_pkt(tbase, mbufs);

        if (ret) {
                uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;
                n_trace = ret < n_trace? ret : n_trace;
                n_trace = n_trace <= MAX_RING_BURST ? n_trace : MAX_RING_BURST;

                for (uint32_t i = 0; i < n_trace; ++i) {
                        uint8_t *pkt = rte_pktmbuf_mtod((*mbufs)[i], uint8_t *);
                        rte_memcpy(tbase->aux->task_rt_dump.pkt_cpy[i], pkt, sizeof(tbase->aux->task_rt_dump.pkt_cpy[i]));
                        tbase->aux->task_rt_dump.pkt_cpy_len[i] = rte_pktmbuf_pkt_len((*mbufs)[i]);
                        tbase->aux->task_rt_dump.pkt_mbuf_addr[i] = (*mbufs)[i];
                }
                tbase->aux->task_rt_dump.cur_trace += n_trace;

                tbase->aux->task_rt_dump.n_trace -= n_trace;
                /* Unset by TX when n_trace = 0 */
        }
        return ret;
}

/* Gather the distribution of the number of packets that have been
   received from one RX call. Since the value is only modified by the
   task that receives the packet, no atomic operation is needed. */
uint16_t rx_pkt_distr(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        uint16_t ret = call_prev_rx_pkt(tbase, mbufs);

        if (likely(ret < RX_BUCKET_SIZE))
                tbase->aux->rx_bucket[ret]++;
        else
                tbase->aux->rx_bucket[RX_BUCKET_SIZE - 1]++;
        return ret;
}

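/* Account the received byte count (as computed by mbuf_wire_size) on
   top of the underlying RX function. */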
uint16_t rx_pkt_bw(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
        uint32_t tot_bytes = 0;

        for (uint16_t i = 0; i < ret; ++i) {
                tot_bytes += mbuf_wire_size((*mbufs)[i]);
        }

        TASK_STATS_ADD_RX_BYTES(&tbase->aux->stats, tot_bytes);

        return ret;
}

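/* Record the TSC timestamps taken just before and after the
   underlying RX call, so the cost of reception can be measured. */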
uint16_t rx_pkt_tsc(struct task_base *tbase, struct rte_mbuf ***mbufs)
{
        uint64_t before = rte_rdtsc();
        uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
        uint64_t after = rte_rdtsc();

        tbase->aux->tsc_rx.before = before;
        tbase->aux->tsc_rx.after = after;

        return ret;
}