Merge "Fix latency accuracy and dumping latencies to file"
[samplevnf.git] / VNFs / DPPD-PROX / handle_genl4.c
/*
// Copyright (c) 2010-2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

#include <rte_mbuf.h>
#include <pcap.h>
#include <string.h>
#include <stdlib.h>
#include <rte_cycles.h>
#include <rte_version.h>
#include <rte_ip.h>
#include <rte_udp.h>
#include <rte_tcp.h>
#include <rte_hash.h>
#include <rte_hash_crc.h>

#include "prox_lua.h"
#include "prox_lua_types.h"
#include "prox_malloc.h"
#include "file_utils.h"
#include "hash_set.h"
#include "prox_assert.h"
#include "prox_args.h"
#include "defines.h"
#include "pkt_parser.h"
#include "handle_lat.h"
#include "task_init.h"
#include "task_base.h"
#include "prox_port_cfg.h"
#include "lconf.h"
#include "log.h"
#include "quit.h"
#include "heap.h"
#include "mbuf_utils.h"
#include "genl4_bundle.h"
#include "genl4_stream_udp.h"
#include "genl4_stream_tcp.h"
#include "cdf.h"
#include "fqueue.h"
#include "token_time.h"
#include "commands.h"
#include "prox_shared.h"

#if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0)
#define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE
#endif

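/* Lookup key for the server listen table. The struct is packed so
   that sizeof(struct new_tuple) covers exactly the fields below and
   rte_hash_crc never hashes uninitialized padding bytes. */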
struct new_tuple {
        uint32_t dst_addr;
        uint8_t proto_id;
        uint16_t dst_port;
        uint16_t l2_types[4];
} __attribute__((packed));

enum handle_state {HANDLE_QUEUED, HANDLE_SCHEDULED};

struct task_gen_server {
        struct task_base base;
        struct l4_stats l4_stats;
        struct rte_mempool *mempool;
        struct rte_hash *listen_hash;
        /* Listening bundles contain only 1 part since the state of a
           multi_part comm is kept mostly at the client side. */
        struct bundle_cfg     **listen_entries;
        struct bundle_ctx_pool bundle_ctx_pool;
        struct bundle_cfg *bundle_cfgs; /* Loaded configurations */
        struct token_time token_time;
        enum handle_state handle_state;
        struct heap *heap;
        struct fqueue *fqueue;
        struct rte_mbuf *cur_mbufs[MAX_PKT_BURST];
        uint32_t cur_mbufs_beg;
        uint32_t cur_mbufs_end;
        uint32_t cancelled;
        uint8_t  out_saved;
        struct rte_mbuf *mbuf_saved;
        uint64_t last_tsc;
        unsigned seed;
        /* Handle scheduled events */
        struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
        uint32_t n_new_mbufs;
};

struct task_gen_client {
        struct task_base base;
        struct l4_stats l4_stats;
        struct rte_mempool *mempool;
        struct bundle_ctx_pool bundle_ctx_pool;
        struct bundle_cfg *bundle_cfgs; /* Loaded configurations */
        struct token_time token_time;
        /* Create new connections and handle scheduled events */
        struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
        uint32_t new_conn_cost;
        uint32_t new_conn_tokens;
        uint64_t new_conn_last_tsc;
        uint32_t n_new_mbufs;
        uint64_t last_tsc;
        struct cdf *cdf;
        unsigned seed;
        struct heap *heap;
};

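/* Top up a task-local array of preallocated mbufs to MAX_PKT_BURST.
   Callers always consume mbufs from the front of the array and
   decrement *n_new_mbufs afterwards, so the slots to refill form the
   prefix [0 .. MAX_PKT_BURST - *n_new_mbufs). For example, assuming
   MAX_PKT_BURST is 64 and 10 mbufs were consumed since the last
   refill, the next call allocates 10 fresh mbufs into slots 0..9. */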
static int refill_mbufs(uint32_t *n_new_mbufs, struct rte_mempool *mempool, struct rte_mbuf **mbufs)
{
        if (*n_new_mbufs == MAX_PKT_BURST)
                return 0;

        if (rte_mempool_get_bulk(mempool, (void **)mbufs, MAX_PKT_BURST - *n_new_mbufs) < 0) {
                plogx_err("Mempool alloc failed for %u mbufs\n", MAX_PKT_BURST - *n_new_mbufs);
                return -1;
        }

        for (uint32_t i = 0; i < MAX_PKT_BURST - *n_new_mbufs; ++i) {
                init_mbuf_seg(mbufs[i]);
        }

        *n_new_mbufs = MAX_PKT_BURST;

        return 0;
}

static const struct bundle_cfg *server_accept(struct task_gen_server *task, struct new_tuple *nt)
{
        int ret = rte_hash_lookup(task->listen_hash, nt);

        if (ret < 0)
                return NULL;
        else
                return task->listen_entries[ret];
}

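/* Client fast path, run once per scheduler iteration. Three phases:
   (1) match received packets to existing bundles and feed them to the
   TCP/UDP state machines, (2) serve expired timers from the heap
   (retransmits and delayed transmits), and (3) open up to one burst
   of new connections, paced by the connection-setup token bucket
   below. */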
static int handle_gen_bulk_client(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
        struct task_gen_client *task = (struct task_gen_client *)tbase;
        uint8_t out[MAX_PKT_BURST] = {0};
        struct bundle_ctx *conn;
        int ret;

        if (n_pkts) {
                for (int i = 0; i < n_pkts; ++i) {
                        struct pkt_tuple pt;
                        struct l4_meta l4_meta;

                        if (parse_pkt(mbufs[i], &pt, &l4_meta)) {
                                plogdx_err(mbufs[i], "Parsing failed\n");
                                out[i] = OUT_DISCARD;
                                continue;
                        }

                        ret = rte_hash_lookup(task->bundle_ctx_pool.hash, (const void *)&pt);

                        if (ret < 0) {
                                plogx_dbg("Client: received packet that does not belong to a connection: "
                                          "Client = "IPv4_BYTES_FMT":%d, Server = "IPv4_BYTES_FMT":%d\n",
                                          IPv4_BYTES(((uint8_t*)&pt.dst_addr)),
                                          rte_bswap16(pt.dst_port),
                                          IPv4_BYTES(((uint8_t*)&pt.src_addr)),
                                          rte_bswap16(pt.src_port));

                                plogdx_dbg(mbufs[i], NULL);

                                if (pt.proto_id == IPPROTO_TCP) {
                                        stream_tcp_create_rst(mbufs[i], &l4_meta, &pt);
                                        out[i] = 0;
                                        continue;
                                }
                                else {
                                        out[i] = OUT_DISCARD;
                                        continue;
                                }
                        }

                        conn = task->bundle_ctx_pool.hash_entries[ret];
                        ret = bundle_proc_data(conn, mbufs[i], &l4_meta, &task->bundle_ctx_pool, &task->seed, &task->l4_stats);
                        out[i] = ret == 0? 0: OUT_HANDLED;
                }
                task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
        }

        /* If there is at least one callback to handle, handle at most MAX_PKT_BURST */
        if (heap_top_is_lower(task->heap, rte_rdtsc())) {
                if (0 != refill_mbufs(&task->n_new_mbufs, task->mempool, task->new_mbufs))
                        return 0;

                uint16_t n_called_back = 0;
                while (heap_top_is_lower(task->heap, rte_rdtsc()) && n_called_back < MAX_PKT_BURST) {
                        conn = BUNDLE_CTX_UPCAST(heap_pop(task->heap));

                        /* handle packet TX (retransmit or delayed transmit) */
                        ret = bundle_proc_data(conn, task->new_mbufs[n_called_back], NULL, &task->bundle_ctx_pool, &task->seed, &task->l4_stats);

                        if (ret == 0) {
                                out[n_called_back] = 0;
                                n_called_back++;
                        }
                }
                plogx_dbg("During callback, will send %d packets\n", n_called_back);

                task->base.tx_pkt(&task->base, task->new_mbufs, n_called_back, out);
                task->n_new_mbufs -= n_called_back;
        }

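        /* Connection-setup rate limiting: one token buys one new
           connection, and a token accrues every new_conn_cost TSC
           cycles (tsc_hz / max_setup_rate, see init_task_gen_client).
           The token count is capped at 16 to bound bursts of new
           connections. For example, with max_setup_rate = 1000 on a
           2 GHz TSC, one token is earned every 2,000,000 cycles,
           i.e. every millisecond. */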
        uint32_t n_new = task->bundle_ctx_pool.n_free_bundles;
        n_new = n_new > MAX_PKT_BURST? MAX_PKT_BURST : n_new;

        uint64_t diff = (rte_rdtsc() - task->new_conn_last_tsc)/task->new_conn_cost;
        task->new_conn_last_tsc += diff * task->new_conn_cost;
        task->new_conn_tokens += diff;

        if (task->new_conn_tokens > 16)
                task->new_conn_tokens = 16;
        if (n_new > task->new_conn_tokens)
                n_new = task->new_conn_tokens;
        task->new_conn_tokens -= n_new;
        if (n_new == 0)
                return 0;

        if (0 != refill_mbufs(&task->n_new_mbufs, task->mempool, task->new_mbufs))
                return 0;

        for (uint32_t i = 0; i < n_new; ++i) {
                struct bundle_ctx *bundle_ctx = bundle_ctx_pool_get_w_cfg(&task->bundle_ctx_pool);
                PROX_ASSERT(bundle_ctx);

                struct pkt_tuple *pt = &bundle_ctx->tuple;

                int n_retries = 0;
                do {
                        /* Note that the actual packet sent will
                           contain swapped addresses and ports
                           (i.e. pkt.src <=> tuple.dst). The incoming
                           packet will match this struct. */
                        bundle_init(bundle_ctx, task->heap, PEER_CLIENT, &task->seed);

                        ret = rte_hash_lookup(task->bundle_ctx_pool.hash, (const void *)pt);
                        if (ret >= 0) {
                                if (n_retries++ == 1000) {
                                        plogx_err("Already tried 1000 times\n");
                                }
                        }
                } while (ret >= 0);

                ret = rte_hash_add_key(task->bundle_ctx_pool.hash, (const void *)pt);

                if (ret < 0) {
                        plogx_err("Failed to add key: ret = %d, n_free = %d\n", ret, task->bundle_ctx_pool.n_free_bundles);
                        bundle_ctx_pool_put(&task->bundle_ctx_pool, bundle_ctx);

                        pkt_tuple_debug2(pt);
                        out[i] = OUT_DISCARD;
                        continue;
                }

                task->bundle_ctx_pool.hash_entries[ret] = bundle_ctx;

                if (bundle_ctx->ctx.stream_cfg->proto == IPPROTO_TCP)
                        task->l4_stats.tcp_created++;
                else
                        task->l4_stats.udp_created++;

                task->l4_stats.bundles_created++;

                ret = bundle_proc_data(bundle_ctx, task->new_mbufs[i], NULL, &task->bundle_ctx_pool, &task->seed, &task->l4_stats);
                out[i] = ret == 0? 0: OUT_HANDLED;
        }

        int ret2 = task->base.tx_pkt(&task->base, task->new_mbufs, n_new, out);
        task->n_new_mbufs -= n_new;
        return ret2;
}

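/* Drain packets previously staged in the fqueue by handle_gen_bulk().
   Transmission is paced by the server's token_time budget; when the
   budget runs out mid-burst, the current packet is marked as
   cancelled and processing resumes from it on the next call. */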
static int handle_gen_queued(struct task_gen_server *task)
{
        uint8_t out[MAX_PKT_BURST];
        struct bundle_ctx *conn;
        struct pkt_tuple pkt_tuple;
        struct l4_meta l4_meta;
        uint16_t j;
        int ret;

        if (task->cur_mbufs_beg == task->cur_mbufs_end) {
                task->cur_mbufs_end = fqueue_get(task->fqueue, task->cur_mbufs, MAX_PKT_BURST);
                task->cur_mbufs_beg = 0;
        }
        uint16_t n_pkts = task->cur_mbufs_end - task->cur_mbufs_beg;
        struct rte_mbuf **mbufs = task->cur_mbufs + task->cur_mbufs_beg;

        j = task->cancelled;
        if (task->cancelled) {
                uint16_t pkt_len = mbuf_wire_size(mbufs[0]);

                if (token_time_take(&task->token_time, pkt_len) != 0)
                        return -1;

                out[0] = task->out_saved;
                task->cancelled = 0;
        }

        /* Main proc loop */
        for (; j < n_pkts; ++j) {

                if (parse_pkt(mbufs[j], &pkt_tuple, &l4_meta)) {
                        plogdx_err(mbufs[j], "Unknown packet, parsing failed\n");
                        out[j] = OUT_DISCARD;
                        continue;
                }

                conn = NULL;
                ret = rte_hash_lookup(task->bundle_ctx_pool.hash, (const void *)&pkt_tuple);

                if (ret >= 0)
                        conn = task->bundle_ctx_pool.hash_entries[ret];
                else {
                        /* If not part of existing connection, try to create a connection */
                        struct new_tuple nt;
                        nt.dst_addr = pkt_tuple.dst_addr;
                        nt.proto_id = pkt_tuple.proto_id;
                        nt.dst_port = pkt_tuple.dst_port;
                        rte_memcpy(nt.l2_types, pkt_tuple.l2_types, sizeof(nt.l2_types));
                        const struct bundle_cfg *n;

                        if (NULL != (n = server_accept(task, &nt))) {
                                conn = bundle_ctx_pool_get(&task->bundle_ctx_pool);
                                if (!conn) {
                                        out[j] = OUT_DISCARD;
                                        plogx_err("No more free bundles to accept new connection\n");
                                        continue;
                                }
                                ret = rte_hash_add_key(task->bundle_ctx_pool.hash, (const void *)&pkt_tuple);
                                if (ret < 0) {
                                        out[j] = OUT_DISCARD;
                                        bundle_ctx_pool_put(&task->bundle_ctx_pool, conn);
                                        plog_err("Adding key failed while trying to accept connection\n");
                                        continue;
                                }

                                task->bundle_ctx_pool.hash_entries[ret] = conn;

                                bundle_init_w_cfg(conn, n, task->heap, PEER_SERVER, &task->seed);
                                conn->tuple = pkt_tuple;

                                if (conn->ctx.stream_cfg->proto == IPPROTO_TCP)
                                        task->l4_stats.tcp_created++;
                                else
                                        task->l4_stats.udp_created++;
                        }
                        else {
                                plog_err("Packet received for service that does not exist:\n"
                                         "source ip = %08x:%u\n"
                                         "dst ip    = %08x:%u\n",
                                         pkt_tuple.src_addr, rte_bswap16(pkt_tuple.src_port),
                                         pkt_tuple.dst_addr, rte_bswap16(pkt_tuple.dst_port));
                        }
                }

                /* bundle contains either an active connection or a
                   newly created connection. If it is NULL, then not
                   listening. */
                if (NULL != conn) {
                        ret = bundle_proc_data(conn, mbufs[j], &l4_meta, &task->bundle_ctx_pool, &task->seed, &task->l4_stats);

                        out[j] = ret == 0? 0: OUT_HANDLED;

                        if (ret == 0) {
                                uint16_t pkt_len = mbuf_wire_size(mbufs[j]);

                                if (token_time_take(&task->token_time, pkt_len) != 0) {
                                        task->out_saved = out[j];
                                        task->cancelled = 1;
                                        task->base.tx_pkt(&task->base, mbufs, j, out);
                                        task->cur_mbufs_beg += j;
                                        return -1;
                                }
                        }
                }
                else {
                        pkt_tuple_debug(&pkt_tuple);
                        plogd_dbg(mbufs[j], NULL);
                        out[j] = OUT_DISCARD;
                }
        }

        task->base.tx_pkt(&task->base, mbufs, j, out);

        task->cur_mbufs_beg += j;
        return 0;
}

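/* Serve scheduled work on the server side: pop connections whose
   timers expired from the heap and let the state machine emit
   retransmits or delayed segments, again within the token_time
   budget. A packet that no longer fits the budget is kept in
   mbuf_saved and is sent first on the next invocation. */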
static int handle_gen_scheduled(struct task_gen_server *task)
{
        struct bundle_ctx *conn;
        uint8_t out[MAX_PKT_BURST];
        int ret;
        uint16_t n_called_back = 0;

        if (task->cancelled) {
                struct rte_mbuf *mbuf = task->mbuf_saved;

                uint16_t pkt_len = mbuf_wire_size(mbuf);
                if (token_time_take(&task->token_time, pkt_len) == 0) {
                        task->cancelled = 0;
                        out[0] = 0;
                        task->base.tx_pkt(&task->base, &mbuf, 1, out);
                }
                else {
                        return -1;
                }
        }

        if (0 != refill_mbufs(&task->n_new_mbufs, task->mempool, task->new_mbufs))
                return -1;

        conn = NULL;
        while (heap_top_is_lower(task->heap, rte_rdtsc()) && n_called_back < task->n_new_mbufs) {
                conn = BUNDLE_CTX_UPCAST(heap_pop(task->heap));

                /* handle packet TX (retransmit or delayed transmit) */
                ret = bundle_proc_data(conn, task->new_mbufs[n_called_back], NULL, &task->bundle_ctx_pool, &task->seed, &task->l4_stats);

                if (ret == 0) {
                        struct rte_mbuf *mbuf = task->new_mbufs[n_called_back];
                        uint16_t pkt_len = mbuf_wire_size(mbuf);

                        if (token_time_take(&task->token_time, pkt_len) == 0) {
                                out[n_called_back] = 0;
                                n_called_back++;
                        }
                        else {
                                task->out_saved = 0;
                                task->cancelled = 1;
                                task->mbuf_saved = mbuf;
                                task->base.tx_pkt(&task->base, task->new_mbufs, n_called_back, out);
                                /* The mbuf that is currently being
                                   processed (and which has been
                                   cancelled) is saved in
                                   task->mbuf_saved. It will be
                                   restored as the first mbuf when
                                   this function is called again. */
                                task->n_new_mbufs -= (n_called_back + 1);
                                return -1;
                        }
                }
        }

        task->base.tx_pkt(&task->base, task->new_mbufs, n_called_back, out);
        task->n_new_mbufs -= n_called_back;

        return 0;
}

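/* Server entry point. Incoming packets are staged in a local fqueue,
   then the two workers above run in alternation: handle_state
   remembers which of the two last ran out of TX budget, so the
   starved worker gets first claim on the budget next time. */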
static int handle_gen_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
        struct task_gen_server *task = (struct task_gen_server *)tbase;
        int ret, ret2 = 0;

        token_time_update(&task->token_time, rte_rdtsc());

        if ((ret = fqueue_put(task->fqueue, mbufs, n_pkts)) != n_pkts) {
                uint8_t out[MAX_PKT_BURST];
                for (uint16_t j = 0; j < n_pkts - ret; ++j)
                        out[j] = OUT_DISCARD;

                ret2 = task->base.tx_pkt(&task->base, mbufs + ret, n_pkts - ret, out);
        }
        if (task->handle_state == HANDLE_QUEUED) {
                if (handle_gen_queued(task) == 0) {
                        if (handle_gen_scheduled(task) != 0)
                                task->handle_state = HANDLE_SCHEDULED;
                }
        }
        else {
                if (handle_gen_scheduled(task) == 0) {
                        if (handle_gen_queued(task) != 0)
                                task->handle_state = HANDLE_QUEUED;
                }
        }
        return ret2;
}

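/* Read a host_set from a Lua table. A sketch of the expected shape,
   inferred from the fields consumed below (ip_mask and port_mask are
   optional and default to 0; the values shown are hypothetical):

     {ip = ip("192.168.1.1"), port = 80, ip_mask = 0, port_mask = 0}
*/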
static int lua_to_host_set(struct lua_State *L, enum lua_place from, const char *name, struct host_set *h)
{
        int pop;
        if ((pop = lua_getfrom(L, from, name)) < 0)
                return -1;

        if (!lua_istable(L, -1))
                return -1;

        uint32_t port = 0, port_mask = 0;

        if (lua_to_ip(L, TABLE, "ip", &h->ip) || lua_to_int(L, TABLE, "port", &port))
                return -1;

        if (lua_to_int(L, TABLE, "ip_mask", &h->ip_mask))
                h->ip_mask = 0;
        if (lua_to_int(L, TABLE, "port_mask", &port_mask))
                h->port_mask = 0;

        h->port = rte_bswap16(port);
        h->port_mask = rte_bswap16(port_mask);
        h->ip = rte_bswap32(h->ip);
        h->ip_mask = rte_bswap32(h->ip_mask);

        lua_pop(L, pop);
        return 0;
}

static int file_read_cached(const char *file_name, uint8_t **mem, uint32_t beg, uint32_t len, uint32_t socket, struct hash_set *hs)
{
        if (len == 0) {
                *mem = 0;
                return 0;
        }

        uint8_t *data_mem;

        /* Since the configuration can reference the same file from
           multiple places, use prox_shared infrastructure to detect
           this and return previously loaded data. */
        char name[256];

        snprintf(name, sizeof(name), "%u-%u:%s", beg, len, file_name);
        *mem = prox_sh_find_socket(socket, name);
        if (*mem)
                return 0;

        /* Check if the file has been loaded on the other socket. */
        if (socket == 1 && (data_mem = prox_sh_find_socket(0, name))) {
                uint8_t *data_find = hash_set_find(hs, data_mem, len);
                if (!data_find) {
                        data_find = prox_zmalloc(len, socket);
                        PROX_PANIC(data_find == NULL, "Failed to allocate memory (%u bytes) to hold header for peer\n", len);

                        rte_memcpy(data_find, data_mem, len);
                        hash_set_add(hs, data_find, len);
                }
                *mem = data_find;
                prox_sh_add_socket(socket, name, *mem);
                return 0;
        }

        /* It is possible that a file with a different name contains
           the same data. In that case, search all loaded files and
           compare the data to reduce memory utilization. */
        data_mem = malloc(len);
        PROX_PANIC(data_mem == NULL, "Failed to allocate temporary memory to hold data\n");

        if (file_read_content(file_name, data_mem, beg, len)) {
                plog_err("%s\n", file_get_error());
                free(data_mem);
                return -1;
        }

        uint8_t *data_find = hash_set_find(hs, data_mem, len);
        if (!data_find) {
                data_find = prox_zmalloc(len, socket);
                PROX_PANIC(data_find == NULL, "Failed to allocate memory (%u bytes) to hold header for peer\n", len);

                rte_memcpy(data_find, data_mem, len);
                hash_set_add(hs, data_find, len);
        }

        free(data_mem);

        *mem = data_find;
        prox_sh_add_socket(socket, name, *mem);
        return 0;
}

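/* Read one peer's payload descriptors from Lua. The table carries two
   sub-tables, "header" and "content", each with file_name/beg/len;
   len == -1 (UINT32_MAX once converted) means "up to the end of the
   file". A sketch of the expected shape, inferred from the reads
   below (file name and offsets are hypothetical):

     client_data = {
        header  = {file_name = "http_get.txt", beg = 0, len = -1},
        content = {file_name = "http_get.txt", beg = 0, len = 0},
     }
*/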
static int lua_to_peer_data(struct lua_State *L, enum lua_place from, const char *name, uint32_t socket, struct peer_data *peer_data, size_t *cl, struct hash_set *hs)
{
        uint32_t hdr_len, hdr_beg, content_len, content_beg;
        char hdr_file[256], content_file[256];
        int pop;

        if ((pop = lua_getfrom(L, from, name)) < 0)
                return -1;

        if (!lua_istable(L, -1))
                return -1;

        if (lua_getfrom(L, TABLE, "header") < 0)
                return -1;
        if (lua_to_int(L, TABLE, "len", &hdr_len) < 0)
                return -1;
        if (lua_to_int(L, TABLE, "beg", &hdr_beg) < 0)
                return -1;
        if (lua_to_string(L, TABLE, "file_name", hdr_file, sizeof(hdr_file)) < 0)
                return -1;
        lua_pop(L, 1);

        if (lua_getfrom(L, TABLE, "content") < 0)
                return -1;
        if (lua_to_int(L, TABLE, "len", &content_len) < 0)
                return -1;
        if (lua_to_int(L, TABLE, "beg", &content_beg) < 0)
                return -1;
        if (lua_to_string(L, TABLE, "file_name", content_file, sizeof(content_file)) < 0)
                return -1;
        lua_pop(L, 1);

        if (hdr_len == UINT32_MAX) {
                long ret = file_get_size(hdr_file);

                if (ret < 0) {
                        plog_err("%s", file_get_error());
                        return -1;
                }
                hdr_len = ret - hdr_beg;
        }

        if (content_len == UINT32_MAX) {
                long ret = file_get_size(content_file);

                if (ret < 0) {
                        plog_err("%s", file_get_error());
                        return -1;
                }
                content_len = ret - content_beg;
        }
        *cl = content_len;
        peer_data->hdr_len = hdr_len;

        if (file_read_cached(hdr_file, &peer_data->hdr, hdr_beg, hdr_len, socket, hs))
                return -1;
        if (file_read_cached(content_file, &peer_data->content, content_beg, content_len, socket, hs))
                return -1;

        lua_pop(L, pop);
        return 0;
}

static int lua_to_peer_action(struct lua_State *L, enum lua_place from, const char *name, struct peer_action *action, size_t client_contents_len, size_t server_contents_len)
{
        int pop;

        if ((pop = lua_getfrom(L, from, name)) < 0)
                return -1;

        if (!lua_istable(L, -1))
                return -1;

        uint32_t peer, beg, len;
        if (lua_to_int(L, TABLE, "peer", &peer) ||
            lua_to_int(L, TABLE, "beg", &beg) ||
            lua_to_int(L, TABLE, "len", &len)) {
                return -1;
        }
        size_t data_len = (peer == PEER_CLIENT? client_contents_len : server_contents_len);
        if (len == (uint32_t)-1)
                len = data_len - beg;

        PROX_PANIC(beg + len > data_len, "Accessing data past the end (starting at %u for %u bytes) while total length is %zu\n", beg, len, data_len);

        action->peer = peer;
        action->beg = beg;
        action->len = len;
        lua_pop(L, pop);
        return 0;
}

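/* Build one stream_cfg from Lua: the servers host_set, l4_proto
   ("tcp" or "udp"), client/server payload data, optional rate limits
   (up_bps/dn_bps) and timeouts, plus the ordered list of actions
   describing which peer sends which byte range of its payload. */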
static int lua_to_stream_cfg(struct lua_State *L, enum lua_place from, const char *name, uint32_t socket, struct stream_cfg **stream_cfg, struct hash_set *hs)
{
        int pop;
        struct stream_cfg *ret;

        if ((pop = lua_getfrom(L, from, name)) < 0)
                return -1;

        if (lua_getfrom(L, TABLE, "actions") < 0)
                return -1;

        lua_len(prox_lua(), -1);
        uint32_t n_actions = lua_tointeger(prox_lua(), -1);
        lua_pop(prox_lua(), 1);

        lua_pop(L, 1);

        size_t mem_size = 0;
        mem_size += sizeof(*ret);
        /* one additional action is allocated to allow inserting an
           additional "default" action to close down TCP sessions from
           the client side. */
        mem_size += sizeof(ret->actions[0]) * (n_actions + 1);

        ret = prox_zmalloc(mem_size, socket);
        ret->n_actions = n_actions;

        size_t client_contents_len, server_contents_len;
        char proto[16];
        uint32_t timeout_us, timeout_time_wait_us;
        plogx_dbg("loading stream\n");
        if (lua_to_host_set(L, TABLE, "servers", &ret->servers))
                return -1;
        if (lua_to_string(L, TABLE, "l4_proto", proto, sizeof(proto)))
                return -1;
        if (lua_to_peer_data(L, TABLE, "client_data", socket, &ret->data[PEER_CLIENT], &client_contents_len, hs))
                return -1;
        if (lua_to_peer_data(L, TABLE, "server_data", socket, &ret->data[PEER_SERVER], &server_contents_len, hs))
                return -1;

        if (lua_to_int(L, TABLE, "timeout", &timeout_us)) {
                timeout_us = 1000000;
        }

        ret->tsc_timeout = usec_to_tsc(timeout_us);

        double up, dn;

        if (lua_to_double(L, TABLE, "up_bps", &up))
                up = 5000; // Default: 5000 bytes/sec = 40 Kbit/s

        if (lua_to_double(L, TABLE, "dn_bps", &dn))
                dn = 5000; // Default: 5000 bytes/sec = 40 Kbit/s

        const uint64_t hz = rte_get_tsc_hz();

        ret->tt_cfg[PEER_CLIENT] = token_time_cfg_create(up, hz, ETHER_MAX_LEN + 20);
        ret->tt_cfg[PEER_SERVER] = token_time_cfg_create(dn, hz, ETHER_MAX_LEN + 20);

        if (!strcmp(proto, "tcp")) {
                ret->proto = IPPROTO_TCP;
                ret->proc = stream_tcp_proc;
                ret->is_ended = stream_tcp_is_ended;

                if (lua_to_int(L, TABLE, "timeout_time_wait", &timeout_time_wait_us)) {
                        timeout_time_wait_us = 2000000;
                }

                ret->tsc_timeout_time_wait = usec_to_tsc(timeout_time_wait_us);
        }
        else if (!strcmp(proto, "udp")) {
                plogx_dbg("loading UDP\n");
                ret->proto = IPPROTO_UDP;
                ret->proc = stream_udp_proc;
                ret->is_ended = stream_udp_is_ended;
        }
        else
                return -1;

        /* get all actions */
        if (lua_getfrom(L, TABLE, "actions") < 0)
                return -1;

        uint32_t idx = 0;
        lua_pushnil(L);
        while (lua_next(L, -2)) {
                if (lua_to_peer_action(L, STACK, NULL, &ret->actions[idx], client_contents_len, server_contents_len))
                        return -1;

                stream_cfg_verify_action(ret, &ret->actions[idx]);

                idx++;

                lua_pop(L, 1);
        }
        lua_pop(L, 1);

        /* For TCP, one of the peers initiates closing down the
           connection. This is signified by the last action having
           zero length. If such an action is not specified in the
           configuration file, the default is for the client to close
           the connection. This means that the TCP connection at the
           client will go into a TIME_WAIT state and the server
           releases all its resources, avoiding resource starvation at
           the server. */
        if (ret->proto == IPPROTO_TCP && (ret->n_actions == 0 || ret->actions[ret->n_actions - 1].len != 0)) {
                ret->actions[ret->n_actions].len = 0;
                ret->actions[ret->n_actions].beg = 0;
                ret->actions[ret->n_actions].peer = PEER_CLIENT;
                ret->n_actions++;
        }

        if (IPPROTO_TCP == ret->proto)
                stream_tcp_calc_len(ret, &ret->n_pkts, &ret->n_bytes);
        else
                stream_udp_calc_len(ret, &ret->n_pkts, &ret->n_bytes);

        lua_pop(L, pop);
        *stream_cfg = ret;
        return 0;
}

static int lua_to_bundle_cfg(struct lua_State *L, enum lua_place from, const char *name, uint8_t socket, struct bundle_cfg *bundle, struct hash_set *hs)
{
        int pop, idx;
        int clients_loaded = 0;

        if ((pop = lua_getfrom(L, from, name)) < 0)
                return -1;

        if (!lua_istable(L, -1))
                return -1;

        lua_len(prox_lua(), -1);
        bundle->n_stream_cfgs = lua_tointeger(prox_lua(), -1);
        lua_pop(prox_lua(), 1);

        bundle->stream_cfgs = prox_zmalloc(sizeof(*bundle->stream_cfgs) * bundle->n_stream_cfgs, socket);

        plogx_dbg("loading bundle cfg with %d streams\n", bundle->n_stream_cfgs);
        idx = 0;
        lua_pushnil(L);
        while (lua_next(L, -2)) {
                if (!clients_loaded) {
                        if (lua_to_host_set(L, TABLE, "clients", &bundle->clients)) {
                                return -1;
                        }
                        clients_loaded = 1;
                }
                if (lua_to_stream_cfg(L, STACK, NULL, socket, &bundle->stream_cfgs[idx], hs)) {
                        return -1;
                }

                ++idx;
                lua_pop(L, 1);
        }

        lua_pop(L, pop);
        return 0;
}

static void init_task_gen(struct task_base *tbase, struct task_args *targ)
{
        struct task_gen_server *task = (struct task_gen_server *)tbase;
        const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

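        /* DPDK requires process-wide unique mempool/hash names.
           Bumping the first character of a static name on every task
           instance ("server_mempool", "terver_mempool", ...) is the
           naming scheme used throughout this file to keep them
           distinct. */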
        static char name[] = "server_mempool";
        name[0]++;
        task->mempool = rte_mempool_create(name,
                                           4*1024 - 1, MBUF_SIZE,
                                           targ->nb_cache_mbuf,
                                           sizeof(struct rte_pktmbuf_pool_private),
                                           rte_pktmbuf_pool_init, NULL,
                                           rte_pktmbuf_init, 0,
                                           socket_id, 0);
        PROX_PANIC(task->mempool == NULL, "Failed to allocate memory pool with %u elements\n", 4*1024 - 1);
        int pop = lua_getfrom(prox_lua(), GLOBAL, targ->streams);
        PROX_PANIC(pop < 0, "Failed to find '%s' in lua\n", targ->streams);

        lua_len(prox_lua(), -1);
        uint32_t n_listen = lua_tointeger(prox_lua(), -1);
        lua_pop(prox_lua(), 1);
        PROX_PANIC(n_listen == 0, "No services specified to listen on\n");

        task->bundle_cfgs = prox_zmalloc(n_listen * sizeof(task->bundle_cfgs[0]), socket_id);

        plogx_info("n_listen = %u\n", n_listen);

        struct hash_set *hs = prox_sh_find_socket(socket_id, "genl4_streams");
        if (hs == NULL) {
                /* Expected number of streams per bundle = 1, hash_set
                   will grow if full. */
                hs = hash_set_create(n_listen, socket_id);
                prox_sh_add_socket(socket_id, "genl4_streams", hs);
        }

        const struct rte_hash_parameters listen_table = {
                .name = name,
                .entries = n_listen * 4,
                .key_len = sizeof(struct new_tuple),
                .hash_func = rte_hash_crc,
                .hash_func_init_val = 0,
                .socket_id = socket_id,
        };
        name[0]++;

        task->listen_hash = rte_hash_create(&listen_table);
        task->listen_entries = prox_zmalloc(listen_table.entries * sizeof(task->listen_entries[0]), socket_id);

        int idx = 0;
        lua_pushnil(prox_lua());
        while (lua_next(prox_lua(), -2)) {
                task->bundle_cfgs[idx].n_stream_cfgs = 1;
                task->bundle_cfgs[idx].stream_cfgs = prox_zmalloc(sizeof(*task->bundle_cfgs[idx].stream_cfgs), socket_id);
                int ret = lua_to_stream_cfg(prox_lua(), STACK, NULL, socket_id, &task->bundle_cfgs[idx].stream_cfgs[0], hs);
                PROX_PANIC(ret, "Failed to load stream cfg\n");
                struct stream_cfg *stream = task->bundle_cfgs[idx].stream_cfgs[0];

                // TODO: check mask and add to hash for each host
                struct new_tuple nt = {
                        .dst_addr = stream->servers.ip,
                        .proto_id = stream->proto,
                        .dst_port = stream->servers.port,
                        .l2_types[0] = 0x0008,
                };

                ret = rte_hash_add_key(task->listen_hash, &nt);
                PROX_PANIC(ret < 0, "Failed to add key to listen hash\n");

                task->listen_entries[ret] = &task->bundle_cfgs[idx];

                plogx_dbg("Server = "IPv4_BYTES_FMT":%d\n", IPv4_BYTES(((uint8_t*)&nt.dst_addr)), rte_bswap16(nt.dst_port));
                ++idx;
                lua_pop(prox_lua(), 1);
        }

        static char name2[] = "task_gen_hash2";

        name2[0]++;
        plogx_dbg("Creating bundle ctx pool\n");
        if (bundle_ctx_pool_create(name2, targ->n_concur_conn * 2, &task->bundle_ctx_pool, NULL, 0, NULL, socket_id)) {
                cmd_mem_stats();
                PROX_PANIC(1, "Failed to create conn_ctx_pool\n");
        }

        task->heap = heap_create(targ->n_concur_conn * 2, socket_id);
        task->seed = rte_rdtsc();

        /* TODO: calculate the CDF of the reply distribution and the
           number of replies as the number to cover for 99% of the
           replies. For now, assume that this number is 2. */
        uint32_t queue_size = rte_align32pow2(targ->n_concur_conn * 2);

        PROX_PANIC(queue_size == 0, "Overflow resulted in queue size 0\n");
        task->fqueue = fqueue_create(queue_size, socket_id);
        PROX_PANIC(task->fqueue == NULL, "Failed to allocate local queue\n");

        uint32_t n_descriptors;

        if (targ->nb_txports) {
                PROX_PANIC(targ->nb_txports != 1, "Need exactly one TX port for L4 generation\n");
                n_descriptors = prox_port_cfg[targ->tx_port_queue[0].port].n_txd;
        } else {
                PROX_PANIC(targ->nb_txrings != 1, "Need exactly one TX ring for L4 generation\n");
                n_descriptors = 256;
        }

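        /* Pace TX so that at most one TX ring's worth of the smallest
           frames is in flight: bytes_max = n_descriptors *
           (ETHER_MIN_LEN + 20), where 20 bytes account for preamble,
           SFD and inter-frame gap. E.g. with 256 descriptors this is
           256 * 84 = 21504 bytes of credit. */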
        struct token_time_cfg tt_cfg = {
                .bpp = targ->rate_bps,
                .period = rte_get_tsc_hz(),
                .bytes_max = n_descriptors * (ETHER_MIN_LEN + 20),
        };

        token_time_init(&task->token_time, &tt_cfg);
}

static void init_task_gen_client(struct task_base *tbase, struct task_args *targ)
{
        struct task_gen_client *task = (struct task_gen_client *)tbase;
        static char name[] = "gen_pool";
        const uint32_t socket = rte_lcore_to_socket_id(targ->lconf->id);
        name[0]++;
        task->mempool = rte_mempool_create(name,
                                           4*1024 - 1, MBUF_SIZE,
                                           targ->nb_cache_mbuf,
                                           sizeof(struct rte_pktmbuf_pool_private),
                                           rte_pktmbuf_pool_init, NULL,
                                           rte_pktmbuf_init, 0,
                                           socket, 0);
        PROX_PANIC(task->mempool == NULL, "Failed to allocate memory pool with %u elements\n", 4*1024 - 1);

        /* streams contains a Lua table. Go through it and read each
           stream with its associated imix_fraction. */
        uint32_t imix;
        uint32_t i = 0;

        int pop = lua_getfrom(prox_lua(), GLOBAL, targ->streams);
        PROX_PANIC(pop < 0, "Failed to find '%s' in lua\n", targ->streams);

        lua_len(prox_lua(), -1);
        uint32_t n_bundle_cfgs = lua_tointeger(prox_lua(), -1);
        lua_pop(prox_lua(), 1);
        PROX_PANIC(n_bundle_cfgs == 0, "No configs specified\n");
        plogx_info("loading %u bundle_cfgs\n", n_bundle_cfgs);

        struct hash_set *hs = prox_sh_find_socket(socket, "genl4_streams");
        if (hs == NULL) {
                /* Expected number of streams per bundle = 8, hash_set
                   will grow if full. */
                hs = hash_set_create(n_bundle_cfgs * 8, socket);
                prox_sh_add_socket(socket, "genl4_streams", hs);
        }

        task->bundle_cfgs = prox_zmalloc(n_bundle_cfgs * sizeof(task->bundle_cfgs[0]), socket);
        lua_pushnil(prox_lua());

        uint32_t *occur = prox_zmalloc(n_bundle_cfgs * sizeof(*occur), socket);
        struct cdf *cdf = cdf_create(n_bundle_cfgs, socket);

        while (lua_next(prox_lua(), -2)) {
                PROX_PANIC(lua_to_int(prox_lua(), TABLE, "imix_fraction", &imix) ||
                           lua_to_bundle_cfg(prox_lua(), TABLE, "bundle", socket, &task->bundle_cfgs[i], hs),
                           "Failed to load bundle cfg:\n%s\n", get_lua_to_errors());
                cdf_add(cdf, imix);
                occur[i] = imix;
                ++i;
                lua_pop(prox_lua(), 1);
        }

        lua_pop(prox_lua(), pop);
        cdf_setup(cdf);

        PROX_PANIC(targ->max_setup_rate == 0, "Max setup rate not set\n");

        task->new_conn_cost = rte_get_tsc_hz()/targ->max_setup_rate;

        static char name2[] = "task_gen_hash";
        name2[0]++;
        plogx_dbg("Creating bundle ctx pool\n");
        if (bundle_ctx_pool_create(name2, targ->n_concur_conn, &task->bundle_ctx_pool, occur, n_bundle_cfgs, task->bundle_cfgs, socket)) {
                cmd_mem_stats();
                PROX_PANIC(1, "Failed to create conn_ctx_pool\n");
        }

        task->heap = heap_create(targ->n_concur_conn, socket);
        task->seed = rte_rdtsc();
        /* task->token_time.bytes_max = MAX_PKT_BURST * (ETHER_MAX_LEN + 20); */

        /* To avoid overflowing the tx descriptors, the token bucket
           size needs to be limited. The descriptors are filled most
           quickly with the smallest packets. For that reason, the
           token bucket size is given by "number of tx descriptors" *
           "smallest Ethernet packet". */
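        /* Worked example, assuming a port with 1024 TX descriptors:
           bytes_max = 1024 * (64 + 20) = 86016 bytes of credit. */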
        PROX_ASSERT(targ->nb_txports == 1);

        struct token_time_cfg tt_cfg = {
                .bpp = targ->rate_bps,
                .period = rte_get_tsc_hz(),
                .bytes_max = prox_port_cfg[targ->tx_port_queue[0].port].n_txd * (ETHER_MIN_LEN + 20),
        };

        token_time_init(&task->token_time, &tt_cfg);
}

static void start_task_gen_client(struct task_base *tbase)
{
        struct task_gen_client *task = (struct task_gen_client *)tbase;

        token_time_reset(&task->token_time, rte_rdtsc(), 0);

        task->new_conn_tokens = 0;
        task->new_conn_last_tsc = rte_rdtsc();
}

static void stop_task_gen_client(struct task_base *tbase)
{
        struct task_gen_client *task = (struct task_gen_client *)tbase;
        struct bundle_ctx *bundle;

        while (!heap_is_empty(task->heap)) {
                bundle = BUNDLE_CTX_UPCAST(heap_pop(task->heap));
                bundle_expire(bundle, &task->bundle_ctx_pool, &task->l4_stats);
        }
}

static void start_task_gen_server(struct task_base *tbase)
{
        struct task_gen_server *task = (struct task_gen_server *)tbase;

        token_time_reset(&task->token_time, rte_rdtsc(), 0);
}

static void stop_task_gen_server(struct task_base *tbase)
{
        struct task_gen_server *task = (struct task_gen_server *)tbase;
        struct bundle_ctx *bundle;
        uint8_t out[MAX_PKT_BURST];

        while (!heap_is_empty(task->heap)) {
                bundle = BUNDLE_CTX_UPCAST(heap_pop(task->heap));
                bundle_expire(bundle, &task->bundle_ctx_pool, &task->l4_stats);
        }

        if (task->cancelled) {
                struct rte_mbuf *mbuf = task->mbuf_saved;

                out[0] = OUT_DISCARD;
                task->cancelled = 0;
                task->base.tx_pkt(&task->base, &mbuf, 1, out);
        }

        do {
                if (task->cur_mbufs_beg == task->cur_mbufs_end) {
                        task->cur_mbufs_end = fqueue_get(task->fqueue, task->cur_mbufs, MAX_PKT_BURST);
                        task->cur_mbufs_beg = 0;
                        if (task->cur_mbufs_end == 0)
                                break;
                }
                uint16_t n_pkts = task->cur_mbufs_end - task->cur_mbufs_beg;
                struct rte_mbuf **mbufs = task->cur_mbufs + task->cur_mbufs_beg;

                if (n_pkts) {
                        for (uint16_t j = 0; j < n_pkts; ++j) {
                                out[j] = OUT_DISCARD;
                        }
                        task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
                        task->cur_mbufs_beg += n_pkts;
                }
        } while (1);
}

static struct task_init task_init_gen1 = {
        .mode_str = "genl4",
        .sub_mode_str = "server",
        .init = init_task_gen,
        .handle = handle_gen_bulk,
        .start = start_task_gen_server,
        .stop = stop_task_gen_server,
        .flag_features = TASK_FEATURE_ZERO_RX,
        .size = sizeof(struct task_gen_server),
        .mbuf_size = 2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM,
};

static struct task_init task_init_gen2 = {
        .mode_str = "genl4",
        .init = init_task_gen_client,
        .handle = handle_gen_bulk_client,
        .start = start_task_gen_client,
        .stop = stop_task_gen_client,
        .flag_features = TASK_FEATURE_ZERO_RX,
        .size = sizeof(struct task_gen_client),
        .mbuf_size = 2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM,
};

__attribute__((constructor)) static void reg_task_gen(void)
{
        reg_task(&task_init_gen1);
        reg_task(&task_init_gen2);
}