// Copyright (c) 2010-2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string.h>
#include <stdlib.h>

#include <rte_cycles.h>
#include <rte_version.h>
#include <rte_byteorder.h>
#include <rte_ether.h>
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_mbuf.h>
#include <rte_mempool.h>
#include <rte_hash.h>
#include <rte_hash_crc.h>
#include <rte_memcpy.h>
#include <rte_lcore.h>

#include "prox_lua.h"
#include "prox_lua_types.h"
#include "prox_malloc.h"
#include "file_utils.h"
#include "hash_set.h"
#include "prox_assert.h"
#include "prox_args.h"
#include "defines.h"
#include "pkt_parser.h"
#include "handle_lat.h"
#include "task_init.h"
#include "task_base.h"
#include "prox_port_cfg.h"
#include "lconf.h"
#include "log.h"
#include "quit.h"
#include "heap.h"
#include "mbuf_utils.h"
#include "genl4_bundle.h"
#include "genl4_stream_udp.h"
#include "genl4_stream_tcp.h"
#include "cdf.h"
#include "fqueue.h"
#include "token_time.h"
#include "prox_shared.h"
#if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0)
#define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE
#endif

/* Key identifying a listening service; reconstructed here from its
   uses below (lookup in listen_hash, initialization from pkt_tuple). */
struct new_tuple {
	uint32_t dst_addr;
	uint8_t  proto_id;
	uint16_t dst_port;
	uint16_t l2_types[4];
} __attribute__((packed));
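/* The server alternates between two phases: HANDLE_QUEUED drains
   packets previously buffered in the fqueue, HANDLE_SCHEDULED drains
   timer events (retransmits and delayed transmits) from the heap.
   handle_gen_bulk() below switches phase when the current handler
   exhausts its token-time budget, so neither phase can starve the
   other. */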
enum handle_state {HANDLE_QUEUED, HANDLE_SCHEDULED};
struct task_gen_server {
	struct task_base base;
	struct l4_stats l4_stats;
	struct rte_mempool *mempool;
	struct rte_hash *listen_hash;
	/* Listening bundles contain only 1 part since the state of a
	   multi_part comm is kept mostly at the client side */
	struct bundle_cfg **listen_entries;
	struct bundle_ctx_pool bundle_ctx_pool;
	struct bundle_cfg *bundle_cfgs; /* Loaded configurations */
	struct token_time token_time;
	enum handle_state handle_state;
	struct heap *heap;
	struct fqueue *fqueue;
	struct rte_mbuf *cur_mbufs[MAX_PKT_BURST];
	uint32_t cur_mbufs_beg;
	uint32_t cur_mbufs_end;
	uint8_t  out_saved;
	uint8_t  cancelled;
	struct rte_mbuf *mbuf_saved;
	unsigned seed;
	/* Handle scheduled events */
	struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
	uint32_t n_new_mbufs;
};
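/* The client task both reacts to received packets and initiates new
   connections. Connection setup is paced by a small token bucket:
   one token is earned every new_conn_cost TSC cycles (new_conn_cost
   is derived from the configured max_setup_rate) and every new
   connection spends one token. */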
struct task_gen_client {
	struct task_base base;
	struct l4_stats l4_stats;
	struct rte_mempool *mempool;
	struct bundle_ctx_pool bundle_ctx_pool;
	struct bundle_cfg *bundle_cfgs; /* Loaded configurations */
	struct token_time token_time;
	/* Create new connections and handle scheduled events */
	struct rte_mbuf *new_mbufs[MAX_PKT_BURST];
	uint32_t new_conn_cost;
	uint32_t new_conn_tokens;
	uint64_t new_conn_last_tsc;
	uint32_t n_new_mbufs;
	struct heap *heap;
	unsigned seed;
};
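/* Top up the task-local mbuf array to MAX_PKT_BURST mbufs so that
   generated packets (new connections, retransmits, delayed segments)
   never have to allocate from the mempool one at a time. Returns -1
   when the mempool is exhausted; the caller simply gives up for this
   iteration and retries on the next call. */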
static int refill_mbufs(uint32_t *n_new_mbufs, struct rte_mempool *mempool, struct rte_mbuf **mbufs)
{
	if (*n_new_mbufs == MAX_PKT_BURST)
		return 0;

	if (rte_mempool_get_bulk(mempool, (void **)mbufs, MAX_PKT_BURST - *n_new_mbufs) < 0) {
		plogx_err("Mempool alloc failed for %d mbufs\n", MAX_PKT_BURST - *n_new_mbufs);
		return -1;
	}

	for (uint32_t i = 0; i < MAX_PKT_BURST - *n_new_mbufs; ++i) {
		init_mbuf_seg(mbufs[i]);
	}

	*n_new_mbufs = MAX_PKT_BURST;

	return 0;
}
static const struct bundle_cfg *server_accept(struct task_gen_server *task, struct new_tuple *nt)
{
	int ret = rte_hash_lookup(task->listen_hash, nt);

	if (ret < 0)
		return NULL;
	else
		return task->listen_entries[ret];
}
static int handle_gen_bulk_client(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_gen_client *task = (struct task_gen_client *)tbase;
	uint8_t out[MAX_PKT_BURST] = {0};
	struct bundle_ctx *conn;
	int ret;

	if (n_pkts) {
		for (int i = 0; i < n_pkts; ++i) {
			struct pkt_tuple pt;
			struct l4_meta l4_meta;

			if (parse_pkt(mbufs[i], &pt, &l4_meta)) {
				plogdx_err(mbufs[i], "Parsing failed\n");
				out[i] = OUT_DISCARD;
				continue;
			}

			ret = rte_hash_lookup(task->bundle_ctx_pool.hash, (const void *)&pt);

			if (ret < 0) {
				plogx_dbg("Client: packet RX that does not belong to connection: "
					  "Client = "IPv4_BYTES_FMT":%d, Server = "IPv4_BYTES_FMT":%d\n",
					  IPv4_BYTES(((uint8_t*)&pt.dst_addr)),
					  rte_bswap16(pt.dst_port),
					  IPv4_BYTES(((uint8_t*)&pt.src_addr)),
					  rte_bswap16(pt.src_port));

				plogdx_dbg(mbufs[i], NULL);

				if (pt.proto_id == IPPROTO_TCP) {
					stream_tcp_create_rst(mbufs[i], &l4_meta, &pt);
					out[i] = 0;
					continue;
				}
				else {
					out[i] = OUT_DISCARD;
					continue;
				}
			}

			conn = task->bundle_ctx_pool.hash_entries[ret];
			ret = bundle_proc_data(conn, mbufs[i], &l4_meta, &task->bundle_ctx_pool, &task->seed, &task->l4_stats);
			out[i] = ret == 0? 0: OUT_HANDLED;
		}
		task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
	}

	/* If there is at least one callback to handle, handle at most MAX_PKT_BURST */
	if (heap_top_is_lower(task->heap, rte_rdtsc())) {
		if (0 != refill_mbufs(&task->n_new_mbufs, task->mempool, task->new_mbufs))
			return 0;

		uint16_t n_called_back = 0;
		while (heap_top_is_lower(task->heap, rte_rdtsc()) && n_called_back < MAX_PKT_BURST) {
			conn = BUNDLE_CTX_UPCAST(heap_pop(task->heap));

			/* handle packet TX (retransmit or delayed transmit) */
			ret = bundle_proc_data(conn, task->new_mbufs[n_called_back], NULL, &task->bundle_ctx_pool, &task->seed, &task->l4_stats);

			if (ret == 0) {
				out[n_called_back] = 0;
				n_called_back++;
			}
		}
		plogx_dbg("During callback, will send %d packets\n", n_called_back);

		task->base.tx_pkt(&task->base, task->new_mbufs, n_called_back, out);
		task->n_new_mbufs -= n_called_back;
	}
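	/* Rate-limit connection setup with a token bucket: tokens
	   accrue at one per new_conn_cost TSC cycles and the bucket is
	   capped at 16 tokens, which bounds the burst of connections
	   that can be created in a single iteration. */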
	uint32_t n_new = task->bundle_ctx_pool.n_free_bundles;
	n_new = n_new > MAX_PKT_BURST? MAX_PKT_BURST : n_new;

	uint64_t diff = (rte_rdtsc() - task->new_conn_last_tsc)/task->new_conn_cost;
	task->new_conn_last_tsc += diff * task->new_conn_cost;
	task->new_conn_tokens += diff;

	if (task->new_conn_tokens > 16)
		task->new_conn_tokens = 16;
	if (n_new > task->new_conn_tokens)
		n_new = task->new_conn_tokens;
	task->new_conn_tokens -= n_new;
	if (n_new == 0)
		return 0;

	if (0 != refill_mbufs(&task->n_new_mbufs, task->mempool, task->new_mbufs))
		return 0;

	for (uint32_t i = 0; i < n_new; ++i) {
		struct bundle_ctx *bundle_ctx = bundle_ctx_pool_get_w_cfg(&task->bundle_ctx_pool);
		PROX_ASSERT(bundle_ctx);

		struct pkt_tuple *pt = &bundle_ctx->tuple;
		int n_retries = 0;

		do {
			/* Note that the actual packet sent will
			   contain swapped addresses and ports
			   (i.e. pkt.src <=> tuple.dst). The incoming
			   packet will match this struct. */
			bundle_init(bundle_ctx, task->heap, PEER_CLIENT, &task->seed);

			ret = rte_hash_lookup(task->bundle_ctx_pool.hash, (const void *)pt);
			if (ret >= 0) {
				if (n_retries++ == 1000) {
					plogx_err("Already tried 1K times\n");
					break;
				}
			}
		} while (ret >= 0);

		ret = rte_hash_add_key(task->bundle_ctx_pool.hash, (const void *)pt);

		if (ret < 0) {
			plogx_err("Failed to add key ret = %d, n_free = %d\n", ret, task->bundle_ctx_pool.n_free_bundles);
			bundle_ctx_pool_put(&task->bundle_ctx_pool, bundle_ctx);

			pkt_tuple_debug2(pt);
			out[i] = OUT_DISCARD;
			continue;
		}

		task->bundle_ctx_pool.hash_entries[ret] = bundle_ctx;

		if (bundle_ctx->ctx.stream_cfg->proto == IPPROTO_TCP)
			task->l4_stats.tcp_created++;
		else
			task->l4_stats.udp_created++;
		task->l4_stats.bundles_created++;

		ret = bundle_proc_data(bundle_ctx, task->new_mbufs[i], NULL, &task->bundle_ctx_pool, &task->seed, &task->l4_stats);
		out[i] = ret == 0? 0: OUT_HANDLED;
	}

	int ret2 = task->base.tx_pkt(&task->base, task->new_mbufs, n_new, out);
	task->n_new_mbufs -= n_new;

	return ret2;
}
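/* Process packets previously queued by handle_gen_bulk(). Transmission
   is budgeted by token_time; when the budget runs out mid-burst, the
   verdict of the current packet is saved in out_saved, cancelled is
   set and -1 is returned so that processing resumes with that same
   packet on the next call. */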
static int handle_gen_queued(struct task_gen_server *task)
{
	uint8_t out[MAX_PKT_BURST];
	struct bundle_ctx *conn;
	struct pkt_tuple pkt_tuple;
	struct l4_meta l4_meta;
	uint16_t j;
	uint16_t cancelled = 0;
	int ret;

	if (task->cur_mbufs_beg == task->cur_mbufs_end) {
		task->cur_mbufs_end = fqueue_get(task->fqueue, task->cur_mbufs, MAX_PKT_BURST);
		task->cur_mbufs_beg = 0;
	}
	uint16_t n_pkts = task->cur_mbufs_end - task->cur_mbufs_beg;
	struct rte_mbuf **mbufs = task->cur_mbufs + task->cur_mbufs_beg;

	j = 0;
	if (task->cancelled) {
		uint16_t pkt_len = mbuf_wire_size(mbufs[0]);

		if (token_time_take(&task->token_time, pkt_len) != 0)
			return -1;

		out[0] = task->out_saved;
		j = 1;
		task->cancelled = 0;
	}

	for (; j < n_pkts; ++j) {

		if (parse_pkt(mbufs[j], &pkt_tuple, &l4_meta)) {
			plogdx_err(mbufs[j], "Unknown packet, parsing failed\n");
			out[j] = OUT_DISCARD;
			continue;
		}

		conn = NULL;
		ret = rte_hash_lookup(task->bundle_ctx_pool.hash, (const void *)&pkt_tuple);

		if (ret >= 0)
			conn = task->bundle_ctx_pool.hash_entries[ret];
		else {
			/* If not part of existing connection, try to create a connection */
			struct new_tuple nt;
			nt.dst_addr = pkt_tuple.dst_addr;
			nt.proto_id = pkt_tuple.proto_id;
			nt.dst_port = pkt_tuple.dst_port;
			rte_memcpy(nt.l2_types, pkt_tuple.l2_types, sizeof(nt.l2_types));
			const struct bundle_cfg *n;

			if (NULL != (n = server_accept(task, &nt))) {
				conn = bundle_ctx_pool_get(&task->bundle_ctx_pool);
				if (conn == NULL) {
					out[j] = OUT_DISCARD;
					plogx_err("No more free bundles to accept new connection\n");
					continue;
				}
				ret = rte_hash_add_key(task->bundle_ctx_pool.hash, (const void *)&pkt_tuple);
				if (ret < 0) {
					out[j] = OUT_DISCARD;
					bundle_ctx_pool_put(&task->bundle_ctx_pool, conn);
					plog_err("Adding key failed while trying to accept connection\n");
					continue;
				}

				task->bundle_ctx_pool.hash_entries[ret] = conn;

				bundle_init_w_cfg(conn, n, task->heap, PEER_SERVER, &task->seed);
				conn->tuple = pkt_tuple;

				if (conn->ctx.stream_cfg->proto == IPPROTO_TCP)
					task->l4_stats.tcp_created++;
				else
					task->l4_stats.udp_created++;
			}
			else {
				plog_err("Packet received for service that does not exist:\n"
					 "source ip      = %0x:%u\n"
					 "destination ip = %0x:%u\n",
					 pkt_tuple.src_addr, rte_bswap16(pkt_tuple.src_port),
					 pkt_tuple.dst_addr, rte_bswap16(pkt_tuple.dst_port));
			}
		}

		/* conn contains either an active connection or a
		   newly created connection. If it is NULL, then the
		   packet matched no listening service. */
		if (NULL != conn) {
			ret = bundle_proc_data(conn, mbufs[j], &l4_meta, &task->bundle_ctx_pool, &task->seed, &task->l4_stats);

			out[j] = ret == 0? 0: OUT_HANDLED;
			if (ret == 0) {
				uint16_t pkt_len = mbuf_wire_size(mbufs[j]);

				if (token_time_take(&task->token_time, pkt_len) != 0) {
					task->out_saved = out[j];
					task->cancelled = 1;
					task->base.tx_pkt(&task->base, mbufs, j, out);
					task->cur_mbufs_beg += j;
					return -1;
				}
			}
		}
		else {
			pkt_tuple_debug(&pkt_tuple);
			plogd_dbg(mbufs[j], NULL);
			out[j] = OUT_DISCARD;
		}
	}

	task->base.tx_pkt(&task->base, mbufs, j, out);

	task->cur_mbufs_beg += j;
	return 0;
}
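/* Process timer events (retransmits and delayed transmits) that are
   due according to the heap. As in handle_gen_queued(), transmission
   is budgeted by token_time; a packet that no longer fits in the
   budget is parked in mbuf_saved and retried on the next call. */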
static int handle_gen_scheduled(struct task_gen_server *task)
{
	struct bundle_ctx *conn;
	uint8_t out[MAX_PKT_BURST];
	int ret;
	uint16_t n_called_back = 0;

	if (task->cancelled) {
		struct rte_mbuf *mbuf = task->mbuf_saved;

		uint16_t pkt_len = mbuf_wire_size(mbuf);
		if (token_time_take(&task->token_time, pkt_len) == 0) {
			task->cancelled = 0;
			out[0] = 0;
			task->base.tx_pkt(&task->base, &mbuf, 1, out);
		}
		else {
			return -1;
		}
	}

	if (0 != refill_mbufs(&task->n_new_mbufs, task->mempool, task->new_mbufs))
		return 0;

	while (heap_top_is_lower(task->heap, rte_rdtsc()) && n_called_back < task->n_new_mbufs) {
		conn = BUNDLE_CTX_UPCAST(heap_pop(task->heap));

		/* handle packet TX (retransmit or delayed transmit) */
		ret = bundle_proc_data(conn, task->new_mbufs[n_called_back], NULL, &task->bundle_ctx_pool, &task->seed, &task->l4_stats);

		if (ret == 0) {
			struct rte_mbuf *mbuf = task->new_mbufs[n_called_back];
			uint16_t pkt_len = mbuf_wire_size(mbuf);

			if (token_time_take(&task->token_time, pkt_len) == 0) {
				out[n_called_back] = 0;
				n_called_back++;
			}
			else {
				struct ether_hdr *eth = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
				struct ipv4_hdr *ip = (struct ipv4_hdr*)(eth + 1);
				struct tcp_hdr *tcp = (struct tcp_hdr*)(ip + 1);

				task->cancelled = 1;
				task->mbuf_saved = mbuf;
				task->base.tx_pkt(&task->base, task->new_mbufs, n_called_back, out);
				/* The mbuf that is currently being
				   processed (and which has been
				   cancelled) is saved in
				   task->mbuf_saved. It will be
				   restored as the first mbuf when
				   this function is called again. */
				task->n_new_mbufs -= (n_called_back + 1);
				return -1;
			}
		}
	}

	task->base.tx_pkt(&task->base, task->new_mbufs, n_called_back, out);
	task->n_new_mbufs -= n_called_back;

	return 0;
}
static int handle_gen_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_gen_server *task = (struct task_gen_server *)tbase;
	struct bundle_ctx *conn;
	int ret, ret2 = 0;

	token_time_update(&task->token_time, rte_rdtsc());

	if ((ret = fqueue_put(task->fqueue, mbufs, n_pkts)) != n_pkts) {
		uint8_t out[MAX_PKT_BURST];
		for (uint16_t j = 0; j < n_pkts - ret; ++j)
			out[j] = OUT_DISCARD;

		ret2 = task->base.tx_pkt(&task->base, mbufs + ret, n_pkts - ret, out);
	}
	if (task->handle_state == HANDLE_QUEUED) {
		if (handle_gen_queued(task) == 0) {
			if (handle_gen_scheduled(task) != 0)
				task->handle_state = HANDLE_SCHEDULED;
		}
	}
	else {
		if (handle_gen_scheduled(task) == 0) {
			if (handle_gen_queued(task) != 0)
				task->handle_state = HANDLE_QUEUED;
		}
	}

	return ret2;
}
static int lua_to_host_set(struct lua_State *L, enum lua_place from, const char *name, struct host_set *h)
{
	int pop;

	if ((pop = lua_getfrom(L, from, name)) < 0)
		return -1;

	if (!lua_istable(L, -1))
		return -1;

	uint32_t port = 0, port_mask = 0;

	if (lua_to_ip(L, TABLE, "ip", &h->ip) || lua_to_int(L, TABLE, "port", &port))
		return -1;

	if (lua_to_int(L, TABLE, "ip_mask", &h->ip_mask))
		h->ip_mask = 0;
	if (lua_to_int(L, TABLE, "port_mask", &port_mask))
		port_mask = 0;

	/* Store in network byte order so the fields can be compared
	   directly against parsed packet headers. */
	h->port = rte_bswap16(port);
	h->port_mask = rte_bswap16(port_mask);
	h->ip = rte_bswap32(h->ip);
	h->ip_mask = rte_bswap32(h->ip_mask);

	lua_pop(L, pop);
	return 0;
}
static int file_read_cached(const char *file_name, uint8_t **mem, uint32_t beg, uint32_t len, uint32_t socket, struct hash_set *hs)
{
	uint8_t *data_mem;
	char name[256];

	/* Since the configuration can reference the same file from
	   multiple places, use prox_shared infrastructure to detect
	   this and return previously loaded data. */

	snprintf(name, sizeof(name), "%u-%u:%s", beg, len, file_name);
	*mem = prox_sh_find_socket(socket, name);
	if (*mem)
		return 0;

	/* check if the file has been loaded on the other socket. */
	if (socket == 1 && (data_mem = prox_sh_find_socket(0, name))) {
		uint8_t *data_find = hash_set_find(hs, data_mem, len);
		if (!data_find) {
			data_find = prox_zmalloc(len, socket);
			PROX_PANIC(data_find == NULL, "Failed to allocate memory (%u bytes) to hold header for peer\n", len);

			rte_memcpy(data_find, data_mem, len);
			hash_set_add(hs, data_find, len);
		}
		*mem = data_find;
		prox_sh_add_socket(socket, name, *mem);
		return 0;
	}

	/* It is possible that a file with a different name contains
	   the same data. In that case, search all loaded files and
	   compare the data to reduce memory utilization. */
	data_mem = malloc(len);
	PROX_PANIC(data_mem == NULL, "Failed to allocate temporary memory to hold data\n");

	if (file_read_content(file_name, data_mem, beg, len)) {
		plog_err("%s\n", file_get_error());
		free(data_mem);
		return -1;
	}

	uint8_t *data_find = hash_set_find(hs, data_mem, len);
	if (!data_find) {
		data_find = prox_zmalloc(len, socket);
		PROX_PANIC(data_find == NULL, "Failed to allocate memory (%u bytes) to hold header for peer\n", len);

		rte_memcpy(data_find, data_mem, len);
		hash_set_add(hs, data_find, len);
	}
	free(data_mem);

	*mem = data_find;
	prox_sh_add_socket(socket, name, *mem);
	return 0;
}
static int lua_to_peer_data(struct lua_State *L, enum lua_place from, const char *name, uint32_t socket, struct peer_data *peer_data, size_t *cl, struct hash_set *hs)
{
	uint32_t hdr_len, hdr_beg, content_len, content_beg;
	char hdr_file[256], content_file[256];
	int pop;

	if ((pop = lua_getfrom(L, from, name)) < 0)
		return -1;

	if (!lua_istable(L, -1))
		return -1;

	if (lua_getfrom(L, TABLE, "header") < 0)
		return -1;
	if (lua_to_int(L, TABLE, "len", &hdr_len) < 0)
		return -1;
	if (lua_to_int(L, TABLE, "beg", &hdr_beg) < 0)
		return -1;
	if (lua_to_string(L, TABLE, "file_name", hdr_file, sizeof(hdr_file)) < 0)
		return -1;
	lua_pop(L, 1);

	if (lua_getfrom(L, TABLE, "content") < 0)
		return -1;
	if (lua_to_int(L, TABLE, "len", &content_len) < 0)
		return -1;
	if (lua_to_int(L, TABLE, "beg", &content_beg) < 0)
		return -1;
	if (lua_to_string(L, TABLE, "file_name", content_file, sizeof(content_file)) < 0)
		return -1;
	lua_pop(L, 1);

	if (hdr_len == UINT32_MAX) {
		long ret = file_get_size(hdr_file);

		if (ret < 0) {
			plog_err("%s", file_get_error());
			return -1;
		}
		hdr_len = ret - hdr_beg;
	}

	if (content_len == UINT32_MAX) {
		long ret = file_get_size(content_file);

		if (ret < 0) {
			plog_err("%s", file_get_error());
			return -1;
		}
		content_len = ret - content_beg;
	}

	peer_data->hdr_len = hdr_len;
	*cl = content_len;

	if (file_read_cached(hdr_file, &peer_data->hdr, hdr_beg, hdr_len, socket, hs))
		return -1;
	if (file_read_cached(content_file, &peer_data->content, content_beg, content_len, socket, hs))
		return -1;

	lua_pop(L, pop);
	return 0;
}
static int lua_to_peer_action(struct lua_State *L, enum lua_place from, const char *name, struct peer_action *action, size_t client_contents_len, size_t server_contents_len)
{
	int pop;

	if ((pop = lua_getfrom(L, from, name)) < 0)
		return -1;

	if (!lua_istable(L, -1))
		return -1;

	uint32_t peer, beg, len;
	if (lua_to_int(L, TABLE, "peer", &peer) ||
	    lua_to_int(L, TABLE, "beg", &beg) ||
	    lua_to_int(L, TABLE, "len", &len)) {
		return -1;
	}
	size_t data_len = (peer == PEER_CLIENT? client_contents_len : server_contents_len);
	if (len == (uint32_t)-1)
		len = data_len - beg;

	PROX_PANIC(beg + len > data_len, "Accessing data past the end (starting at %u for %u bytes) while total length is %zu\n", beg, len, data_len);

	action->peer = peer;
	action->beg = beg;
	action->len = len;
	lua_pop(L, pop);
	return 0;
}
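/* Load one stream configuration from Lua: the server host set, the
   L4 protocol, the per-peer header/content data and the list of
   actions, plus optional timeouts and per-direction rates. */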
static int lua_to_stream_cfg(struct lua_State *L, enum lua_place from, const char *name, uint32_t socket, struct stream_cfg **stream_cfg, struct hash_set *hs)
{
	int pop;
	struct stream_cfg *ret;

	if ((pop = lua_getfrom(L, from, name)) < 0)
		return -1;

	if (lua_getfrom(L, TABLE, "actions") < 0)
		return -1;

	lua_len(prox_lua(), -1);
	uint32_t n_actions = lua_tointeger(prox_lua(), -1);
	lua_pop(prox_lua(), 1);
	lua_pop(L, 1);

	size_t mem_size = 0;
	mem_size += sizeof(*ret);
	/* one additional action is allocated to allow inserting an
	   additional "default" action to close down TCP sessions from
	   the client side. */
	mem_size += sizeof(ret->actions[0]) * (n_actions + 1);

	ret = prox_zmalloc(sizeof(*ret) + mem_size, socket);
	ret->n_actions = n_actions;

	size_t client_contents_len, server_contents_len;
	char proto[16];
	uint32_t timeout_us, timeout_time_wait_us;

	plogx_dbg("loading stream\n");
	if (lua_to_host_set(L, TABLE, "servers", &ret->servers))
		return -1;
	if (lua_to_string(L, TABLE, "l4_proto", proto, sizeof(proto)))
		return -1;
	if (lua_to_peer_data(L, TABLE, "client_data", socket, &ret->data[PEER_CLIENT], &client_contents_len, hs))
		return -1;
	if (lua_to_peer_data(L, TABLE, "server_data", socket, &ret->data[PEER_SERVER], &server_contents_len, hs))
		return -1;

	if (lua_to_int(L, TABLE, "timeout", &timeout_us)) {
		timeout_us = 1000000;
	}

	ret->tsc_timeout = usec_to_tsc(timeout_us);

	double up, dn;

	if (lua_to_double(L, TABLE, "up_bps", &up))
		up = 5000; /* default 5000 bytes/sec, i.e. 40 Kbps */
	if (lua_to_double(L, TABLE, "dn_bps", &dn))
		dn = 5000; /* default 5000 bytes/sec, i.e. 40 Kbps */

	const uint64_t hz = rte_get_tsc_hz();
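	/* The bucket depth of ETHER_MAX_LEN + 20 ensures that even a
	   maximum-sized frame fits in the bucket; the extra 20 bytes
	   are the per-frame wire overhead (7-byte preamble, 1-byte
	   start-of-frame delimiter and 12-byte inter-frame gap), so
	   the configured rates correspond to wire rates. */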
	ret->tt_cfg[PEER_CLIENT] = token_time_cfg_create(up, hz, ETHER_MAX_LEN + 20);
	ret->tt_cfg[PEER_SERVER] = token_time_cfg_create(dn, hz, ETHER_MAX_LEN + 20);

	if (!strcmp(proto, "tcp")) {
		ret->proto = IPPROTO_TCP;
		ret->proc = stream_tcp_proc;
		ret->is_ended = stream_tcp_is_ended;

		if (lua_to_int(L, TABLE, "timeout_time_wait", &timeout_time_wait_us)) {
			timeout_time_wait_us = 2000000;
		}

		ret->tsc_timeout_time_wait = usec_to_tsc(timeout_time_wait_us);
	}
	else if (!strcmp(proto, "udp")) {
		plogx_dbg("loading UDP\n");
		ret->proto = IPPROTO_UDP;
		ret->proc = stream_udp_proc;
		ret->is_ended = stream_udp_is_ended;
	}
	else
		return -1;

	/* get all actions */
	if (lua_getfrom(L, TABLE, "actions") < 0)
		return -1;

	uint32_t idx = 0;
	lua_pushnil(L);
	while (lua_next(L, -2)) {
		if (lua_to_peer_action(L, STACK, NULL, &ret->actions[idx], client_contents_len, server_contents_len))
			return -1;

		stream_cfg_verify_action(ret, &ret->actions[idx]);
		idx++;
		lua_pop(L, 1);
	}
	lua_pop(L, 1);

	/* For TCP, one of the peers initiates closing down the
	   connection. This is signified by the last action having
	   zero length. If such an action is not specified in the
	   configuration file, the default is for the client to close
	   the connection. This means that the TCP connection at the
	   client will go into a TIME_WAIT state and the server
	   releases all the resources, avoiding resource starvation at
	   the server. */
	if (ret->proto == IPPROTO_TCP && ret->actions[ret->n_actions - 1].len != 0) {
		ret->actions[ret->n_actions].len = 0;
		ret->actions[ret->n_actions].beg = 0;
		ret->actions[ret->n_actions].peer = PEER_CLIENT;
		ret->n_actions++;
	}

	if (IPPROTO_TCP == ret->proto)
		stream_tcp_calc_len(ret, &ret->n_pkts, &ret->n_bytes);
	else
		stream_udp_calc_len(ret, &ret->n_pkts, &ret->n_bytes);

	lua_pop(L, pop);
	*stream_cfg = ret;
	return 0;
}
static int lua_to_bundle_cfg(struct lua_State *L, enum lua_place from, const char *name, uint8_t socket, struct bundle_cfg *bundle, struct hash_set *hs)
{
	int pop, idx = 0;
	int clients_loaded = 0;

	if ((pop = lua_getfrom(L, from, name)) < 0)
		return -1;

	if (!lua_istable(L, -1))
		return -1;

	lua_len(prox_lua(), -1);
	bundle->n_stream_cfgs = lua_tointeger(prox_lua(), -1);
	lua_pop(prox_lua(), 1);

	bundle->stream_cfgs = prox_zmalloc(sizeof(*bundle->stream_cfgs) * bundle->n_stream_cfgs, socket);

	plogx_dbg("loading bundle cfg with %d streams\n", bundle->n_stream_cfgs);
	lua_pushnil(L);
	while (lua_next(L, -2)) {
		if (!clients_loaded) {
			if (lua_to_host_set(L, TABLE, "clients", &bundle->clients)) {
				return -1;
			}
			clients_loaded = 1;
		}
		if (lua_to_stream_cfg(L, STACK, NULL, socket, &bundle->stream_cfgs[idx], hs)) {
			return -1;
		}

		++idx;
		lua_pop(L, 1);
	}

	lua_pop(L, pop);
	return 0;
}
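/* Server-side task init: create the mbuf pool, load the listening
   services from the Lua "streams" table into a hash keyed by
   struct new_tuple (destination address, protocol, destination port
   and L2 types), and size the bundle pool, heap and fqueue from the
   configured number of concurrent connections. */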
static void init_task_gen(struct task_base *tbase, struct task_args *targ)
{
	struct task_gen_server *task = (struct task_gen_server *)tbase;
	const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

	static char name[] = "server_mempool";
	name[0]++;
	task->mempool = rte_mempool_create(name,
					   4*1024 - 1, MBUF_SIZE,
					   targ->nb_cache_mbuf,
					   sizeof(struct rte_pktmbuf_pool_private),
					   rte_pktmbuf_pool_init, NULL,
					   rte_pktmbuf_init, NULL,
					   socket_id, 0);
	PROX_PANIC(task->mempool == NULL, "Failed to allocate memory pool with %u elements\n", 4*1024 - 1);
	int pop = lua_getfrom(prox_lua(), GLOBAL, targ->streams);
	PROX_PANIC(pop < 0, "Failed to find '%s' in lua\n", targ->streams);

	lua_len(prox_lua(), -1);
	uint32_t n_listen = lua_tointeger(prox_lua(), -1);
	lua_pop(prox_lua(), 1);
	PROX_PANIC(n_listen == 0, "No services specified to listen on\n");

	task->bundle_cfgs = prox_zmalloc(n_listen * sizeof(task->bundle_cfgs[0]), socket_id);

	plogx_info("n_listen = %d\n", n_listen);

	struct hash_set *hs = prox_sh_find_socket(socket_id, "genl4_streams");
	if (hs == NULL) {
		/* Expected number of streams per bundle = 1, hash_set
		   will grow if full. */
		hs = hash_set_create(n_listen, socket_id);
		prox_sh_add_socket(socket_id, "genl4_streams", hs);
	}

	const struct rte_hash_parameters listen_table = {
		.name = name,
		.entries = n_listen * 4,
		.key_len = sizeof(struct new_tuple),
		.hash_func = rte_hash_crc,
		.hash_func_init_val = 0,
		.socket_id = socket_id,
	};

	task->listen_hash = rte_hash_create(&listen_table);
	task->listen_entries = prox_zmalloc(listen_table.entries * sizeof(task->listen_entries[0]), socket_id);

	int idx = 0;
	lua_pushnil(prox_lua());
	while (lua_next(prox_lua(), -2)) {
		task->bundle_cfgs[idx].n_stream_cfgs = 1;
		task->bundle_cfgs[idx].stream_cfgs = prox_zmalloc(sizeof(*task->bundle_cfgs[idx].stream_cfgs), socket_id);
		int ret = lua_to_stream_cfg(prox_lua(), STACK, NULL, socket_id, &task->bundle_cfgs[idx].stream_cfgs[0], hs);
		PROX_PANIC(ret, "Failed to load stream cfg\n");
		struct stream_cfg *stream = task->bundle_cfgs[idx].stream_cfgs[0];

		// TODO: check mask and add to hash for each host
		struct new_tuple nt = {
			.dst_addr = stream->servers.ip,
			.proto_id = stream->proto,
			.dst_port = stream->servers.port,
			.l2_types[0] = 0x0008,
		};

		ret = rte_hash_add_key(task->listen_hash, &nt);
		PROX_PANIC(ret < 0, "Failed to add\n");

		task->listen_entries[ret] = &task->bundle_cfgs[idx];

		plogx_dbg("Server = "IPv4_BYTES_FMT":%d\n", IPv4_BYTES(((uint8_t*)&nt.dst_addr)), rte_bswap16(nt.dst_port));
		++idx;
		lua_pop(prox_lua(), 1);
	}
	lua_pop(prox_lua(), pop);

	static char name2[] = "task_gen_hash2";
	name2[0]++;
	plogx_dbg("Creating bundle ctx pool\n");
	if (bundle_ctx_pool_create(name2, targ->n_concur_conn * 2, &task->bundle_ctx_pool, NULL, 0, NULL, socket_id)) {
		PROX_PANIC(1, "Failed to create conn_ctx_pool\n");
	}

	task->heap = heap_create(targ->n_concur_conn * 2, socket_id);
	task->seed = rte_rdtsc();

	/* TODO: calculate the CDF of the reply distribution and the
	   number of replies as the number to cover for 99% of the
	   replies. For now, assume that this number is 2. */
	uint32_t queue_size = rte_align32pow2(targ->n_concur_conn * 2);

	PROX_PANIC(queue_size == 0, "Overflow resulted in queue size 0\n");
	task->fqueue = fqueue_create(queue_size, socket_id);
	PROX_PANIC(task->fqueue == NULL, "Failed to allocate local queue\n");

	uint32_t n_descriptors;

	if (targ->nb_txports) {
		PROX_PANIC(targ->nb_txports != 1, "Need exactly one TX port for L4 generation\n");
		n_descriptors = prox_port_cfg[targ->tx_port_queue[0].port].n_txd;
	}
	else {
		PROX_PANIC(targ->nb_txrings != 1, "Need exactly one TX ring for L4 generation\n");
		n_descriptors = 256; /* TX over a ring: no port descriptors to query, assume the default ring size */
	}
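	/* As on the client side below, cap the token bucket at
	   "number of TX descriptors" * "smallest Ethernet frame plus
	   wire overhead" so that a full bucket can never queue more
	   packets than the TX ring can hold. */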
	struct token_time_cfg tt_cfg = {
		.bpp = targ->rate_bps,
		.period = rte_get_tsc_hz(),
		.bytes_max = n_descriptors * (ETHER_MIN_LEN + 20),
	};

	token_time_init(&task->token_time, &tt_cfg);
}
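/* Client-side task init: load the weighted bundle configurations
   (one imix_fraction per bundle) from Lua. The occur[] weights are
   handed to bundle_ctx_pool_create(), which, as the parameter names
   suggest, makes bundles occur in proportion to their configured
   fractions. */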
static void init_task_gen_client(struct task_base *tbase, struct task_args *targ)
{
	struct task_gen_client *task = (struct task_gen_client *)tbase;
	static char name[] = "gen_pool";
	const uint32_t socket = rte_lcore_to_socket_id(targ->lconf->id);
	name[0]++;
	task->mempool = rte_mempool_create(name,
					   4*1024 - 1, MBUF_SIZE,
					   targ->nb_cache_mbuf,
					   sizeof(struct rte_pktmbuf_pool_private),
					   rte_pktmbuf_pool_init, NULL,
					   rte_pktmbuf_init, NULL,
					   socket, 0);
	PROX_PANIC(task->mempool == NULL, "Failed to allocate memory pool with %u elements\n", 4*1024 - 1);

	/* streams contains a lua table. Go through it and read each
	   stream with associated imix_fraction. */
	uint32_t imix;
	uint32_t i = 0;

	int pop = lua_getfrom(prox_lua(), GLOBAL, targ->streams);
	PROX_PANIC(pop < 0, "Failed to find '%s' in lua\n", targ->streams);

	lua_len(prox_lua(), -1);
	uint32_t n_bundle_cfgs = lua_tointeger(prox_lua(), -1);
	lua_pop(prox_lua(), 1);
	PROX_PANIC(n_bundle_cfgs == 0, "No configs specified\n");
	plogx_info("loading %d bundle_cfgs\n", n_bundle_cfgs);

	struct hash_set *hs = prox_sh_find_socket(socket, "genl4_streams");
	if (hs == NULL) {
		/* Expected number of streams per bundle = 8, hash_set
		   will grow if full. */
		hs = hash_set_create(n_bundle_cfgs * 8, socket);
		prox_sh_add_socket(socket, "genl4_streams", hs);
	}

	task->bundle_cfgs = prox_zmalloc(n_bundle_cfgs * sizeof(task->bundle_cfgs[0]), socket);
	lua_pushnil(prox_lua());

	uint32_t *occur = prox_zmalloc(n_bundle_cfgs * sizeof(*occur), socket);
	struct cdf *cdf = cdf_create(n_bundle_cfgs, socket);

	while (lua_next(prox_lua(), -2)) {
		PROX_PANIC(lua_to_int(prox_lua(), TABLE, "imix_fraction", &imix) ||
			   lua_to_bundle_cfg(prox_lua(), TABLE, "bundle", socket, &task->bundle_cfgs[i], hs),
			   "Failed to load bundle cfg:\n%s\n", get_lua_to_errors());
		cdf_add(cdf, imix);
		occur[i] = imix;
		++i;
		lua_pop(prox_lua(), 1);
	}
	lua_pop(prox_lua(), pop);
	cdf_setup(cdf);

	PROX_PANIC(targ->max_setup_rate == 0, "Max setup rate not set\n");

	task->new_conn_cost = rte_get_tsc_hz()/targ->max_setup_rate;

	static char name2[] = "task_gen_hash";
	name2[0]++;
	plogx_dbg("Creating bundle ctx pool\n");
	if (bundle_ctx_pool_create(name2, targ->n_concur_conn, &task->bundle_ctx_pool, occur, n_bundle_cfgs, task->bundle_cfgs, socket)) {
		PROX_PANIC(1, "Failed to create conn_ctx_pool\n");
	}

	task->heap = heap_create(targ->n_concur_conn, socket);
	task->seed = rte_rdtsc();
	/* task->token_time.bytes_max = MAX_PKT_BURST * (ETHER_MAX_LEN + 20); */

	/* To avoid overflowing the tx descriptors, the token bucket
	   size needs to be limited. The descriptors are filled most
	   quickly with the smallest packets. For that reason, the
	   token bucket size is given by "number of tx descriptors" *
	   "smallest Ethernet packet". */
	PROX_ASSERT(targ->nb_txports == 1);

	struct token_time_cfg tt_cfg = {
		.bpp = targ->rate_bps,
		.period = rte_get_tsc_hz(),
		.bytes_max = prox_port_cfg[targ->tx_port_queue[0].port].n_txd * (ETHER_MIN_LEN + 20),
	};

	token_time_init(&task->token_time, &tt_cfg);
}
static void start_task_gen_client(struct task_base *tbase)
{
	struct task_gen_client *task = (struct task_gen_client *)tbase;

	token_time_reset(&task->token_time, rte_rdtsc(), 0);

	task->new_conn_tokens = 0;
	task->new_conn_last_tsc = rte_rdtsc();
}

static void stop_task_gen_client(struct task_base *tbase)
{
	struct task_gen_client *task = (struct task_gen_client *)tbase;
	struct bundle_ctx *bundle;

	while (!heap_is_empty(task->heap)) {
		bundle = BUNDLE_CTX_UPCAST(heap_pop(task->heap));
		bundle_expire(bundle, &task->bundle_ctx_pool, &task->l4_stats);
	}
}

static void start_task_gen_server(struct task_base *tbase)
{
	struct task_gen_server *task = (struct task_gen_server *)tbase;

	token_time_reset(&task->token_time, rte_rdtsc(), 0);
}

static void stop_task_gen_server(struct task_base *tbase)
{
	struct task_gen_server *task = (struct task_gen_server *)tbase;
	struct bundle_ctx *bundle;
	uint8_t out[MAX_PKT_BURST];

	while (!heap_is_empty(task->heap)) {
		bundle = BUNDLE_CTX_UPCAST(heap_pop(task->heap));
		bundle_expire(bundle, &task->bundle_ctx_pool, &task->l4_stats);
	}

	if (task->cancelled) {
		struct rte_mbuf *mbuf = task->mbuf_saved;

		out[0] = OUT_DISCARD;
		task->cancelled = 0;
		task->base.tx_pkt(&task->base, &mbuf, 1, out);
	}

	/* Drain and discard everything still queued. */
	do {
		if (task->cur_mbufs_beg == task->cur_mbufs_end) {
			task->cur_mbufs_end = fqueue_get(task->fqueue, task->cur_mbufs, MAX_PKT_BURST);
			task->cur_mbufs_beg = 0;
			if (task->cur_mbufs_end == 0)
				break;
		}
		uint16_t n_pkts = task->cur_mbufs_end - task->cur_mbufs_beg;
		struct rte_mbuf **mbufs = task->cur_mbufs + task->cur_mbufs_beg;
		task->cur_mbufs_beg += n_pkts;

		for (uint16_t j = 0; j < n_pkts; ++j) {
			out[j] = OUT_DISCARD;
		}
		task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
	} while (1);
}
static struct task_init task_init_gen1 = {
	.mode_str = "genl4",
	.sub_mode_str = "server",
	.init = init_task_gen,
	.handle = handle_gen_bulk,
	.start = start_task_gen_server,
	.stop = stop_task_gen_server,
	.flag_features = TASK_FEATURE_ZERO_RX,
	.size = sizeof(struct task_gen_server),
	.mbuf_size = 2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM,
};

static struct task_init task_init_gen2 = {
	.mode_str = "genl4",
	.init = init_task_gen_client,
	.handle = handle_gen_bulk_client,
	.start = start_task_gen_client,
	.stop = stop_task_gen_client,
	.flag_features = TASK_FEATURE_ZERO_RX,
	.size = sizeof(struct task_gen_client),
	.mbuf_size = 2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM,
};

__attribute__((constructor)) static void reg_task_gen(void)
{
	reg_task(&task_init_gen1);
	reg_task(&task_init_gen2);
}