Support packets in flight
[samplevnf.git] / VNFs / DPPD-PROX / rx_pkt.c
1 /*
2 // Copyright (c) 2010-2020 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <rte_cycles.h>
18 #include <rte_ethdev.h>
19 #include <rte_version.h>
20
21 #include "rx_pkt.h"
22 #include "task_base.h"
23 #include "clock.h"
24 #include "stats.h"
25 #include "log.h"
26 #include "mbuf_utils.h"
27 #include "prefetch.h"
28 #include "arp.h"
29 #include "tx_pkt.h"
30 #include "handle_master.h"
31 #include "input.h"
32 #include "prox_ipv6.h" /* Needed for callback on dump */
33
34 #define TCP_PORT_BGP    rte_cpu_to_be_16(179)
35
36 /* _param version of the rx_pkt_hw functions are used to create two
37    instances of very similar variations of these functions. The
38    variations are specified by the "multi" parameter which significies
39    that the rte_eth_rx_burst function should be called multiple times.
40    The reason for this is that with the vector PMD, the maximum number
41    of packets being returned is 32. If packets have been split in
42    multiple mbufs then rte_eth_rx_burst might even receive less than
43    32 packets.
44    Some algorithms (like QoS) only work correctly if more than 32
45    packets are received if the dequeue step involves finding 32 packets.
46 */
47
48 #define MIN_PMD_RX      32
49 #define PROX_L3         1
50 #define PROX_NDP        2
51
52 static uint16_t rx_pkt_hw_port_queue(struct port_queue *pq, struct rte_mbuf **mbufs, int multi)
53 {
54         uint16_t nb_rx, n;
55
56         nb_rx = rte_eth_rx_burst(pq->port, pq->queue, mbufs, MAX_PKT_BURST);
57
58         if (multi) {
59                 n = nb_rx;
60                 while (n != 0 && MAX_PKT_BURST - nb_rx >= MIN_PMD_RX) {
61                         n = rte_eth_rx_burst(pq->port, pq->queue, mbufs + nb_rx, MIN_PMD_RX);
62                         nb_rx += n;
63                         PROX_PANIC(nb_rx > 64, "Received %d packets while expecting maximum %d\n", n, MIN_PMD_RX);
64                 }
65         }
66         return nb_rx;
67 }
68
69 static void next_port(struct rx_params_hw *rx_params_hw)
70 {
71         ++rx_params_hw->last_read_portid;
72         if (unlikely(rx_params_hw->last_read_portid == rx_params_hw->nb_rxports)) {
73                 rx_params_hw->last_read_portid = 0;
74         }
75 }
76
77 static void next_port_pow2(struct rx_params_hw *rx_params_hw)
78 {
79         rx_params_hw->last_read_portid = (rx_params_hw->last_read_portid + 1) & rx_params_hw->rxport_mask;
80 }
81
82 static inline void dump_l3(struct task_base *tbase, struct rte_mbuf *mbuf)
83 {
84         if (unlikely(tbase->aux->task_rt_dump.n_print_rx)) {
85                 if ((tbase->aux->task_rt_dump.input == NULL) || (tbase->aux->task_rt_dump.input->reply == NULL)) {
86                         plogdx_info(mbuf, "RX: ");
87                 } else {
88                         struct input *input = tbase->aux->task_rt_dump.input;
89                         char tmp[128];
90                         int strlen;
91 #if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
92                         int port_id = mbuf->port;
93 #else
94                         int port_id = mbuf->pkt.in_port;
95 #endif
96                         strlen = snprintf(tmp, sizeof(tmp), "pktdump,%d,%d\n", port_id,
97                               rte_pktmbuf_pkt_len(mbuf));
98                         input->reply(input, tmp, strlen);
99                         input->reply(input, rte_pktmbuf_mtod(mbuf, char *), rte_pktmbuf_pkt_len(mbuf));
100                         input->reply(input, "\n", 1);
101                 }
102                 tbase->aux->task_rt_dump.n_print_rx --;
103                 if (0 == tbase->aux->task_rt_dump.n_print_rx) {
104                         task_base_del_rx_pkt_function(tbase, rx_pkt_dump);
105                 }
106         }
107         if (unlikely(tbase->aux->task_rt_dump.n_trace)) {
108                 plogdx_info(mbuf, "RX: ");
109                 tbase->aux->task_rt_dump.n_trace--;
110         }
111 }
112
113 static inline void handle_ipv4(struct task_base *tbase, struct rte_mbuf **mbufs, int i, prox_rte_ipv4_hdr *pip, int *skip)
114 {
115         prox_rte_tcp_hdr *tcp = (prox_rte_tcp_hdr *)(pip + 1);
116         if (pip->next_proto_id == IPPROTO_ICMP) {
117                 dump_l3(tbase, mbufs[i]);
118                 tx_ring(tbase, tbase->l3.ctrl_plane_ring, ICMP_TO_MASTER, mbufs[i]);
119                 (*skip)++;
120         } else if ((tcp->src_port == TCP_PORT_BGP) || (tcp->dst_port == TCP_PORT_BGP)) {
121                 dump_l3(tbase, mbufs[i]);
122                 tx_ring(tbase, tbase->l3.ctrl_plane_ring, BGP_TO_MASTER, mbufs[i]);
123                 (*skip)++;
124         } else if (unlikely(*skip)) {
125                 mbufs[i - *skip] = mbufs[i];
126         }
127 }
128 static inline int handle_l3(struct task_base *tbase, uint16_t nb_rx, struct rte_mbuf ***mbufs_ptr)
129 {
130         struct rte_mbuf **mbufs = *mbufs_ptr;
131         int i;
132         struct ether_hdr_arp *hdr_arp[MAX_PKT_BURST];
133         prox_rte_ether_hdr *hdr;
134         prox_rte_ipv4_hdr *pip;
135         prox_rte_vlan_hdr *vlan;
136         int skip = 0;
137
138         for (i = 0; i < nb_rx; i++) {
139                 PREFETCH0(mbufs[i]);
140         }
141
142         for (i = 0; i < nb_rx; i++) {
143                 hdr_arp[i] = rte_pktmbuf_mtod(mbufs[i], struct ether_hdr_arp *);
144                 PREFETCH0(hdr_arp[i]);
145         }
146         for (i = 0; i < nb_rx; i++) {
147                 if (likely(hdr_arp[i]->ether_hdr.ether_type == ETYPE_IPv4)) {
148                         hdr = (prox_rte_ether_hdr *)hdr_arp[i];
149                         pip = (prox_rte_ipv4_hdr *)(hdr + 1);
150                         handle_ipv4(tbase, mbufs, i, pip, &skip);
151                 } else {
152                         switch (hdr_arp[i]->ether_hdr.ether_type) {
153                         case ETYPE_VLAN:
154                                 hdr = (prox_rte_ether_hdr *)hdr_arp[i];
155                                 vlan = (prox_rte_vlan_hdr *)(hdr + 1);
156                                 if (vlan->eth_proto == ETYPE_IPv4) {
157                                         pip = (prox_rte_ipv4_hdr *)(vlan + 1);
158                                         handle_ipv4(tbase, mbufs, i, pip, &skip);
159                                 } else if (vlan->eth_proto == ETYPE_ARP) {
160                                         dump_l3(tbase, mbufs[i]);
161                                         tx_ring(tbase, tbase->l3.ctrl_plane_ring, ARP_PKT_FROM_NET_TO_MASTER, mbufs[i]);
162                                         skip++;
163                                 }
164                                 break;
165                         case ETYPE_ARP:
166                                 dump_l3(tbase, mbufs[i]);
167                                 tx_ring(tbase, tbase->l3.ctrl_plane_ring, ARP_PKT_FROM_NET_TO_MASTER, mbufs[i]);
168                                 skip++;
169                                 break;
170                         default:
171                                 if (unlikely(skip)) {
172                                         mbufs[i - skip] = mbufs[i];
173                                 }
174                         }
175                 }
176         }
177         return skip;
178 }
179
180 static inline int handle_ndp(struct task_base *tbase, uint16_t nb_rx, struct rte_mbuf ***mbufs_ptr)
181 {
182         struct rte_mbuf **mbufs = *mbufs_ptr;
183         prox_rte_ipv6_hdr *ipv6_hdr;
184         int i;
185         prox_rte_ether_hdr *hdr[MAX_PKT_BURST];
186         int skip = 0;
187         uint16_t vlan = 0;
188
189         for (i = 0; i < nb_rx; i++) {
190                 PREFETCH0(mbufs[i]);
191         }
192         for (i = 0; i < nb_rx; i++) {
193                 hdr[i] = rte_pktmbuf_mtod(mbufs[i], prox_rte_ether_hdr *);
194                 PREFETCH0(hdr[i]);
195         }
196         for (i = 0; i < nb_rx; i++) {
197                 ipv6_hdr = prox_get_ipv6_hdr(hdr[i], rte_pktmbuf_pkt_len(mbufs[i]), &vlan);
198                 if (unlikely((ipv6_hdr) && (ipv6_hdr->proto == ICMPv6))) {
199                         dump_l3(tbase, mbufs[i]);
200                         tx_ring(tbase, tbase->l3.ctrl_plane_ring, NDP_PKT_FROM_NET_TO_MASTER, mbufs[i]);
201                         skip++;
202                 } else if (unlikely(skip)) {
203                         mbufs[i - skip] = mbufs[i];
204                 }
205         }
206         return skip;
207 }
208
209 static uint16_t rx_pkt_hw_param(struct task_base *tbase, struct rte_mbuf ***mbufs_ptr, int multi,
210                                 void (*next)(struct rx_params_hw *rx_param_hw), int l3_ndp)
211 {
212         uint8_t last_read_portid;
213         uint16_t nb_rx, ret;
214         int skip = 0;
215
216         START_EMPTY_MEASSURE();
217         *mbufs_ptr = tbase->ws_mbuf->mbuf[0] +
218                 (RTE_ALIGN_CEIL(tbase->ws_mbuf->idx[0].prod, 2) & WS_MBUF_MASK);
219
220         last_read_portid = tbase->rx_params_hw.last_read_portid;
221         struct port_queue *pq = &tbase->rx_params_hw.rx_pq[last_read_portid];
222
223         nb_rx = rx_pkt_hw_port_queue(pq, *mbufs_ptr, multi);
224         next(&tbase->rx_params_hw);
225
226         if (l3_ndp == PROX_L3)
227                 skip = handle_l3(tbase, nb_rx, mbufs_ptr);
228         else if (l3_ndp == PROX_NDP)
229                 skip = handle_ndp(tbase, nb_rx, mbufs_ptr);
230
231         if (skip)
232                 TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, skip);
233         if (likely(nb_rx > 0)) {
234                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
235                 return nb_rx - skip;
236         }
237         TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
238         return 0;
239 }
240
241 static inline uint16_t rx_pkt_hw1_param(struct task_base *tbase, struct rte_mbuf ***mbufs_ptr, int multi, int l3_ndp)
242 {
243         uint16_t nb_rx, n;
244         int skip = 0;
245
246         START_EMPTY_MEASSURE();
247         *mbufs_ptr = tbase->ws_mbuf->mbuf[0] +
248                 (RTE_ALIGN_CEIL(tbase->ws_mbuf->idx[0].prod, 2) & WS_MBUF_MASK);
249
250         nb_rx = rte_eth_rx_burst(tbase->rx_params_hw1.rx_pq.port,
251                                  tbase->rx_params_hw1.rx_pq.queue,
252                                  *mbufs_ptr, MAX_PKT_BURST);
253
254         if (multi) {
255                 n = nb_rx;
256                 while ((n != 0) && (MAX_PKT_BURST - nb_rx >= MIN_PMD_RX)) {
257                         n = rte_eth_rx_burst(tbase->rx_params_hw1.rx_pq.port,
258                                  tbase->rx_params_hw1.rx_pq.queue,
259                                  *mbufs_ptr + nb_rx, MIN_PMD_RX);
260                         nb_rx += n;
261                         PROX_PANIC(nb_rx > 64, "Received %d packets while expecting maximum %d\n", n, MIN_PMD_RX);
262                 }
263         }
264
265         if (unlikely(nb_rx == 0)) {
266                 TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
267                 return 0;
268         }
269
270         if (l3_ndp == PROX_L3)
271                 skip = handle_l3(tbase, nb_rx, mbufs_ptr);
272         else if (l3_ndp == PROX_NDP)
273                 skip = handle_ndp(tbase, nb_rx, mbufs_ptr);
274
275         if (skip)
276                 TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, skip);
277
278         TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
279         return nb_rx - skip;
280 }
281
282 uint16_t rx_pkt_hw(struct task_base *tbase, struct rte_mbuf ***mbufs)
283 {
284         return rx_pkt_hw_param(tbase, mbufs, 0, next_port, 0);
285 }
286
287 uint16_t rx_pkt_hw_pow2(struct task_base *tbase, struct rte_mbuf ***mbufs)
288 {
289         return rx_pkt_hw_param(tbase, mbufs, 0, next_port_pow2, 0);
290 }
291
292 uint16_t rx_pkt_hw1(struct task_base *tbase, struct rte_mbuf ***mbufs)
293 {
294         return rx_pkt_hw1_param(tbase, mbufs, 0, 0);
295 }
296
297 uint16_t rx_pkt_hw_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
298 {
299         return rx_pkt_hw_param(tbase, mbufs, 1, next_port, 0);
300 }
301
302 uint16_t rx_pkt_hw_pow2_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
303 {
304         return rx_pkt_hw_param(tbase, mbufs, 1, next_port_pow2, 0);
305 }
306
307 uint16_t rx_pkt_hw1_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
308 {
309         return rx_pkt_hw1_param(tbase, mbufs, 1, 0);
310 }
311
312 uint16_t rx_pkt_hw_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
313 {
314         return rx_pkt_hw_param(tbase, mbufs, 0, next_port, PROX_L3);
315 }
316
317 uint16_t rx_pkt_hw_ndp(struct task_base *tbase, struct rte_mbuf ***mbufs)
318 {
319         return rx_pkt_hw_param(tbase, mbufs, 0, next_port, PROX_NDP);
320 }
321
322 uint16_t rx_pkt_hw_pow2_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
323 {
324         return rx_pkt_hw_param(tbase, mbufs, 0, next_port_pow2, PROX_L3);
325 }
326
327 uint16_t rx_pkt_hw_pow2_ndp(struct task_base *tbase, struct rte_mbuf ***mbufs)
328 {
329         return rx_pkt_hw_param(tbase, mbufs, 0, next_port_pow2, PROX_NDP);
330 }
331
332 uint16_t rx_pkt_hw1_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
333 {
334         return rx_pkt_hw1_param(tbase, mbufs, 0, PROX_L3);
335 }
336
337 uint16_t rx_pkt_hw1_ndp(struct task_base *tbase, struct rte_mbuf ***mbufs)
338 {
339         return rx_pkt_hw1_param(tbase, mbufs, 0, PROX_NDP);
340 }
341
342 uint16_t rx_pkt_hw_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
343 {
344         return rx_pkt_hw_param(tbase, mbufs, 1, next_port, PROX_L3);
345 }
346
347 uint16_t rx_pkt_hw_multi_ndp(struct task_base *tbase, struct rte_mbuf ***mbufs)
348 {
349         return rx_pkt_hw_param(tbase, mbufs, 1, next_port, PROX_NDP);
350 }
351
352 uint16_t rx_pkt_hw_pow2_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
353 {
354         return rx_pkt_hw_param(tbase, mbufs, 1, next_port_pow2, PROX_L3);
355 }
356
357 uint16_t rx_pkt_hw_pow2_multi_ndp(struct task_base *tbase, struct rte_mbuf ***mbufs)
358 {
359         return rx_pkt_hw_param(tbase, mbufs, 1, next_port_pow2, PROX_NDP);
360 }
361
362 uint16_t rx_pkt_hw1_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
363 {
364         return rx_pkt_hw1_param(tbase, mbufs, 1, PROX_L3);
365 }
366
367 uint16_t rx_pkt_hw1_multi_ndp(struct task_base *tbase, struct rte_mbuf ***mbufs)
368 {
369         return rx_pkt_hw1_param(tbase, mbufs, 1, PROX_NDP);
370 }
371
372 /* The following functions implement ring access */
373 uint16_t ring_deq(struct rte_ring *r, struct rte_mbuf **mbufs)
374 {
375         void **v_mbufs = (void **)mbufs;
376 #ifdef BRAS_RX_BULK
377 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
378         return rte_ring_sc_dequeue_bulk(r, v_mbufs, MAX_RING_BURST) < 0? 0 : MAX_RING_BURST;
379 #else
380         return rte_ring_sc_dequeue_bulk(r, v_mbufs, MAX_RING_BURST, NULL);
381 #endif
382 #else
383 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
384         return rte_ring_sc_dequeue_burst(r, v_mbufs, MAX_RING_BURST);
385 #else
386         return rte_ring_sc_dequeue_burst(r, v_mbufs, MAX_RING_BURST, NULL);
387 #endif
388 #endif
389 }
390
391 uint16_t rx_pkt_sw(struct task_base *tbase, struct rte_mbuf ***mbufs)
392 {
393         START_EMPTY_MEASSURE();
394         *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
395         uint8_t lr = tbase->rx_params_sw.last_read_ring;
396         uint16_t nb_rx;
397
398         do {
399                 nb_rx = ring_deq(tbase->rx_params_sw.rx_rings[lr], *mbufs);
400                 lr = lr + 1 == tbase->rx_params_sw.nb_rxrings? 0 : lr + 1;
401         } while(!nb_rx && lr != tbase->rx_params_sw.last_read_ring);
402
403         tbase->rx_params_sw.last_read_ring = lr;
404
405         if (nb_rx != 0) {
406                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
407                 return nb_rx;
408         }
409         else {
410                 TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
411                 return 0;
412         }
413 }
414
415 /* Same as rx_pkt_sw expect with a mask for the number of receive
416    rings (can only be used if nb_rxring is a power of 2). */
417 uint16_t rx_pkt_sw_pow2(struct task_base *tbase, struct rte_mbuf ***mbufs)
418 {
419         START_EMPTY_MEASSURE();
420         *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
421         uint8_t lr = tbase->rx_params_sw.last_read_ring;
422         uint16_t nb_rx;
423
424         do {
425                 nb_rx = ring_deq(tbase->rx_params_sw.rx_rings[lr], *mbufs);
426                 lr = (lr + 1) & tbase->rx_params_sw.rxrings_mask;
427         } while(!nb_rx && lr != tbase->rx_params_sw.last_read_ring);
428
429         tbase->rx_params_sw.last_read_ring = lr;
430
431         if (nb_rx != 0) {
432                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
433                 return nb_rx;
434         }
435         else {
436                 TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
437                 return 0;
438         }
439 }
440
441 uint16_t rx_pkt_self(struct task_base *tbase, struct rte_mbuf ***mbufs)
442 {
443         START_EMPTY_MEASSURE();
444         uint16_t nb_rx = tbase->ws_mbuf->idx[0].nb_rx;
445         if (nb_rx) {
446                 tbase->ws_mbuf->idx[0].nb_rx = 0;
447                 *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
448                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
449                 return nb_rx;
450         }
451         else {
452                 TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
453                 return 0;
454         }
455 }
456
457 /* Used for tasks that do not receive packets (i.e. Packet
458 generation).  Always returns 1 but never returns packets and does not
459 increment statistics. This function allows to use the same code path
460 as for tasks that actually receive packets. */
461 uint16_t rx_pkt_dummy(__attribute__((unused)) struct task_base *tbase,
462                       __attribute__((unused)) struct rte_mbuf ***mbufs)
463 {
464         return 1;
465 }
466
467 /* After the system has been configured, it is known if there is only
468    one RX ring. If this is the case, a more specialized version of the
469    function above can be used to save cycles. */
470 uint16_t rx_pkt_sw1(struct task_base *tbase, struct rte_mbuf ***mbufs)
471 {
472         START_EMPTY_MEASSURE();
473         *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
474         uint16_t nb_rx = ring_deq(tbase->rx_params_sw1.rx_ring, *mbufs);
475
476         if (nb_rx != 0) {
477                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
478                 return nb_rx;
479         }
480         else {
481                 TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
482                 return 0;
483         }
484 }
485
486 static uint16_t call_prev_rx_pkt(struct task_base *tbase, struct rte_mbuf ***mbufs)
487 {
488         uint16_t ret;
489
490         tbase->aux->rx_prev_idx++;
491         ret = tbase->aux->rx_pkt_prev[tbase->aux->rx_prev_idx - 1](tbase, mbufs);
492         tbase->aux->rx_prev_idx--;
493
494         return ret;
495 }
496
497 /* Only used when there are packets to be dumped. This function is
498    meant as a debugging tool and is therefore not optimized. When the
499    number of packets to dump falls back to 0, the original (optimized)
500    rx function is restored. This allows to support dumping packets
501    without any performance impact if the feature is not used. */
502 uint16_t rx_pkt_dump(struct task_base *tbase, struct rte_mbuf ***mbufs)
503 {
504         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
505
506         if (ret) {
507                 uint32_t n_dump = tbase->aux->task_rt_dump.n_print_rx;
508                 n_dump = ret < n_dump? ret : n_dump;
509
510                 if ((tbase->aux->task_rt_dump.input == NULL) || (tbase->aux->task_rt_dump.input->reply == NULL)) {
511                         for (uint32_t i = 0; i < n_dump; ++i) {
512                                 plogdx_info((*mbufs)[i], "RX: ");
513                         }
514                 }
515                 else {
516                         struct input *input = tbase->aux->task_rt_dump.input;
517
518                         for (uint32_t i = 0; i < n_dump; ++i) {
519                                 /* TODO: Execute callback with full
520                                    data in a single call. */
521                                 char tmp[128];
522                                 int strlen;
523
524 #if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
525                                 int port_id = ((*mbufs)[i])->port;
526 #else
527                                 int port_id = ((*mbufs)[i])->pkt.in_port;
528 #endif
529                                 strlen = snprintf(tmp, sizeof(tmp), "pktdump,%d,%d\n", port_id,
530                                                       rte_pktmbuf_pkt_len((*mbufs)[i]));
531
532                                 input->reply(input, tmp, strlen);
533                                 input->reply(input, rte_pktmbuf_mtod((*mbufs)[i], char *), rte_pktmbuf_pkt_len((*mbufs)[i]));
534                                 input->reply(input, "\n", 1);
535                         }
536                 }
537
538                 tbase->aux->task_rt_dump.n_print_rx -= n_dump;
539
540                 if (0 == tbase->aux->task_rt_dump.n_print_rx) {
541                         task_base_del_rx_pkt_function(tbase, rx_pkt_dump);
542                 }
543         }
544         return ret;
545 }
546
547 uint16_t rx_pkt_trace(struct task_base *tbase, struct rte_mbuf ***mbufs)
548 {
549         tbase->aux->task_rt_dump.cur_trace = 0;
550         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
551
552         if (ret) {
553                 uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;
554                 n_trace = ret < n_trace? ret : n_trace;
555                 n_trace = n_trace <= MAX_RING_BURST ? n_trace : MAX_RING_BURST;
556
557                 for (uint32_t i = 0; i < n_trace; ++i) {
558                         uint8_t *pkt = rte_pktmbuf_mtod((*mbufs)[i], uint8_t *);
559                         rte_memcpy(tbase->aux->task_rt_dump.pkt_cpy[i], pkt, sizeof(tbase->aux->task_rt_dump.pkt_cpy[i]));
560                         tbase->aux->task_rt_dump.pkt_cpy_len[i] = rte_pktmbuf_pkt_len((*mbufs)[i]);
561                         tbase->aux->task_rt_dump.pkt_mbuf_addr[i] = (*mbufs)[i];
562                 }
563                 tbase->aux->task_rt_dump.cur_trace += n_trace;
564
565                 tbase->aux->task_rt_dump.n_trace -= n_trace;
566                 /* Unset by TX when n_trace = 0 */
567         }
568         return ret;
569 }
570
571 /* Gather the distribution of the number of packets that have been
572    received from one RX call. Since the value is only modified by the
573    task that receives the packet, no atomic operation is needed. */
574 uint16_t rx_pkt_distr(struct task_base *tbase, struct rte_mbuf ***mbufs)
575 {
576         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
577
578         if (likely(ret < RX_BUCKET_SIZE))
579                 tbase->aux->rx_bucket[ret]++;
580         else
581                 tbase->aux->rx_bucket[RX_BUCKET_SIZE - 1]++;
582         return ret;
583 }
584
585 uint16_t rx_pkt_bw(struct task_base *tbase, struct rte_mbuf ***mbufs)
586 {
587         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
588         uint32_t tot_bytes = 0;
589
590         for (uint16_t i = 0; i < ret; ++i) {
591                 tot_bytes += mbuf_wire_size((*mbufs)[i]);
592         }
593
594         TASK_STATS_ADD_RX_BYTES(&tbase->aux->stats, tot_bytes);
595
596         return ret;
597 }
598
599 uint16_t rx_pkt_tsc(struct task_base *tbase, struct rte_mbuf ***mbufs)
600 {
601         uint64_t before = rte_rdtsc();
602         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
603         uint64_t after = rte_rdtsc();
604
605         tbase->aux->tsc_rx.before = before;
606         tbase->aux->tsc_rx.after = after;
607
608         return ret;
609 }