Merge changes from topic "TST009_benchmark"
[samplevnf.git] / VNFs / DPPD-PROX / handle_fm.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <dlfcn.h>
18
19 #include <rte_ip.h>
20 #include <rte_udp.h>
21 #include <rte_tcp.h>
22 #include <rte_cycles.h>
23 #include <rte_ether.h>
24 #include <rte_ethdev.h> // required by rte_eth_ctrl.h in 19.05
25 #include <rte_eth_ctrl.h>
26
27 #include "log.h"
28 #include "quit.h"
29 #include "lconf.h"
30 #include "task_init.h"
31 #include "task_base.h"
32 #include "kv_store_expire.h"
33 #include "stats.h"
34 #include "prox_shared.h"
35 #include "etypes.h"
36 #include "prox_cfg.h"
37 #include "dpi/dpi.h"
38
/* State shared by all "fm" tasks that run on the same core. It is looked
   up (or created) under the per-core key "dpi_shared". */
struct task_dpi_per_core {
        /* Handle returned by the engine's dpi_thread_start(); reset to
           NULL once dpi_thread_stop() has been called. */
        void *dpi_opaque;
};
42
43 struct task_fm {
44         struct task_base          base;
45         /* FM related fields */
46         struct kv_store_expire   *kv_store_expire;
47         void                     *dpi_opaque;
48
49         struct dpi_engine        dpi_engine;
50         struct task_dpi_per_core *dpi_shared; /* Used only during init */
51 };
52
53 struct eth_ip4_udp {
54         prox_rte_ether_hdr l2;
55         prox_rte_ipv4_hdr  l3;
56         union {
57                 prox_rte_udp_hdr   udp;
58                 prox_rte_tcp_hdr   tcp;
59         } l4;
60 } __attribute__((packed));
61
/* Packet classification key. The three header fields share storage
   with 'val', so a packet can be matched against a known type with a
   single 32-bit comparison. */
