Add support for counting non dataplane related packets
[samplevnf.git] / VNFs / DPPD-PROX / rx_pkt.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <rte_cycles.h>
18 #include <rte_ethdev.h>
19 #include <rte_version.h>
20
21 #include "rx_pkt.h"
22 #include "task_base.h"
23 #include "clock.h"
24 #include "stats.h"
25 #include "log.h"
26 #include "mbuf_utils.h"
27 #include "prefetch.h"
28 #include "arp.h"
29 #include "tx_pkt.h"
30 #include "handle_master.h"
31 #include "input.h" /* Needed for callback on dump */
32
33 /* _param version of the rx_pkt_hw functions are used to create two
34    instances of very similar variations of these functions. The
35    variations are specified by the "multi" parameter which significies
36    that the rte_eth_rx_burst function should be called multiple times.
37    The reason for this is that with the vector PMD, the maximum number
38    of packets being returned is 32. If packets have been split in
39    multiple mbufs then rte_eth_rx_burst might even receive less than
40    32 packets.
41    Some algorithms (like QoS) only work correctly if more than 32
42    packets are received if the dequeue step involves finding 32 packets.
43 */
44
45 #define MIN_PMD_RX 32
46
47 static uint16_t rx_pkt_hw_port_queue(struct port_queue *pq, struct rte_mbuf **mbufs, int multi)
48 {
49         uint16_t nb_rx, n;
50
51         nb_rx = rte_eth_rx_burst(pq->port, pq->queue, mbufs, MAX_PKT_BURST);
52
53         if (multi) {
54                 n = nb_rx;
55                 while (n != 0 && MAX_PKT_BURST - nb_rx >= MIN_PMD_RX) {
56                         n = rte_eth_rx_burst(pq->port, pq->queue, mbufs + nb_rx, MIN_PMD_RX);
57                         nb_rx += n;
58                         PROX_PANIC(nb_rx > 64, "Received %d packets while expecting maximum %d\n", n, MIN_PMD_RX);
59                 }
60         }
61         return nb_rx;
62 }
63
64 static void next_port(struct rx_params_hw *rx_params_hw)
65 {
66         ++rx_params_hw->last_read_portid;
67         if (unlikely(rx_params_hw->last_read_portid == rx_params_hw->nb_rxports)) {
68                 rx_params_hw->last_read_portid = 0;
69         }
70 }
71
72 static void next_port_pow2(struct rx_params_hw *rx_params_hw)
73 {
74         rx_params_hw->last_read_portid = (rx_params_hw->last_read_portid + 1) & rx_params_hw->rxport_mask;
75 }
76
77 static inline void dump_l3(struct task_base *tbase, struct rte_mbuf *mbuf)
78 {
79         if (unlikely(tbase->aux->task_rt_dump.n_print_rx)) {
80                 if ((tbase->aux->task_rt_dump.input == NULL) || (tbase->aux->task_rt_dump.input->reply == NULL)) {
81                         plogdx_info(mbuf, "RX: ");
82                 } else {
83                         struct input *input = tbase->aux->task_rt_dump.input;
84                         char tmp[128];
85                         int strlen;
86 #if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
87                         int port_id = mbuf->port;
88 #else
89                         int port_id = mbuf->pkt.in_port;
90 #endif
91                         strlen = snprintf(tmp, sizeof(tmp), "pktdump,%d,%d\n", port_id,
92                               rte_pktmbuf_pkt_len(mbuf));
93                         input->reply(input, tmp, strlen);
94                         input->reply(input, rte_pktmbuf_mtod(mbuf, char *), rte_pktmbuf_pkt_len(mbuf));
95                         input->reply(input, "\n", 1);
96                 }
97                 tbase->aux->task_rt_dump.n_print_rx --;
98                 if (0 == tbase->aux->task_rt_dump.n_print_rx) {
99                         task_base_del_rx_pkt_function(tbase, rx_pkt_dump);
100                 }
101         }
102         if (unlikely(tbase->aux->task_rt_dump.n_trace)) {
103                 plogdx_info(mbuf, "RX: ");
104                 tbase->aux->task_rt_dump.n_trace--;
105         }
106 }
107
108 static uint16_t rx_pkt_hw_param(struct task_base *tbase, struct rte_mbuf ***mbufs_ptr, int multi,
109                                 void (*next)(struct rx_params_hw *rx_param_hw), int l3)
110 {
111         uint8_t last_read_portid;
112         uint16_t nb_rx;
113         int skip = 0;
114
115         START_EMPTY_MEASSURE();
116         *mbufs_ptr = tbase->ws_mbuf->mbuf[0] +
117                 (RTE_ALIGN_CEIL(tbase->ws_mbuf->idx[0].prod, 2) & WS_MBUF_MASK);
118
119         last_read_portid = tbase->rx_params_hw.last_read_portid;
120         struct port_queue *pq = &tbase->rx_params_hw.rx_pq[last_read_portid];
121
122         nb_rx = rx_pkt_hw_port_queue(pq, *mbufs_ptr, multi);
123         next(&tbase->rx_params_hw);
124
125         if (l3) {
126                 struct rte_mbuf **mbufs = *mbufs_ptr;
127                 int i;
128                 struct ether_hdr_arp *hdr[MAX_PKT_BURST];
129                 for (i = 0; i < nb_rx; i++) {
130                         PREFETCH0(mbufs[i]);
131                 }
132                 for (i = 0; i < nb_rx; i++) {
133                         hdr[i] = rte_pktmbuf_mtod(mbufs[i], struct ether_hdr_arp *);
134                         PREFETCH0(hdr[i]);
135                 }
136                 for (i = 0; i < nb_rx; i++) {
137                         if (unlikely(hdr[i]->ether_hdr.ether_type == ETYPE_ARP)) {
138                                 dump_l3(tbase, mbufs[i]);
139                                 tx_ring(tbase, tbase->l3.ctrl_plane_ring, ARP_TO_CTRL, mbufs[i]);
140                                 skip++;
141                         } else if (unlikely(skip)) {
142                                 mbufs[i - skip] = mbufs[i];
143                         }
144                 }
145         }
146
147         if (skip)
148                 TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, skip);
149         if (likely(nb_rx > 0)) {
150                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
151                 return nb_rx - skip;
152         }
153         TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
154         return 0;
155 }
156
157 static inline uint16_t rx_pkt_hw1_param(struct task_base *tbase, struct rte_mbuf ***mbufs_ptr, int multi, int l3)
158 {
159         uint16_t nb_rx, n;
160         int skip = 0;
161
162         START_EMPTY_MEASSURE();
163         *mbufs_ptr = tbase->ws_mbuf->mbuf[0] +
164                 (RTE_ALIGN_CEIL(tbase->ws_mbuf->idx[0].prod, 2) & WS_MBUF_MASK);
165
166         nb_rx = rte_eth_rx_burst(tbase->rx_params_hw1.rx_pq.port,
167                                  tbase->rx_params_hw1.rx_pq.queue,
168                                  *mbufs_ptr, MAX_PKT_BURST);
169
170         if (multi) {
171                 n = nb_rx;
172                 while ((n != 0) && (MAX_PKT_BURST - nb_rx >= MIN_PMD_RX)) {
173                         n = rte_eth_rx_burst(tbase->rx_params_hw1.rx_pq.port,
174                                  tbase->rx_params_hw1.rx_pq.queue,
175                                  *mbufs_ptr + nb_rx, MIN_PMD_RX);
176                         nb_rx += n;
177                         PROX_PANIC(nb_rx > 64, "Received %d packets while expecting maximum %d\n", n, MIN_PMD_RX);
178                 }
179         }
180
181         if (l3) {
182                 struct rte_mbuf **mbufs = *mbufs_ptr;
183                 int i;
184                 struct ether_hdr_arp *hdr[MAX_PKT_BURST];
185                 for (i = 0; i < nb_rx; i++) {
186                         PREFETCH0(mbufs[i]);
187                 }
188                 for (i = 0; i < nb_rx; i++) {
189                         hdr[i] = rte_pktmbuf_mtod(mbufs[i], struct ether_hdr_arp *);
190                         PREFETCH0(hdr[i]);
191                 }
192                 for (i = 0; i < nb_rx; i++) {
193                         if (unlikely(hdr[i]->ether_hdr.ether_type == ETYPE_ARP)) {
194                                 dump_l3(tbase, mbufs[i]);
195                                 tx_ring(tbase, tbase->l3.ctrl_plane_ring, ARP_TO_CTRL, mbufs[i]);
196                                 skip++;
197                         } else if (unlikely(skip)) {
198                                 mbufs[i - skip] = mbufs[i];
199                         }
200                 }
201         }
202
203         if (skip)
204                 TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, skip);
205         if (likely(nb_rx > 0)) {
206                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
207                 return nb_rx - skip;
208         }
209         TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
210         return 0;
211 }
212
213 uint16_t rx_pkt_hw(struct task_base *tbase, struct rte_mbuf ***mbufs)
214 {
215         return rx_pkt_hw_param(tbase, mbufs, 0, next_port, 0);
216 }
217
218 uint16_t rx_pkt_hw_pow2(struct task_base *tbase, struct rte_mbuf ***mbufs)
219 {
220         return rx_pkt_hw_param(tbase, mbufs, 0, next_port_pow2, 0);
221 }
222
223 uint16_t rx_pkt_hw1(struct task_base *tbase, struct rte_mbuf ***mbufs)
224 {
225         return rx_pkt_hw1_param(tbase, mbufs, 0, 0);
226 }
227
228 uint16_t rx_pkt_hw_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
229 {
230         return rx_pkt_hw_param(tbase, mbufs, 1, next_port, 0);
231 }
232
233 uint16_t rx_pkt_hw_pow2_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
234 {
235         return rx_pkt_hw_param(tbase, mbufs, 1, next_port_pow2, 0);
236 }
237
238 uint16_t rx_pkt_hw1_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
239 {
240         return rx_pkt_hw1_param(tbase, mbufs, 1, 0);
241 }
242
243 uint16_t rx_pkt_hw_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
244 {
245         return rx_pkt_hw_param(tbase, mbufs, 0, next_port, 1);
246 }
247
248 uint16_t rx_pkt_hw_pow2_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
249 {
250         return rx_pkt_hw_param(tbase, mbufs, 0, next_port_pow2, 1);
251 }
252
253 uint16_t rx_pkt_hw1_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
254 {
255         return rx_pkt_hw1_param(tbase, mbufs, 0, 1);
256 }
257
258 uint16_t rx_pkt_hw_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
259 {
260         return rx_pkt_hw_param(tbase, mbufs, 1, next_port, 1);
261 }
262
263 uint16_t rx_pkt_hw_pow2_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
264 {
265         return rx_pkt_hw_param(tbase, mbufs, 1, next_port_pow2, 1);
266 }
267
268 uint16_t rx_pkt_hw1_multi_l3(struct task_base *tbase, struct rte_mbuf ***mbufs)
269 {
270         return rx_pkt_hw1_param(tbase, mbufs, 1, 1);
271 }
272
273 /* The following functions implement ring access */
274 uint16_t ring_deq(struct rte_ring *r, struct rte_mbuf **mbufs)
275 {
276         void **v_mbufs = (void **)mbufs;
277 #ifdef BRAS_RX_BULK
278 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
279         return rte_ring_sc_dequeue_bulk(r, v_mbufs, MAX_RING_BURST) < 0? 0 : MAX_RING_BURST;
280 #else
281         return rte_ring_sc_dequeue_bulk(r, v_mbufs, MAX_RING_BURST, NULL);
282 #endif
283 #else
284 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
285         return rte_ring_sc_dequeue_burst(r, v_mbufs, MAX_RING_BURST);
286 #else
287         return rte_ring_sc_dequeue_burst(r, v_mbufs, MAX_RING_BURST, NULL);
288 #endif
289 #endif
290 }
291
292 uint16_t rx_pkt_sw(struct task_base *tbase, struct rte_mbuf ***mbufs)
293 {
294         START_EMPTY_MEASSURE();
295         *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
296         uint8_t lr = tbase->rx_params_sw.last_read_ring;
297         uint16_t nb_rx;
298
299         do {
300                 nb_rx = ring_deq(tbase->rx_params_sw.rx_rings[lr], *mbufs);
301                 lr = lr + 1 == tbase->rx_params_sw.nb_rxrings? 0 : lr + 1;
302         } while(!nb_rx && lr != tbase->rx_params_sw.last_read_ring);
303
304         tbase->rx_params_sw.last_read_ring = lr;
305
306         if (nb_rx != 0) {
307                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
308                 return nb_rx;
309         }
310         else {
311                 TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
312                 return 0;
313         }
314 }
315
316 /* Same as rx_pkt_sw expect with a mask for the number of receive
317    rings (can only be used if nb_rxring is a power of 2). */
318 uint16_t rx_pkt_sw_pow2(struct task_base *tbase, struct rte_mbuf ***mbufs)
319 {
320         START_EMPTY_MEASSURE();
321         *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
322         uint8_t lr = tbase->rx_params_sw.last_read_ring;
323         uint16_t nb_rx;
324
325         do {
326                 nb_rx = ring_deq(tbase->rx_params_sw.rx_rings[lr], *mbufs);
327                 lr = (lr + 1) & tbase->rx_params_sw.rxrings_mask;
328         } while(!nb_rx && lr != tbase->rx_params_sw.last_read_ring);
329
330         tbase->rx_params_sw.last_read_ring = lr;
331
332         if (nb_rx != 0) {
333                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
334                 return nb_rx;
335         }
336         else {
337                 TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
338                 return 0;
339         }
340 }
341
342 uint16_t rx_pkt_self(struct task_base *tbase, struct rte_mbuf ***mbufs)
343 {
344         START_EMPTY_MEASSURE();
345         uint16_t nb_rx = tbase->ws_mbuf->idx[0].nb_rx;
346         if (nb_rx) {
347                 tbase->ws_mbuf->idx[0].nb_rx = 0;
348                 *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
349                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
350                 return nb_rx;
351         }
352         else {
353                 TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
354                 return 0;
355         }
356 }
357
358 /* Used for tasks that do not receive packets (i.e. Packet
359 generation).  Always returns 1 but never returns packets and does not
360 increment statistics. This function allows to use the same code path
361 as for tasks that actually receive packets. */
362 uint16_t rx_pkt_dummy(__attribute__((unused)) struct task_base *tbase,
363                       __attribute__((unused)) struct rte_mbuf ***mbufs)
364 {
365         return 1;
366 }
367
368 /* After the system has been configured, it is known if there is only
369    one RX ring. If this is the case, a more specialized version of the
370    function above can be used to save cycles. */
371 uint16_t rx_pkt_sw1(struct task_base *tbase, struct rte_mbuf ***mbufs)
372 {
373         START_EMPTY_MEASSURE();
374         *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
375         uint16_t nb_rx = ring_deq(tbase->rx_params_sw1.rx_ring, *mbufs);
376
377         if (nb_rx != 0) {
378                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
379                 return nb_rx;
380         }
381         else {
382                 TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
383                 return 0;
384         }
385 }
386
387 static uint16_t call_prev_rx_pkt(struct task_base *tbase, struct rte_mbuf ***mbufs)
388 {
389         uint16_t ret;
390
391         tbase->aux->rx_prev_idx++;
392         ret = tbase->aux->rx_pkt_prev[tbase->aux->rx_prev_idx - 1](tbase, mbufs);
393         tbase->aux->rx_prev_idx--;
394
395         return ret;
396 }
397
398 /* Only used when there are packets to be dumped. This function is
399    meant as a debugging tool and is therefore not optimized. When the
400    number of packets to dump falls back to 0, the original (optimized)
401    rx function is restored. This allows to support dumping packets
402    without any performance impact if the feature is not used. */
403 uint16_t rx_pkt_dump(struct task_base *tbase, struct rte_mbuf ***mbufs)
404 {
405         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
406
407         if (ret) {
408                 uint32_t n_dump = tbase->aux->task_rt_dump.n_print_rx;
409                 n_dump = ret < n_dump? ret : n_dump;
410
411                 if ((tbase->aux->task_rt_dump.input == NULL) || (tbase->aux->task_rt_dump.input->reply == NULL)) {
412                         for (uint32_t i = 0; i < n_dump; ++i) {
413                                 plogdx_info((*mbufs)[i], "RX: ");
414                         }
415                 }
416                 else {
417                         struct input *input = tbase->aux->task_rt_dump.input;
418
419                         for (uint32_t i = 0; i < n_dump; ++i) {
420                                 /* TODO: Execute callback with full
421                                    data in a single call. */
422                                 char tmp[128];
423                                 int strlen;
424
425 #if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
426                                 int port_id = ((*mbufs)[i])->port;
427 #else
428                                 int port_id = ((*mbufs)[i])->pkt.in_port;
429 #endif
430                                 strlen = snprintf(tmp, sizeof(tmp), "pktdump,%d,%d\n", port_id,
431                                                       rte_pktmbuf_pkt_len((*mbufs)[i]));
432
433                                 input->reply(input, tmp, strlen);
434                                 input->reply(input, rte_pktmbuf_mtod((*mbufs)[i], char *), rte_pktmbuf_pkt_len((*mbufs)[i]));
435                                 input->reply(input, "\n", 1);
436                         }
437                 }
438
439                 tbase->aux->task_rt_dump.n_print_rx -= n_dump;
440
441                 if (0 == tbase->aux->task_rt_dump.n_print_rx) {
442                         task_base_del_rx_pkt_function(tbase, rx_pkt_dump);
443                 }
444         }
445         return ret;
446 }
447
448 uint16_t rx_pkt_trace(struct task_base *tbase, struct rte_mbuf ***mbufs)
449 {
450         tbase->aux->task_rt_dump.cur_trace = 0;
451         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
452
453         if (ret) {
454                 uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;
455                 n_trace = ret < n_trace? ret : n_trace;
456                 n_trace = n_trace <= MAX_RING_BURST ? n_trace : MAX_RING_BURST;
457
458                 for (uint32_t i = 0; i < n_trace; ++i) {
459                         uint8_t *pkt = rte_pktmbuf_mtod((*mbufs)[i], uint8_t *);
460                         rte_memcpy(tbase->aux->task_rt_dump.pkt_cpy[i], pkt, sizeof(tbase->aux->task_rt_dump.pkt_cpy[i]));
461                         tbase->aux->task_rt_dump.pkt_cpy_len[i] = rte_pktmbuf_pkt_len((*mbufs)[i]);
462                         tbase->aux->task_rt_dump.pkt_mbuf_addr[i] = (*mbufs)[i];
463                 }
464                 tbase->aux->task_rt_dump.cur_trace += n_trace;
465
466                 tbase->aux->task_rt_dump.n_trace -= n_trace;
467                 /* Unset by TX when n_trace = 0 */
468         }
469         return ret;
470 }
471
472 /* Gather the distribution of the number of packets that have been
473    received from one RX call. Since the value is only modified by the
474    task that receives the packet, no atomic operation is needed. */
475 uint16_t rx_pkt_distr(struct task_base *tbase, struct rte_mbuf ***mbufs)
476 {
477         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
478
479         if (likely(ret < RX_BUCKET_SIZE))
480                 tbase->aux->rx_bucket[ret]++;
481         else
482                 tbase->aux->rx_bucket[RX_BUCKET_SIZE - 1]++;
483         return ret;
484 }
485
486 uint16_t rx_pkt_bw(struct task_base *tbase, struct rte_mbuf ***mbufs)
487 {
488         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
489         uint32_t tot_bytes = 0;
490
491         for (uint16_t i = 0; i < ret; ++i) {
492                 tot_bytes += mbuf_wire_size((*mbufs)[i]);
493         }
494
495         TASK_STATS_ADD_RX_BYTES(&tbase->aux->stats, tot_bytes);
496
497         return ret;
498 }
499
500 uint16_t rx_pkt_tsc(struct task_base *tbase, struct rte_mbuf ***mbufs)
501 {
502         uint64_t before = rte_rdtsc();
503         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
504         uint64_t after = rte_rdtsc();
505
506         tbase->aux->tsc_rx.before = before;
507         tbase->aux->tsc_rx.after = after;
508
509         return ret;
510 }
511
512 uint16_t rx_pkt_all(struct task_base *tbase, struct rte_mbuf ***mbufs)
513 {
514         uint16_t tot = 0;
515         uint16_t ret = 0;
516         struct rte_mbuf **new_mbufs;
517         struct rte_mbuf **dst = tbase->aux->all_mbufs;
518
519         /* In case we receive less than MAX_PKT_BURST packets in one
520            iteration, do no perform any copying of mbuf pointers. Use
521            the buffer itself instead. */
522         ret = call_prev_rx_pkt(tbase, &new_mbufs);
523         if (ret < MAX_PKT_BURST/2) {
524                 *mbufs = new_mbufs;
525                 return ret;
526         }
527
528         memcpy(dst + tot, new_mbufs, ret * sizeof(*dst));
529         tot += ret;
530         *mbufs = dst;
531
532         do {
533                 ret = call_prev_rx_pkt(tbase, &new_mbufs);
534                 memcpy(dst + tot, new_mbufs, ret * sizeof(*dst));
535                 tot += ret;
536         } while (ret == MAX_PKT_BURST/2 && tot < MAX_RX_PKT_ALL - MAX_PKT_BURST);
537
538         if (tot >= MAX_RX_PKT_ALL - MAX_PKT_BURST) {
539                 plog_err("Could not receive all packets - buffer full\n");
540         }
541
542         return tot;
543 }