Merge "[l2l3 stack] implements new arp state machine & arp buffering"
[samplevnf.git] / VNFs / DPPD-PROX / rx_pkt.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <rte_cycles.h>
18 #include <rte_ethdev.h>
19 #include <rte_version.h>
20
21 #include "rx_pkt.h"
22 #include "task_base.h"
23 #include "clock.h"
24 #include "stats.h"
25 #include "log.h"
26 #include "mbuf_utils.h"
27 #include "input.h" /* Needed for callback on dump */
28
29 /* _param version of the rx_pkt_hw functions are used to create two
30    instances of very similar variations of these functions. The
31    variations are specified by the "multi" parameter which significies
32    that the rte_eth_rx_burst function should be called multiple times.
33    The reason for this is that with the vector PMD, the maximum number
34    of packets being returned is 32. If packets have been split in
35    multiple mbufs then rte_eth_rx_burst might even receive less than
36    32 packets.
37    Some algorithms (like QoS) only work correctly if more than 32
38    packets are received if the dequeue step involves finding 32 packets.
39 */
40
41 #define MIN_PMD_RX 32
42
43 static uint16_t rx_pkt_hw_port_queue(struct port_queue *pq, struct rte_mbuf **mbufs, int multi)
44 {
45         uint16_t nb_rx, n;
46
47         nb_rx = rte_eth_rx_burst(pq->port, pq->queue, mbufs, MAX_PKT_BURST);
48
49         if (multi) {
50                 n = nb_rx;
51                 while (n != 0 && MAX_PKT_BURST - nb_rx >= MIN_PMD_RX) {
52                         n = rte_eth_rx_burst(pq->port, pq->queue, mbufs + nb_rx, MIN_PMD_RX);
53                         nb_rx += n;
54                         PROX_PANIC(nb_rx > 64, "Received %d packets while expecting maximum %d\n", n, MIN_PMD_RX);
55                 }
56         }
57         return nb_rx;
58 }
59
60 static void next_port(struct rx_params_hw *rx_params_hw)
61 {
62         ++rx_params_hw->last_read_portid;
63         if (unlikely(rx_params_hw->last_read_portid == rx_params_hw->nb_rxports)) {
64                 rx_params_hw->last_read_portid = 0;
65         }
66 }
67
68 static void next_port_pow2(struct rx_params_hw *rx_params_hw)
69 {
70         rx_params_hw->last_read_portid = (rx_params_hw->last_read_portid + 1) & rx_params_hw->rxport_mask;
71 }
72
73 static uint16_t rx_pkt_hw_param(struct task_base *tbase, struct rte_mbuf ***mbufs, int multi,
74                                 void (*next)(struct rx_params_hw *rx_param_hw))
75 {
76         uint8_t last_read_portid;
77         uint16_t nb_rx;
78
79         START_EMPTY_MEASSURE();
80         *mbufs = tbase->ws_mbuf->mbuf[0] +
81                 (RTE_ALIGN_CEIL(tbase->ws_mbuf->idx[0].prod, 2) & WS_MBUF_MASK);
82
83         last_read_portid = tbase->rx_params_hw.last_read_portid;
84         struct port_queue *pq = &tbase->rx_params_hw.rx_pq[last_read_portid];
85
86         nb_rx = rx_pkt_hw_port_queue(pq, *mbufs, multi);
87         next(&tbase->rx_params_hw);
88
89         if (likely(nb_rx > 0)) {
90                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
91                 return nb_rx;
92         }
93         TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
94         return 0;
95 }
96
97 static inline uint16_t rx_pkt_hw1_param(struct task_base *tbase, struct rte_mbuf ***mbufs, int multi)
98 {
99         uint16_t nb_rx, n;
100
101         START_EMPTY_MEASSURE();
102         *mbufs = tbase->ws_mbuf->mbuf[0] +
103                 (RTE_ALIGN_CEIL(tbase->ws_mbuf->idx[0].prod, 2) & WS_MBUF_MASK);
104
105         nb_rx = rte_eth_rx_burst(tbase->rx_params_hw1.rx_pq.port,
106                                  tbase->rx_params_hw1.rx_pq.queue,
107                                  *mbufs, MAX_PKT_BURST);
108
109         if (multi) {
110                 n = nb_rx;
111                 while ((n != 0) && (MAX_PKT_BURST - nb_rx >= MIN_PMD_RX)) {
112                         n = rte_eth_rx_burst(tbase->rx_params_hw1.rx_pq.port,
113                                  tbase->rx_params_hw1.rx_pq.queue,
114                                  *mbufs + nb_rx, MIN_PMD_RX);
115                         nb_rx += n;
116                         PROX_PANIC(nb_rx > 64, "Received %d packets while expecting maximum %d\n", n, MIN_PMD_RX);
117                 }
118         }
119
120         if (likely(nb_rx > 0)) {
121                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
122                 return nb_rx;
123         }
124         TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
125         return 0;
126 }
127
128 uint16_t rx_pkt_hw(struct task_base *tbase, struct rte_mbuf ***mbufs)
129 {
130         return rx_pkt_hw_param(tbase, mbufs, 0, next_port);
131 }
132
133 uint16_t rx_pkt_hw_pow2(struct task_base *tbase, struct rte_mbuf ***mbufs)
134 {
135         return rx_pkt_hw_param(tbase, mbufs, 0, next_port_pow2);
136 }
137
138 uint16_t rx_pkt_hw1(struct task_base *tbase, struct rte_mbuf ***mbufs)
139 {
140         return rx_pkt_hw1_param(tbase, mbufs, 0);
141 }
142
143 uint16_t rx_pkt_hw_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
144 {
145         return rx_pkt_hw_param(tbase, mbufs, 1, next_port);
146 }
147
148 uint16_t rx_pkt_hw_pow2_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
149 {
150         return rx_pkt_hw_param(tbase, mbufs, 1, next_port_pow2);
151 }
152
153 uint16_t rx_pkt_hw1_multi(struct task_base *tbase, struct rte_mbuf ***mbufs)
154 {
155         return rx_pkt_hw1_param(tbase, mbufs, 1);
156 }
157
158 /* The following functions implement ring access */
159 static uint16_t ring_deq(struct rte_ring *r, struct rte_mbuf **mbufs)
160 {
161         void **v_mbufs = (void **)mbufs;
162 #ifdef BRAS_RX_BULK
163 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
164         return rte_ring_sc_dequeue_bulk(r, v_mbufs, MAX_RING_BURST) < 0? 0 : MAX_RING_BURST;
165 #else
166         return rte_ring_sc_dequeue_bulk(r, v_mbufs, MAX_RING_BURST, NULL);
167 #endif
168 #else
169 #if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
170         return rte_ring_sc_dequeue_burst(r, v_mbufs, MAX_RING_BURST);
171 #else
172         return rte_ring_sc_dequeue_burst(r, v_mbufs, MAX_RING_BURST, NULL);
173 #endif
174 #endif
175 }
176
177 uint16_t rx_pkt_sw(struct task_base *tbase, struct rte_mbuf ***mbufs)
178 {
179         START_EMPTY_MEASSURE();
180         *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
181         uint8_t lr = tbase->rx_params_sw.last_read_ring;
182         uint16_t nb_rx;
183
184         do {
185                 nb_rx = ring_deq(tbase->rx_params_sw.rx_rings[lr], *mbufs);
186                 lr = lr + 1 == tbase->rx_params_sw.nb_rxrings? 0 : lr + 1;
187         } while(!nb_rx && lr != tbase->rx_params_sw.last_read_ring);
188
189         tbase->rx_params_sw.last_read_ring = lr;
190
191         if (nb_rx != 0) {
192                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
193                 return nb_rx;
194         }
195         else {
196                 TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
197                 return 0;
198         }
199 }
200
201 /* Same as rx_pkt_sw expect with a mask for the number of receive
202    rings (can only be used if nb_rxring is a power of 2). */
203 uint16_t rx_pkt_sw_pow2(struct task_base *tbase, struct rte_mbuf ***mbufs)
204 {
205         START_EMPTY_MEASSURE();
206         *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
207         uint8_t lr = tbase->rx_params_sw.last_read_ring;
208         uint16_t nb_rx;
209
210         do {
211                 nb_rx = ring_deq(tbase->rx_params_sw.rx_rings[lr], *mbufs);
212                 lr = (lr + 1) & tbase->rx_params_sw.rxrings_mask;
213         } while(!nb_rx && lr != tbase->rx_params_sw.last_read_ring);
214
215         tbase->rx_params_sw.last_read_ring = lr;
216
217         if (nb_rx != 0) {
218                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
219                 return nb_rx;
220         }
221         else {
222                 TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
223                 return 0;
224         }
225 }
226
227 uint16_t rx_pkt_self(struct task_base *tbase, struct rte_mbuf ***mbufs)
228 {
229         START_EMPTY_MEASSURE();
230         uint16_t nb_rx = tbase->ws_mbuf->idx[0].nb_rx;
231         if (nb_rx) {
232                 tbase->ws_mbuf->idx[0].nb_rx = 0;
233                 *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
234                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
235                 return nb_rx;
236         }
237         else {
238                 TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
239                 return 0;
240         }
241 }
242
243 /* Used for tasks that do not receive packets (i.e. Packet
244 generation).  Always returns 1 but never returns packets and does not
245 increment statistics. This function allows to use the same code path
246 as for tasks that actually receive packets. */
247 uint16_t rx_pkt_dummy(__attribute__((unused)) struct task_base *tbase,
248                       __attribute__((unused)) struct rte_mbuf ***mbufs)
249 {
250         return 1;
251 }
252
253 /* After the system has been configured, it is known if there is only
254    one RX ring. If this is the case, a more specialized version of the
255    function above can be used to save cycles. */
256 uint16_t rx_pkt_sw1(struct task_base *tbase, struct rte_mbuf ***mbufs)
257 {
258         START_EMPTY_MEASSURE();
259         *mbufs = tbase->ws_mbuf->mbuf[0] + (tbase->ws_mbuf->idx[0].prod & WS_MBUF_MASK);
260         uint16_t nb_rx = ring_deq(tbase->rx_params_sw1.rx_ring, *mbufs);
261
262         if (nb_rx != 0) {
263                 TASK_STATS_ADD_RX(&tbase->aux->stats, nb_rx);
264                 return nb_rx;
265         }
266         else {
267                 TASK_STATS_ADD_IDLE(&tbase->aux->stats, rte_rdtsc() - cur_tsc);
268                 return 0;
269         }
270 }
271
272 static uint16_t call_prev_rx_pkt(struct task_base *tbase, struct rte_mbuf ***mbufs)
273 {
274         uint16_t ret;
275
276         if (tbase->aux->rx_prev_idx + 1 == tbase->aux->rx_prev_count) {
277                 ret = tbase->aux->rx_pkt_prev[tbase->aux->rx_prev_idx](tbase, mbufs);
278         } else {
279                 tbase->aux->rx_prev_idx++;
280                 ret = tbase->aux->rx_pkt_prev[tbase->aux->rx_prev_idx](tbase, mbufs);
281                 tbase->aux->rx_prev_idx--;
282         }
283
284         return ret;
285 }
286
287 /* Only used when there are packets to be dumped. This function is
288    meant as a debugging tool and is therefore not optimized. When the
289    number of packets to dump falls back to 0, the original (optimized)
290    rx function is restored. This allows to support dumping packets
291    without any performance impact if the feature is not used. */
292 uint16_t rx_pkt_dump(struct task_base *tbase, struct rte_mbuf ***mbufs)
293 {
294         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
295
296         if (ret) {
297                 uint32_t n_dump = tbase->aux->task_rt_dump.n_print_rx;
298                 n_dump = ret < n_dump? ret : n_dump;
299
300                 if (tbase->aux->task_rt_dump.input->reply == NULL) {
301                         for (uint32_t i = 0; i < n_dump; ++i) {
302                                 plogd_info((*mbufs)[i], "RX: ");
303                         }
304                 }
305                 else {
306                         struct input *input = tbase->aux->task_rt_dump.input;
307
308                         for (uint32_t i = 0; i < n_dump; ++i) {
309                                 /* TODO: Execute callback with full
310                                    data in a single call. */
311                                 char tmp[128];
312                                 int strlen;
313
314 #if RTE_VERSION >= RTE_VERSION_NUM(1,8,0,0)
315                                 int port_id = ((*mbufs)[i])->port;
316 #else
317                                 int port_id = ((*mbufs)[i])->pkt.in_port;
318 #endif
319                                 strlen = snprintf(tmp, sizeof(tmp), "pktdump,%d,%d\n", port_id,
320                                                       rte_pktmbuf_pkt_len((*mbufs)[i]));
321
322                                 input->reply(input, tmp, strlen);
323                                 input->reply(input, rte_pktmbuf_mtod((*mbufs)[i], char *), rte_pktmbuf_pkt_len((*mbufs)[i]));
324                                 input->reply(input, "\n", 1);
325                         }
326                 }
327
328                 tbase->aux->task_rt_dump.n_print_rx -= n_dump;
329
330                 if (0 == tbase->aux->task_rt_dump.n_print_rx) {
331                         task_base_del_rx_pkt_function(tbase, rx_pkt_dump);
332                 }
333         }
334         return ret;
335 }
336
337 uint16_t rx_pkt_trace(struct task_base *tbase, struct rte_mbuf ***mbufs)
338 {
339         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
340
341         if (ret) {
342                 uint32_t n_trace = tbase->aux->task_rt_dump.n_trace;
343                 n_trace = ret < n_trace? ret : n_trace;
344                 tbase->aux->task_rt_dump.cur_trace = n_trace;
345
346                 for (uint32_t i = 0; i < n_trace; ++i) {
347                         uint8_t *pkt = rte_pktmbuf_mtod((*mbufs)[i], uint8_t *);
348                         rte_memcpy(tbase->aux->task_rt_dump.pkt_cpy[i], pkt, sizeof(tbase->aux->task_rt_dump.pkt_cpy[i]));
349                         tbase->aux->task_rt_dump.pkt_cpy_len[i] = rte_pktmbuf_pkt_len((*mbufs)[i]);
350                         tbase->aux->task_rt_dump.pkt_mbuf_addr[i] = (*mbufs)[i];
351                 }
352
353                 tbase->aux->task_rt_dump.n_trace -= n_trace;
354                 /* Unset by TX when n_trace = 0 */
355         }
356         return ret;
357 }
358
359 /* Gather the distribution of the number of packets that have been
360    received from one RX call. Since the value is only modified by the
361    task that receives the packet, no atomic operation is needed. */
362 uint16_t rx_pkt_distr(struct task_base *tbase, struct rte_mbuf ***mbufs)
363 {
364         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
365
366         tbase->aux->rx_bucket[ret]++;
367         return ret;
368 }
369
370 uint16_t rx_pkt_bw(struct task_base *tbase, struct rte_mbuf ***mbufs)
371 {
372         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
373         uint32_t tot_bytes = 0;
374
375         for (uint16_t i = 0; i < ret; ++i) {
376                 tot_bytes += mbuf_wire_size((*mbufs)[i]);
377         }
378
379         TASK_STATS_ADD_RX_BYTES(&tbase->aux->stats, tot_bytes);
380
381         return ret;
382 }
383
384 uint16_t rx_pkt_tsc(struct task_base *tbase, struct rte_mbuf ***mbufs)
385 {
386         uint64_t before = rte_rdtsc();
387         uint16_t ret = call_prev_rx_pkt(tbase, mbufs);
388         uint64_t after = rte_rdtsc();
389
390         tbase->aux->tsc_rx.before = before;
391         tbase->aux->tsc_rx.after = after;
392
393         return ret;
394 }
395
396 uint16_t rx_pkt_all(struct task_base *tbase, struct rte_mbuf ***mbufs)
397 {
398         uint16_t tot = 0;
399         uint16_t ret = 0;
400         struct rte_mbuf **new_mbufs;
401         struct rte_mbuf **dst = tbase->aux->all_mbufs;
402
403         /* In case we receive less than MAX_PKT_BURST packets in one
404            iteration, do no perform any copying of mbuf pointers. Use
405            the buffer itself instead. */
406         ret = call_prev_rx_pkt(tbase, &new_mbufs);
407         if (ret < MAX_PKT_BURST/2) {
408                 *mbufs = new_mbufs;
409                 return ret;
410         }
411
412         memcpy(dst + tot, new_mbufs, ret * sizeof(*dst));
413         tot += ret;
414         *mbufs = dst;
415
416         do {
417                 ret = call_prev_rx_pkt(tbase, &new_mbufs);
418                 memcpy(dst + tot, new_mbufs, ret * sizeof(*dst));
419                 tot += ret;
420         } while (ret == MAX_PKT_BURST/2 && tot < MAX_RX_PKT_ALL - MAX_PKT_BURST);
421
422         if (tot >= MAX_RX_PKT_ALL - MAX_PKT_BURST) {
423                 plog_err("Could not receive all packets - buffer full\n");
424         }
425
426         return tot;
427 }