Multiple changes for June release
[samplevnf.git] / VNFs / DPPD-PROX / genl4_bundle.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <string.h>
18 #include <rte_hash.h>
19 #include <rte_memory.h>
20 #include <rte_hash_crc.h>
21 #include <rte_cycles.h>
22 #include <rte_version.h>
23
24 #include "prox_malloc.h"
25 #include "prox_assert.h"
26 #include "cdf.h"
27 #include "defines.h"
28 #include "genl4_bundle.h"
29 #include "log.h"
30 #include "pkt_parser.h"
31 #include "prox_lua_types.h"
32
33 #if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0)
34 #define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE
35 #define RTE_CACHE_LINE_ROUNDUP CACHE_LINE_ROUNDUP
36 #endif
37
38 /* zero on success */
39 int bundle_ctx_pool_create(const char *name, uint32_t n_elems, struct bundle_ctx_pool *ret, uint32_t *occur, uint32_t n_occur, struct bundle_cfg *cfg, int socket_id)
40 {
41         size_t memsize;
42         uint8_t *mem;
43
44         const struct rte_hash_parameters params = {
45                 .name = name,
46                 .entries = rte_align32pow2(n_elems) * 8,
47                 //.bucket_entries = 8,
48                 .key_len = sizeof(struct pkt_tuple),
49                 .hash_func = rte_hash_crc,
50                 .hash_func_init_val = 0,
51                 .socket_id = socket_id,
52         };
53
54         ret->hash = rte_hash_create(&params);
55         if (NULL == ret->hash)
56                 return -1;
57
58         uint32_t rand_pool_size = 0, tot_occur = 0;
59
60         if (occur) {
61                 for (uint32_t i = 0; i < n_occur; ++i) {
62                         tot_occur += occur[i];
63                 }
64
65                 rand_pool_size = (n_elems + (tot_occur - 1))/tot_occur*tot_occur;
66         }
67
68         memsize = 0;
69         memsize += RTE_CACHE_LINE_ROUNDUP(params.entries * sizeof(ret->hash_entries[0]));
70         memsize += RTE_CACHE_LINE_ROUNDUP(n_elems * sizeof(ret->free_bundles[0]));
71         memsize += RTE_CACHE_LINE_ROUNDUP(n_elems * sizeof(ret->bundles[0]));
72         if (occur)
73                 memsize += RTE_CACHE_LINE_ROUNDUP(rand_pool_size * sizeof(ret->occur));
74         mem = prox_zmalloc(memsize, socket_id);
75         if (NULL == mem)
76                 return -1;
77
78         ret->hash_entries = (struct bundle_ctx **) mem;
79         mem += RTE_CACHE_LINE_ROUNDUP(params.entries * sizeof(ret->hash_entries[0]));
80         ret->free_bundles = (struct bundle_ctx **) mem;
81         mem += RTE_CACHE_LINE_ROUNDUP(n_elems * sizeof(ret->free_bundles[0]));
82         if (occur) {
83                 ret->occur = (uint32_t *)mem;
84                 mem += RTE_CACHE_LINE_ROUNDUP(rand_pool_size * sizeof(ret->occur));
85
86                 ret->seed = rte_rdtsc();
87
88                 size_t cur_occur = 0;
89                 size_t j = 0;
90
91                 for (uint32_t i = 0; i < rand_pool_size; ++i) {
92                         while (j >= occur[cur_occur]) {
93                                 cur_occur++;
94                                 if (cur_occur == n_occur)
95                                         cur_occur = 0;
96                                 j = 0;
97                         }
98                         j++;
99                         ret->occur[i] = cur_occur;
100                 }
101                 ret->n_occur = rand_pool_size;
102         }
103         ret->bundles = (struct bundle_ctx *) mem;
104
105         ret->bundle_cfg = cfg;
106         for (unsigned i = 0; i < n_elems; ++i) {
107                 ret->free_bundles[i] = &ret->bundles[i];
108         }
109         ret->n_free_bundles = n_elems;
110         ret->tot_bundles    = n_elems;
111
112         return 0;
113 }
114
115 struct bundle_ctx *bundle_ctx_pool_get(struct bundle_ctx_pool *p)
116 {
117         if (p->n_free_bundles > 0)
118                 return p->free_bundles[--p->n_free_bundles];
119         return NULL;
120 }
121
122 static struct bundle_cfg *bundle_ctx_get_cfg(struct bundle_ctx_pool *p)
123 {
124         uint32_t rand = 0;
125
126         /* get rand in [0, RAND_MAX rounded down] */
127         do {
128                 rand = rand_r(&p->seed);
129         } while (rand >= RAND_MAX/p->n_occur*p->n_occur);
130
131         rand /= RAND_MAX/p->n_occur;
132
133         PROX_ASSERT(p->n_occur);
134         PROX_ASSERT(rand < p->n_occur);
135
136         uint32_t r = p->occur[rand];
137         p->occur[rand] = p->occur[--p->n_occur];
138
139         return &p->bundle_cfg[r];
140 }
141
142 static void bundle_ctx_put_cfg(struct bundle_ctx_pool *p, const struct bundle_cfg *cfg)
143 {
144         if (p->occur) {
145                 uint32_t r = cfg - p->bundle_cfg;
146                 p->occur[p->n_occur++] = r;
147         }
148 }
149
150 struct bundle_ctx *bundle_ctx_pool_get_w_cfg(struct bundle_ctx_pool *p)
151 {
152         if (p->n_free_bundles > 0) {
153                 struct bundle_ctx *ret = p->free_bundles[--p->n_free_bundles];
154                 ret->cfg = bundle_ctx_get_cfg(p);
155                 return ret;
156         }
157
158         return NULL;
159 }
160
161 void bundle_ctx_pool_put(struct bundle_ctx_pool *p, struct bundle_ctx *bundle)
162 {
163         bundle_ctx_put_cfg(p, bundle->cfg);
164         p->free_bundles[p->n_free_bundles++] = bundle;
165 }
166
167 static void bundle_cleanup(struct bundle_ctx *bundle)
168 {
169         if (bundle->heap_ref.elem != NULL) {
170                 heap_del(bundle->heap, &bundle->heap_ref);
171         }
172 }
173
174 static int bundle_iterate_streams(struct bundle_ctx *bundle, struct bundle_ctx_pool *pool, unsigned *seed, struct l4_stats *l4_stats)
175 {
176         enum l4gen_peer peer;
177         int ret = 0, old;
178
179         while (bundle->ctx.stream_cfg->is_ended(&bundle->ctx)) {
180
181                 if (bundle->ctx.stream_cfg->proto == IPPROTO_TCP) {
182                         if (bundle->ctx.retransmits == 0)
183                                 l4_stats->tcp_finished_no_retransmit++;
184                         else
185                                 l4_stats->tcp_finished_retransmit++;
186                 }
187                 else
188                         l4_stats->udp_finished++;
189
190                 if (bundle->stream_idx + 1 != bundle->cfg->n_stream_cfgs) {
191                         ret = 1;
192                         bundle->stream_idx++;
193
194                         stream_ctx_reset_move(&bundle->ctx, bundle->cfg->stream_cfgs[bundle->stream_idx]);
195
196                         /* Update tuple */
197                         old = rte_hash_del_key(pool->hash, &bundle->tuple);
198                         if (old < 0) {
199                                 plogx_err("Failed to delete key while trying to change tuple: %d (%s)\n",old, strerror(-old));
200                         }
201                         plogx_dbg("Moving to stream with idx %d\n", bundle->stream_idx);
202
203                         /* In case there are multiple streams, clients
204                            randomized but ports fixed, it is still
205                            possible to hit an infinite loop here. The
206                            situations is hit if a client:port is
207                            connected to a server:port in one of the
208                            streams while client:port is regenerated
209                            for the first stream. There is no conflict
210                            yet since the server:port is
211                            different. Note that this is bug since a
212                            client:port can only have one open
213                            connection. */
214                         int retries = 0;
215                         do {
216                                 bundle_create_tuple(&bundle->tuple, &bundle->cfg->clients, bundle->ctx.stream_cfg, 0, seed);
217
218                                 ret = rte_hash_lookup(pool->hash, (const void *)&bundle->tuple);
219                                 if (++retries == 1000) {
220                                         plogx_warn("Already tried 1K times\n");
221                                         plogx_warn("Going from %d to %d\n", bundle->stream_idx -1, bundle->stream_idx);
222                                 }
223                         } while (ret >= 0);
224
225                         ret = rte_hash_add_key(pool->hash, &bundle->tuple);
226                         if (ret < 0) {
227                                 plogx_err("Failed to add key while moving to next stream!\n");
228                                 return -1;
229                         }
230                         pool->hash_entries[ret] = pool->hash_entries[old];
231
232                         if (bundle->ctx.stream_cfg->proto == IPPROTO_TCP)
233                                 l4_stats->tcp_created++;
234                         else
235                                 l4_stats->udp_created++;
236                 }
237                 else {
238                         int a = rte_hash_del_key(pool->hash, &bundle->tuple);
239                         PROX_PANIC(a < 0, "Del failed (%d)! during finished all bundle (%d)\n", a, bundle->cfg->n_stream_cfgs);
240                         bundle_cleanup(bundle);
241                         bundle_ctx_pool_put(pool, bundle);
242
243                         return -1;
244                 }
245         }
246         return ret;
247 }
248
249 void bundle_create_tuple(struct pkt_tuple *tp, const struct host_set *clients, const struct stream_cfg *stream_cfg, int rnd_ip, unsigned  *seed)
250 {
251         tp->dst_port = clients->port;
252         tp->dst_port &= ~clients->port_mask;
253         tp->dst_port |= rand_r(seed) & clients->port_mask;
254
255         if (rnd_ip) {
256                 tp->dst_addr = clients->ip;
257                 tp->dst_addr &= ~clients->ip_mask;
258                 tp->dst_addr |= rand_r(seed) & clients->ip_mask;
259         }
260
261         tp->src_addr = stream_cfg->servers.ip;
262         tp->src_port = stream_cfg->servers.port;
263         plogx_dbg("bundle_create_tuple() with proto = %x, %d\n", stream_cfg->proto, rnd_ip);
264         tp->proto_id = stream_cfg->proto;
265
266         tp->l2_types[0] = 0x0008;
267 }
268
269 void bundle_init_w_cfg(struct bundle_ctx *bundle, const struct bundle_cfg *cfg, struct heap *heap, enum l4gen_peer peer, unsigned *seed)
270 {
271         bundle->cfg = cfg;
272         bundle_init(bundle, heap, peer, seed);
273 }
274
275 void bundle_init(struct bundle_ctx *bundle, struct heap *heap, enum l4gen_peer peer, unsigned *seed)
276 {
277         bundle->heap_ref.elem = NULL;
278         bundle->heap = heap;
279         memset(&bundle->ctx, 0, sizeof(bundle->ctx));
280         // TODO; assert that there is at least one stream
281         bundle->stream_idx = 0;
282
283         stream_ctx_init(&bundle->ctx, peer, bundle->cfg->stream_cfgs[bundle->stream_idx], &bundle->tuple);
284         bundle_create_tuple(&bundle->tuple, &bundle->cfg->clients, bundle->ctx.stream_cfg, peer == PEER_CLIENT, seed);
285 }
286
287 void bundle_expire(struct bundle_ctx *bundle, struct bundle_ctx_pool *pool, struct l4_stats *l4_stats)
288 {
289         struct pkt_tuple *pt = &bundle->tuple;
290
291         plogx_dbg("Client = "IPv4_BYTES_FMT":%d, Server = "IPv4_BYTES_FMT":%d\n",
292                   IPv4_BYTES(((uint8_t*)&pt->dst_addr)),
293                   rte_bswap16(pt->dst_port),
294                   IPv4_BYTES(((uint8_t*)&pt->src_addr)),
295                   rte_bswap16(pt->src_port));
296
297         int a = rte_hash_del_key(pool->hash, bundle);
298         if (a < 0) {
299                 plogx_err("Del failed with error %d: '%s'\n", a, strerror(-a));
300                 plogx_err("ended = %d\n", bundle->ctx.flags & STREAM_CTX_F_TCP_ENDED);
301         }
302
303         if (bundle->ctx.stream_cfg->proto == IPPROTO_TCP)
304                 l4_stats->tcp_expired++;
305         else
306                 l4_stats->udp_expired++;
307
308         bundle_cleanup(bundle);
309         bundle_ctx_pool_put(pool, bundle);
310 }
311
312 int bundle_proc_data(struct bundle_ctx *bundle, struct rte_mbuf *mbuf, struct l4_meta *l4_meta, struct bundle_ctx_pool *pool, unsigned *seed, struct l4_stats *l4_stats)
313 {
314         int ret;
315         uint64_t next_tsc;
316
317         if (bundle->heap_ref.elem != NULL) {
318                 heap_del(bundle->heap, &bundle->heap_ref);
319         }
320
321         if (bundle_iterate_streams(bundle, pool, seed, l4_stats) < 0)
322                 return -1;
323
324         uint32_t retx_before = bundle->ctx.retransmits;
325         next_tsc = UINT64_MAX;
326         ret = bundle->ctx.stream_cfg->proc(&bundle->ctx, mbuf, l4_meta, &next_tsc);
327
328         if (bundle->ctx.flags & STREAM_CTX_F_EXPIRED) {
329                 bundle_expire(bundle, pool, l4_stats);
330                 return -1;
331         }
332         else if (next_tsc != UINT64_MAX) {
333                 heap_add(bundle->heap, &bundle->heap_ref, rte_rdtsc() + next_tsc);
334         }
335         l4_stats->tcp_retransmits += bundle->ctx.retransmits - retx_before;
336
337         if (bundle_iterate_streams(bundle, pool, seed, l4_stats) > 0) {
338                 if (bundle->heap_ref.elem != NULL) {
339                         heap_del(bundle->heap, &bundle->heap_ref);
340                 }
341                 heap_add(bundle->heap, &bundle->heap_ref, rte_rdtsc());
342         }
343
344         return ret;
345 }
346
347 uint32_t bundle_cfg_length(struct bundle_cfg *cfg)
348 {
349         uint32_t ret = 0;
350
351         for (uint32_t i = 0; i < cfg->n_stream_cfgs; ++i) {
352                 ret += cfg->stream_cfgs[i]->n_bytes;
353         }
354
355         return ret;
356 }
357
358 uint32_t bundle_cfg_max_n_segments(struct bundle_cfg *cfg)
359 {
360         uint32_t ret = 0;
361         uint32_t cur;
362
363         for (uint32_t i = 0; i < cfg->n_stream_cfgs; ++i) {
364                 cur = stream_cfg_max_n_segments(cfg->stream_cfgs[i]);
365                 ret = ret > cur? ret: cur;
366         }
367
368         return ret;
369 }