union pkt_type {
        struct {
                uint16_t etype;      /* taken from the Ethernet ether_type field */
                uint8_t ip_byte;     /* taken from the IPv4 version_ihl byte */
                uint8_t next_proto;  /* taken from the IPv4 next_proto_id field */
        } __attribute__((packed));
        uint32_t val;
};
70
71 static union pkt_type pkt_type_udp = {
72         .next_proto = IPPROTO_UDP,
73         .ip_byte    = 0x45,
74         .etype      = ETYPE_IPv4,
75 };
76
77 static union pkt_type pkt_type_tcp = {
78         .next_proto = IPPROTO_TCP,
79         .ip_byte    = 0x45,
80         .etype      = ETYPE_IPv4,
81 };
82
83 static int extract_flow_info(struct eth_ip4_udp *p, struct flow_info *fi, struct flow_info *fi_flipped, uint32_t *len, uint8_t **payload)
84 {
85         union pkt_type pkt_type = {
86                 .next_proto = p->l3.next_proto_id,
87                 .ip_byte    = p->l3.version_ihl,
88                 .etype      = p->l2.ether_type,
89         };
90
91         memset(fi->reservered, 0, sizeof(fi->reservered));
92         memset(fi_flipped->reservered, 0, sizeof(fi_flipped->reservered));
93
94         if (pkt_type.val == pkt_type_udp.val) {
95                 fi->ip_src = p->l3.src_addr;
96                 fi->ip_dst = p->l3.dst_addr;
97                 fi->ip_proto = p->l3.next_proto_id;
98                 fi->port_src = p->l4.udp.src_port;
99                 fi->port_dst = p->l4.udp.dst_port;
100
101                 fi_flipped->ip_src = p->l3.dst_addr;
102                 fi_flipped->ip_dst = p->l3.src_addr;
103                 fi_flipped->ip_proto = p->l3.next_proto_id;
104                 fi_flipped->port_src = p->l4.udp.dst_port;
105                 fi_flipped->port_dst = p->l4.udp.src_port;
106
107                 *len = rte_be_to_cpu_16(p->l4.udp.dgram_len) - sizeof(prox_rte_udp_hdr);
108                 *payload = (uint8_t*)(&p->l4.udp) + sizeof(prox_rte_udp_hdr);
109                 return 0;
110         }
111         else if (pkt_type.val == pkt_type_tcp.val) {
112                 fi->ip_src = p->l3.src_addr;
113                 fi->ip_dst = p->l3.dst_addr;
114                 fi->ip_proto = p->l3.next_proto_id;
115                 fi->port_src = p->l4.tcp.src_port;
116                 fi->port_dst = p->l4.tcp.dst_port;
117
118                 fi_flipped->ip_src = p->l3.dst_addr;
119                 fi_flipped->ip_dst = p->l3.src_addr;
120                 fi_flipped->ip_proto = p->l3.next_proto_id;
121                 fi_flipped->port_src = p->l4.tcp.dst_port;
122                 fi_flipped->port_dst = p->l4.tcp.src_port;
123
124                 *len = rte_be_to_cpu_16(p->l3.total_length) - sizeof(prox_rte_ipv4_hdr) - ((p->l4.tcp.data_off >> 4)*4);
125                 *payload = ((uint8_t*)&p->l4.tcp) + ((p->l4.tcp.data_off >> 4)*4);
126                 return 0;
127         }
128
129         return -1;
130 }
131
132 static int is_flow_beg(const struct flow_info *fi, const struct eth_ip4_udp *p)
133 {
134         return fi->ip_proto == IPPROTO_UDP ||
135                 (fi->ip_proto == IPPROTO_TCP && p->l4.tcp.tcp_flags & PROX_RTE_TCP_SYN_FLAG);
136 }
137
138 static void *lookup_flow(struct task_fm *task, struct flow_info *fi, uint64_t now_tsc)
139 {
140         struct kv_store_expire_entry *entry;
141
142         entry = kv_store_expire_get(task->kv_store_expire, fi, now_tsc);
143
144         return entry ? entry_value(task->kv_store_expire, entry) : NULL;
145 }
146
147 static void *lookup_or_insert_flow(struct task_fm *task, struct flow_info *fi, uint64_t now_tsc)
148 {
149         struct kv_store_expire_entry *entry;
150
151         entry = kv_store_expire_get_or_put(task->kv_store_expire, fi, now_tsc);
152
153         return entry ? entry_value(task->kv_store_expire, entry) : NULL;
154 }
155
156 static int handle_fm(struct task_fm *task, struct rte_mbuf *mbuf, uint64_t now_tsc)
157 {
158         struct eth_ip4_udp *p;
159         struct flow_info fi, fi_flipped;
160         void *flow_data;
161         uint32_t len;
162         uint8_t *payload;
163         uint32_t res[2];
164         size_t res_len = 2;
165         int flow_beg;
166         struct dpi_payload dpi_payload;
167         int is_upstream = 0;
168
169         p = rte_pktmbuf_mtod(mbuf, struct eth_ip4_udp *);
170
171         if (0 != extract_flow_info(p, &fi, &fi_flipped, &len, &payload)) {
172                 plogx_err("Unknown packet type\n");
173                 return OUT_DISCARD;
174         }
175
176         /* First, try to see if the flow already exists where the
177            current packet is sent by the server. */
178         if (!(flow_data = lookup_flow(task, &fi_flipped, now_tsc))) {
179                 /* Insert a new flow, only if this is the first packet
180                    in the flow. */
181                 is_upstream = 1;
182                 if (is_flow_beg(&fi, p))
183                         flow_data = lookup_or_insert_flow(task, &fi, now_tsc);
184                 else
185                         flow_data = lookup_flow(task, &fi, now_tsc);
186         }
187
188         if (!flow_data)
189                 return OUT_DISCARD;
190         else if (!len)
191                 return 0;
192
193         dpi_payload.payload = payload;
194         dpi_payload.len = len;
195         dpi_payload.client_to_server = is_upstream;
196         gettimeofday(&dpi_payload.tv, NULL);
197         task->dpi_engine.dpi_process(task->dpi_opaque, is_upstream? &fi : &fi_flipped, flow_data, &dpi_payload, res, &res_len);
198         return OUT_HANDLED;
199 }
200
201 static int handle_fm_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
202 {
203         struct task_fm *task = (struct task_fm *)tbase;
204         uint64_t now_tsc = rte_rdtsc();
205         uint16_t handled = 0;
206         uint16_t discard = 0;
207         int ret;
208
209         for (uint16_t i = 0; i < n_pkts; ++i) {
210                 ret = handle_fm(task, mbufs[i], now_tsc);
211                 if (ret == OUT_DISCARD)
212                         discard++;
213                 else if (ret == OUT_HANDLED)
214                         handled++;
215         }
216
217         for (uint16_t i = 0; i < n_pkts; ++i)
218                 rte_pktmbuf_free(mbufs[i]);
219
220         TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, handled);
221         TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, discard);
222         return 0;
223 }
224
225 static void load_dpi_engine(const char *dpi_engine_path, struct dpi_engine *dst)
226 {
227         void *handle = prox_sh_find_system(dpi_engine_path);
228
229         if (handle == NULL) {
230                 plogx_info("Loading DPI engine from '%s'\n", dpi_engine_path);
231                 handle = dlopen(dpi_engine_path, RTLD_NOW | RTLD_GLOBAL);
232
233                 PROX_PANIC(handle == NULL, "Failed to load dpi engine from '%s' with error:\n\t\t%s\n", dpi_engine_path, dlerror());
234                 prox_sh_add_system(dpi_engine_path, handle);
235         }
236
237         struct dpi_engine *(*get_dpi_engine)(void) = dlsym(handle, "get_dpi_engine");
238
239         PROX_PANIC(get_dpi_engine == NULL, "Failed to find get_dpi_engine function from '%s'\n", dpi_engine_path);
240         struct dpi_engine *dpi_engine = get_dpi_engine();
241
242         dpi_engine->dpi_print = plog_info;
243         rte_memcpy(dst, dpi_engine, sizeof(*dst));
244 }
245
246 static uint32_t count_fm_cores(void)
247 {
248         uint32_t n_cores = 0;
249         uint32_t lcore_id = -1;
250         struct lcore_cfg *lconf;
251
252         while(prox_core_next(&lcore_id, 0) == 0) {
253                 lconf = &lcore_cfg[lcore_id];
254                 for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
255                         if (!strcmp(lconf->targs[task_id].task_init->mode_str, "fm")) {
256                                 n_cores++;
257                                 /* Only intersted in number of cores
258                                    so break here. */
259                                 break;
260                         }
261                 }
262         }
263
264         return n_cores;
265 }
266
267 static struct kv_store_expire *get_shared_flow_table(struct task_args *targ, struct dpi_engine *de)
268 {
269         struct kv_store_expire *ret = prox_sh_find_core(targ->lconf->id, "flow_table");
270         const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
271
272         if (!ret) {
273                 ret = kv_store_expire_create(rte_align32pow2(targ->flow_table_size) * 4,
274                                              sizeof(struct flow_info),
275                                              de->dpi_get_flow_entry_size(),
276                                              socket_id,
277                                              de->dpi_flow_expire,
278                                              rte_get_tsc_hz() * 60);
279                 PROX_PANIC(ret == NULL, "Failed to allocate KV store\n");
280                 prox_sh_add_core(targ->lconf->id, "flow_table", ret);
281         }
282         return ret;
283 }
284
285 static struct task_dpi_per_core *get_shared_dpi_shared(struct task_args *targ)
286 {
287         static const char *name = "dpi_shared";
288         struct task_dpi_per_core *ret = prox_sh_find_core(targ->lconf->id, name);
289         const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
290
291         if (!ret) {
292                 ret = prox_zmalloc(sizeof(*ret), socket_id);
293                 prox_sh_add_core(targ->lconf->id, name, ret);
294         }
295         return ret;
296 }
297
298 static void init_task_fm(struct task_base *tbase, struct task_args *targ)
299 {
300         struct task_fm *task = (struct task_fm *)tbase;
301         static int dpi_inited = 0;
302
303         load_dpi_engine(targ->dpi_engine_path, &task->dpi_engine);
304
305         task->kv_store_expire = get_shared_flow_table(targ, &task->dpi_engine);
306         task->dpi_shared = get_shared_dpi_shared(targ);
307
308         if (!dpi_inited) {
309                 uint32_t n_threads = count_fm_cores();
310                 const char *dpi_params[16];
311
312                 plogx_info("Initializing DPI with %u threads\n", n_threads);
313                 dpi_inited = 1;
314
315                 PROX_PANIC(targ->n_dpi_engine_args > 16, "Too many DPI arguments");
316                 for (size_t i = 0; i < targ->n_dpi_engine_args && i < 16; ++i)
317                         dpi_params[i] = targ->dpi_engine_args[i];
318
319                 int ret = task->dpi_engine.dpi_init(n_threads, targ->n_dpi_engine_args, dpi_params);
320
321                 PROX_PANIC(ret, "Failed to initialize DPI engine\n");
322         }
323 }
324
325 static void start_first(struct task_base *tbase)
326 {
327         struct task_fm *task = (struct task_fm *)tbase;
328         void *ret = task->dpi_engine.dpi_thread_start();
329
330         task->dpi_shared->dpi_opaque = ret;
331         PROX_PANIC(ret == NULL, "dpi_thread_init failed\n");
332 }
333
334 static void start(struct task_base *tbase)
335 {
336         struct task_fm *task = (struct task_fm *)tbase;
337
338         task->dpi_opaque = task->dpi_shared->dpi_opaque;
339         PROX_PANIC(task->dpi_opaque == NULL, "dpi_opaque == NULL");
340 }
341
342 static void stop(struct task_base *tbase)
343 {
344         struct task_fm *task = (struct task_fm *)tbase;
345
346         size_t expired = kv_store_expire_expire_all(task->kv_store_expire);
347         size_t size = kv_store_expire_size(task->kv_store_expire);
348
349         plogx_info("%zu/%zu\n", expired, size);
350 }
351
352 static void stop_last(struct task_base *tbase)
353 {
354         struct task_fm *task = (struct task_fm *)tbase;
355
356         task->dpi_engine.dpi_thread_stop(task->dpi_shared->dpi_opaque);
357         task->dpi_shared->dpi_opaque = NULL;
358 }
359
360 static struct task_init task_init_fm = {
361         .mode_str = "fm",
362         .init = init_task_fm,
363         .handle = handle_fm_bulk,
364         .start = start,
365         .stop = stop,
366         .start_first = start_first,
367         .stop_last = stop_last,
368         .size = sizeof(struct task_fm)
369 };
370
371 __attribute__((constructor)) static void reg_task_fm(void)
372 {
373         reg_task(&task_init_fm);
374 }