Merge changes from PROX-v041
[samplevnf.git] / VNFs / DPPD-PROX / main.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <string.h>
18 #include <locale.h>
19 #include <unistd.h>
20 #include <signal.h>
21
22 #include <rte_cycles.h>
23 #include <rte_atomic.h>
24 #include <rte_table_hash.h>
25 #include <rte_memzone.h>
26 #include <rte_errno.h>
27
28 #include "prox_malloc.h"
29 #include "run.h"
30 #include "main.h"
31 #include "log.h"
32 #include "quit.h"
33 #include "clock.h"
34 #include "defines.h"
35 #include "version.h"
36 #include "prox_args.h"
37 #include "prox_assert.h"
38 #include "prox_cfg.h"
39 #include "prox_shared.h"
40 #include "prox_port_cfg.h"
41 #include "toeplitz.h"
42 #include "hash_utils.h"
43 #include "handle_lb_net.h"
44 #include "prox_cksum.h"
45 #include "thread_nop.h"
46 #include "thread_generic.h"
47 #include "thread_pipeline.h"
48 #include "cqm.h"
49 #include "handle_master.h"
50
51 #if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0)
52 #define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE
53 #endif
54
55 uint8_t lb_nb_txrings = 0xff;
56 struct rte_ring *ctrl_rings[RTE_MAX_LCORE*MAX_TASKS_PER_CORE];
57
/* Print command-line usage for PROX and terminate.
 * Never returns: always exits the process with EXIT_FAILURE. */
static void __attribute__((noreturn)) prox_usage(const char *prgname)
{
	static const char *usage_fmt =
		"\nUsage: %s [-f CONFIG_FILE] [-a|-e] [-m|-s|-i] [-w DEF] [-u] [-t]\n"
		"\t-f CONFIG_FILE : configuration file to load, ./prox.cfg by default\n"
		"\t-l LOG_FILE : log file name, ./prox.log by default\n"
		"\t-p : include PID in log file name if default log file is used\n"
		"\t-o DISPLAY: Set display to use, can be 'curses' (default), 'cli' or 'none'\n"
		"\t-v verbosity : initial logging verbosity\n"
		"\t-a : autostart all cores (by default)\n"
		"\t-e : don't autostart\n"
		"\t-n : Create NULL devices instead of using PCI devices, useful together with -i\n"
		"\t-m : list supported task modes and exit\n"
		"\t-s : check configuration file syntax and exit\n"
		"\t-i : check initialization sequence and exit\n"
		"\t-u : Listen on UDS /tmp/prox.sock\n"
		"\t-t : Listen on TCP port 8474\n"
		"\t-q : Pass argument to Lua interpreter, useful to define variables\n"
		"\t-w : define variable using syntax varname=value\n"
		"\t     takes precedence over variables defined in CONFIG_FILE\n"
		"\t-k : Log statistics to file \"stats_dump\" in current directory\n"
		"\t-d : Run as daemon, the parent process will block until PROX is not initialized\n"
		"\t-z : Ignore CPU topology, implies -i\n"
		"\t-r : Change initial screen refresh rate. If set to a lower than 0.001 seconds,\n"
		"\t     screen refreshing will be disabled\n";

	plog_info(usage_fmt, prgname);
	exit(EXIT_FAILURE);
}
85
86 static void check_mixed_normal_pipeline(void)
87 {
88         struct lcore_cfg *lconf = NULL;
89         uint32_t lcore_id = -1;
90
91         while (prox_core_next(&lcore_id, 0) == 0) {
92                 lconf = &lcore_cfg[lcore_id];
93
94                 int all_thread_nop = 1;
95                 int generic = 0;
96                 int pipeline = 0;
97                 for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
98                         struct task_args *targ = &lconf->targs[task_id];
99                         all_thread_nop = all_thread_nop &&
100                                 targ->task_init->thread_x == thread_nop;
101
102                         pipeline = pipeline || targ->task_init->thread_x == thread_pipeline;
103                         generic = generic || targ->task_init->thread_x == thread_generic;
104                 }
105                 PROX_PANIC(generic && pipeline, "Can't run both pipeline and normal thread on same core\n");
106
107                 if (all_thread_nop)
108                         lconf->thread_x = thread_nop;
109                 else {
110                         lconf->thread_x = thread_generic;
111                 }
112         }
113 }
114
115 static void check_zero_rx(void)
116 {
117         struct lcore_cfg *lconf = NULL;
118         struct task_args *targ;
119
120         while (core_targ_next(&lconf, &targ, 0) == 0) {
121                 if (targ->nb_rxports != 0) {
122                         PROX_PANIC(task_init_flag_set(targ->task_init, TASK_FEATURE_NO_RX),
123                            "\tCore %u task %u: rx_ports configured while mode %s does not use it\n", lconf->id, targ->id, targ->task_init->mode_str);
124                 }
125         }
126 }
127
/* Verify every task has a packet source: rx rings must have a producer
 * (unless optimized away), and tasks with neither rx ports nor rx rings
 * must be in a mode flagged TASK_FEATURE_NO_RX.  Additionally, every
 * task transmitting in "l3" sub mode must have a matching L3-capable
 * receiver on the port it sends to.  All violations panic. */
static void check_missing_rx(void)
{
	struct lcore_cfg *lconf = NULL, *rx_lconf = NULL;
	struct task_args *targ, *rx_targ = NULL;
	struct prox_port_cfg *port;
	uint8_t port_id, rx_port_id, ok;

	/* Pass 1: a task reading from a ring needs someone transmitting to
	   it, except when the ring was optimized away (tx_opt_ring_task). */
	while (core_targ_next(&lconf, &targ, 0) == 0) {
		PROX_PANIC((targ->flags & TASK_ARG_RX_RING) && targ->rx_rings[0] == 0 && !targ->tx_opt_ring_task,
			   "Configuration Error - Core %u task %u Receiving from ring, but nobody xmitting to this ring\n", lconf->id, targ->id);
		if (targ->nb_rxports == 0 && targ->nb_rxrings == 0) {
			PROX_PANIC(!task_init_flag_set(targ->task_init, TASK_FEATURE_NO_RX),
				   "\tCore %u task %u: no rx_ports and no rx_rings configured while required by mode %s\n", lconf->id, targ->id, targ->task_init->mode_str);
		}
	}

	/* Pass 2: for each l3 transmitter, scan all tasks for a receiver on
	   the same port that has the TASK_FEATURE_L3 feature. */
	lconf = NULL;
	while (core_targ_next(&lconf, &targ, 0) == 0) {
		if (strcmp(targ->task_init->sub_mode_str, "l3") != 0)
			continue;
		port = find_reachable_port(targ);
		if (port == NULL)
			continue;
		port_id = port - prox_port_cfg;	/* index into the global port table */
		rx_lconf = NULL;
		ok = 0;
		plog_info("\tCore %d task %d transmitting to port %d in L3 mode\n", lconf->id, targ->id, port_id);
		while (core_targ_next(&rx_lconf, &rx_targ, 0) == 0) {
			for (uint8_t i = 0; i < rx_targ->nb_rxports; ++i) {
				rx_port_id = rx_targ->rx_port_queue[i].port;
				if ((rx_port_id == port_id) && (rx_targ->task_init->flag_features & TASK_FEATURE_L3)){
					ok = 1;
					break;
				}
			}
			if (ok == 1) {
				plog_info("\tCore %d task %d has found core %d task %d receiving from port %d\n", lconf->id, targ->id, rx_lconf->id, rx_targ->id, port_id);
				break;
			}
		}
		PROX_PANIC(ok == 0, "L3 sub mode for port %d on core %d task %d, but no core/task receiving on that port\n", port_id, lconf->id, targ->id);
	}
}
171
/* Run all configuration consistency checks.  Each check panics on
 * error, so returning from here means the configuration is coherent.
 * The order determines which problem is reported first. */
static void check_cfg_consistent(void)
{
	check_missing_rx();
	check_zero_rx();
	check_mixed_normal_pipeline();
}
178
179 static void plog_all_rings(void)
180 {
181         struct lcore_cfg *lconf = NULL;
182         struct task_args *targ;
183
184         while (core_targ_next(&lconf, &targ, 0) == 0) {
185                 for (uint8_t ring_idx = 0; ring_idx < targ->nb_rxrings; ++ring_idx) {
186                         plog_info("\tCore %u, task %u, rx_ring[%u] %p\n", lconf->id, targ->id, ring_idx, targ->rx_rings[ring_idx]);
187                 }
188         }
189 }
190
191 static int chain_flag_state(struct task_args *targ, uint64_t flag, int is_set)
192 {
193         if (task_init_flag_set(targ->task_init, flag) == is_set)
194                 return 1;
195
196         int ret = 0;
197
198         for (uint32_t i = 0; i < targ->n_prev_tasks; ++i) {
199                 ret = chain_flag_state(targ->prev_tasks[i], flag, is_set);
200                 if (ret)
201                         return 1;
202         }
203         return 0;
204 }
205
/* Allocate a tx queue on every port this task transmits to and derive
 * the port's txq flags (refcount, offloads, multi-segment) from the
 * feature flags of the whole chain of tasks feeding that port. */
static void configure_if_tx_queues(struct task_args *targ, uint8_t socket)
{
	uint8_t if_port;

	for (uint8_t i = 0; i < targ->nb_txports; ++i) {
		if_port = targ->tx_port_queue[i].port;

		PROX_PANIC(if_port == OUT_DISCARD, "port misconfigured, exiting\n");

		PROX_PANIC(!prox_port_cfg[if_port].active, "\tPort %u not used, skipping...\n", if_port);

		/* Warn (but continue) when the tx core and the device are on
		   different NUMA sockets. */
		int dsocket = prox_port_cfg[if_port].socket;
		if (dsocket != -1 && dsocket != socket) {
			plog_warn("TX core on socket %d while device on socket %d\n", socket, dsocket);
		}

		/* A non-rings-backed port gets one tx queue per transmitting
		   task; a rings-backed port uses a single queue 0. */
		if (prox_port_cfg[if_port].tx_ring[0] == '\0') {  // Rings-backed port can use single queue
			targ->tx_port_queue[i].queue = prox_port_cfg[if_port].n_txq;
			prox_port_cfg[if_port].n_txq++;
		} else {
			prox_port_cfg[if_port].n_txq = 1;
			targ->tx_port_queue[i].queue = 0;
		}
		/* Set ETH_TXQ_FLAGS_NOREFCOUNT when no task in the chain up
		   to (and including) the one transmitting to this port has
		   the REFCOUNT feature set. */
		if (!chain_flag_state(targ, TASK_FEATURE_TXQ_FLAGS_REFCOUNT, 1)) {
			prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOREFCOUNT;
			plog_info("\t\tEnabling No refcnt on port %d\n", if_port);
		}
		else {
			plog_info("\t\tRefcnt used on port %d\n", if_port);
		}

		/* By default OFFLOAD is enabled; it is disabled for the
		   destination port only when some task in the chain back to
		   the first receiving task has NOOFFLOADS set. */
		if (chain_flag_state(targ, TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS, 1)) {
			prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOOFFLOADS;
			plog_info("\t\tDisabling TX offloads on port %d\n", if_port);
		} else {
			plog_info("\t\tEnabling TX offloads on port %d\n", if_port);
		}

		/* NOMULTSEGS is off by default, as drivers/NIC might split
		   packets on RX.  It is only set when *every* task in the
		   chain has the NOMULTSEGS feature (i.e. no task in the
		   chain has it clear), meaning RX is known not to split. */
		if (!chain_flag_state(targ, TASK_FEATURE_TXQ_FLAGS_NOMULTSEGS, 0)) {
			prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
			plog_info("\t\tEnabling No MultiSegs on port %d\n", if_port);
		}
		else {
			plog_info("\t\tMultiSegs used on port %d\n", if_port);
		}
	}
}
264
/* Assign an rx queue on each port this task receives from, attach the
 * task's mempool (and its size) to that queue, and warn when the rx
 * core and the device are on different NUMA sockets. */
static void configure_if_rx_queues(struct task_args *targ, uint8_t socket)
{
	for (int i = 0; i < targ->nb_rxports; i++) {
		uint8_t if_port = targ->rx_port_queue[i].port;

		if (if_port == OUT_DISCARD) {
			/* NOTE(review): this returns rather than continues, so
			   any rx ports listed after an OUT_DISCARD entry are
			   left unconfigured — confirm this is intended. */
			return;
		}

		PROX_PANIC(!prox_port_cfg[if_port].active, "Port %u not used, aborting...\n", if_port);

		/* Rings-backed ports restart queue numbering from 0. */
		if(prox_port_cfg[if_port].rx_ring[0] != '\0') {
			prox_port_cfg[if_port].n_rxq = 0;
		}

		/* Claim the next free rx queue on the port and register the
		   task's mempool for it. */
		targ->rx_port_queue[i].queue = prox_port_cfg[if_port].n_rxq;
		prox_port_cfg[if_port].pool[targ->rx_port_queue[i].queue] = targ->pool;
		prox_port_cfg[if_port].pool_size[targ->rx_port_queue[i].queue] = targ->nb_mbuf - 1;
		prox_port_cfg[if_port].n_rxq++;

		int dsocket = prox_port_cfg[if_port].socket;
		if (dsocket != -1 && dsocket != socket) {
			plog_warn("RX core on socket %d while device on socket %d\n", socket, dsocket);
		}
	}
}
291
292 static void configure_if_queues(void)
293 {
294         struct lcore_cfg *lconf = NULL;
295         struct task_args *targ;
296         uint8_t socket;
297
298         while (core_targ_next(&lconf, &targ, 0) == 0) {
299                 socket = rte_lcore_to_socket_id(lconf->id);
300
301                 configure_if_tx_queues(targ, socket);
302                 configure_if_rx_queues(targ, socket);
303         }
304 }
305
/* Generate a unique 1- or 2-character ring name per call: "A", "B", ...
 * through the alphabet, then two-character names ("AA", "BA", ...).
 * Returns a pointer to a static buffer that is overwritten on each
 * call, so callers must copy the name if they need to keep it.
 * Fix: the alphabet length was recomputed with strlen() three times on
 * every call; it is now computed once per call into a local. */
static const char *gen_ring_name(void)
{
	static char retval[] = "XX";
	static const char* ring_names =
		"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
		"abcdefghijklmnopqrstuvwxyz"
		"[\\]^_`!\"#$%&'()*+,-./:;<="
		">?@{|}0123456789";
	static int idx2 = 0;	/* monotonically increasing call counter */
	const size_t n_names = strlen(ring_names);

	int idx = idx2;

	retval[0] = ring_names[idx % n_names];
	idx /= n_names;
	/* Second character only once the alphabet wraps; 0 terminates the
	   string early for single-character names. */
	retval[1] = idx ? ring_names[(idx - 1) % n_names] : 0;

	idx2++;

	return retval;
}
326
/* Counters for the kinds of rings set up during initialization. */
struct ring_init_stats {
	uint32_t n_pkt_rings;	/* point-to-point packet rings */
	uint32_t n_ctrl_rings;	/* control (msg/pkt) rings */
	uint32_t n_opt_rings;	/* rings optimized away (same-core chaining) */
};

/* Total number of rings accounted for in *ris. */
static uint32_t ring_init_stats_total(const struct ring_init_stats *ris)
{
	uint32_t total = ris->n_pkt_rings;

	total += ris->n_ctrl_rings;
	total += ris->n_opt_rings;
	return total;
}
337
/* Count how many (core, task) destination entries across all configured
 * tasks point at the given worker core/task, i.e. how many tasks send
 * to it.  Used to decide whether a multi-producer ring is needed. */
static uint32_t count_incoming_tasks(uint32_t lcore_worker, uint32_t dest_task)
{
	struct lcore_cfg *lconf = NULL;
	struct task_args *targ;
	uint32_t ret = 0;
	struct core_task ct;

	while (core_targ_next(&lconf, &targ, 0) == 0) {
		/* Scan every destination of every task, across all protocols. */
		for (uint8_t idxx = 0; idxx < MAX_PROTOCOLS; ++idxx) {
			for (uint8_t ridx = 0; ridx < targ->core_task_set[idxx].n_elems; ++ridx) {
				ct = targ->core_task_set[idxx].core_task[ridx];

				if (dest_task == ct.task && lcore_worker == ct.core)
					ret++;
			}
		}
	}
	return ret;
}
357
358 static struct rte_ring *get_existing_ring(uint32_t lcore_id, uint32_t task_id)
359 {
360         if (!prox_core_active(lcore_id, 0))
361                 return NULL;
362
363         struct lcore_cfg *lconf = &lcore_cfg[lcore_id];
364
365         if (task_id >= lconf->n_tasks_all)
366                 return NULL;
367
368         if (lconf->targs[task_id].nb_rxrings == 0)
369                 return NULL;
370
371         return lconf->targs[task_id].rx_rings[0];
372 }
373
374 static struct rte_ring *init_ring_between_tasks(struct lcore_cfg *lconf, struct task_args *starg,
375                                     const struct core_task ct, uint8_t ring_idx, int idx,
376                                     struct ring_init_stats *ris)
377 {
378         uint8_t socket;
379         struct rte_ring *ring = NULL;
380         struct lcore_cfg *lworker;
381         struct task_args *dtarg;
382
383         PROX_ASSERT(prox_core_active(ct.core, 0));
384         lworker = &lcore_cfg[ct.core];
385
386         /* socket used is the one that the sending core resides on */
387         socket = rte_lcore_to_socket_id(lconf->id);
388
389         plog_info("\t\tCreating ring on socket %u with size %u\n"
390                   "\t\t\tsource core, task and socket = %u, %u, %u\n"
391                   "\t\t\tdestination core, task and socket = %u, %u, %u\n"
392                   "\t\t\tdestination worker id = %u\n",
393                   socket, starg->ring_size,
394                   lconf->id, starg->id, socket,
395                   ct.core, ct.task, rte_lcore_to_socket_id(ct.core),
396                   ring_idx);
397
398         if (ct.type) {
399                 struct rte_ring **dring = NULL;
400
401                 if (ct.type == CTRL_TYPE_MSG)
402                         dring = &lworker->ctrl_rings_m[ct.task];
403                 else if (ct.type == CTRL_TYPE_PKT) {
404                         dring = &lworker->ctrl_rings_p[ct.task];
405                         starg->flags |= TASK_ARG_CTRL_RINGS_P;
406                 }
407
408                 if (*dring == NULL)
409                         ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SC_DEQ);
410                 else
411                         ring = *dring;
412                 PROX_PANIC(ring == NULL, "Cannot create ring to connect I/O core %u with worker core %u\n", lconf->id, ct.core);
413
414                 starg->tx_rings[starg->tot_n_txrings_inited] = ring;
415                 starg->tot_n_txrings_inited++;
416                 *dring = ring;
417                 if (lconf->id == prox_cfg.master) {
418                         ctrl_rings[ct.core*MAX_TASKS_PER_CORE + ct.task] = ring;
419                 } else if (ct.core == prox_cfg.master) {
420                         starg->ctrl_plane_ring = ring;
421                 }
422
423                 plog_info("\t\tCore %u task %u to -> core %u task %u ctrl_ring %s %p %s\n",
424                           lconf->id, starg->id, ct.core, ct.task, ct.type == CTRL_TYPE_PKT?
425                           "pkt" : "msg", ring, ring->name);
426                 ris->n_ctrl_rings++;
427                 return ring;
428         }
429
430         dtarg = &lworker->targs[ct.task];
431         lworker->targs[ct.task].worker_thread_id = ring_idx;
432         PROX_ASSERT(dtarg->flags & TASK_ARG_RX_RING);
433         PROX_ASSERT(ct.task < lworker->n_tasks_all);
434
435         /* If all the following conditions are met, the ring can be
436            optimized away. */
437         if (!task_is_master(starg) && !task_is_master(dtarg) && starg->lconf->id == dtarg->lconf->id &&
438             starg->nb_txrings == 1 && idx == 0 && dtarg->task &&
439             dtarg->tot_rxrings == 1 && starg->task == dtarg->task - 1) {
440                 plog_info("\t\tOptimizing away ring on core %u from task %u to task %u\n",
441                           dtarg->lconf->id, starg->task, dtarg->task);
442                 /* No need to set up ws_mbuf. */
443                 starg->tx_opt_ring = 1;
444                 /* During init of destination task, the buffer in the
445                    source task will be initialized. */
446                 dtarg->tx_opt_ring_task = starg;
447                 ris->n_opt_rings++;
448                 ++dtarg->nb_rxrings;
449                 return NULL;
450         }
451
452         int ring_created = 1;
453         /* Only create multi-producer rings if configured to do so AND
454            there is only one task sending to the task */
455         if ((prox_cfg.flags & DSF_MP_RINGS && count_incoming_tasks(ct.core, ct.task) > 1)
456                 || (prox_cfg.flags & DSF_ENABLE_BYPASS)) {
457                 ring = get_existing_ring(ct.core, ct.task);
458
459                 if (ring) {
460                         plog_info("\t\tCore %u task %u creatign MP ring %p to core %u task %u\n",
461                                   lconf->id, starg->id, ring, ct.core, ct.task);
462                         ring_created = 0;
463                 }
464                 else {
465                         ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SC_DEQ);
466                         plog_info("\t\tCore %u task %u using MP ring %p from core %u task %u\n",
467                                   lconf->id, starg->id, ring, ct.core, ct.task);
468                 }
469         }
470         else
471                 ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SP_ENQ | RING_F_SC_DEQ);
472
473         PROX_PANIC(ring == NULL, "Cannot create ring to connect I/O core %u with worker core %u\n", lconf->id, ct.core);
474
475         starg->tx_rings[starg->tot_n_txrings_inited] = ring;
476         starg->tot_n_txrings_inited++;
477
478         if (ring_created) {
479                 PROX_ASSERT(dtarg->nb_rxrings < MAX_RINGS_PER_TASK);
480                 dtarg->rx_rings[dtarg->nb_rxrings] = ring;
481                 ++dtarg->nb_rxrings;
482         }
483         dtarg->nb_slave_threads = starg->core_task_set[idx].n_elems;
484         dtarg->lb_friend_core = lconf->id;
485         dtarg->lb_friend_task = starg->id;
486         plog_info("\t\tWorker thread %d has core %d, task %d as a lb friend\n", ct.core, lconf->id, starg->id);
487         plog_info("\t\tCore %u task %u tx_ring[%u] -> core %u task %u rx_ring[%u] %p %s %u WT\n",
488                   lconf->id, starg->id, ring_idx, ct.core, ct.task, dtarg->nb_rxrings, ring, ring->name,
489                   dtarg->nb_slave_threads);
490         ++ris->n_pkt_rings;
491         return ring;
492 }
493
494 static void init_rings(void)
495 {
496         struct lcore_cfg *lconf = NULL;
497         struct task_args *starg;
498         struct ring_init_stats ris = {0};
499
500         while (core_targ_next(&lconf, &starg, 1) == 0) {
501                 plog_info("\t*** Initializing rings on core %u, task %u ***\n", lconf->id, starg->id);
502                 for (uint8_t idx = 0; idx < MAX_PROTOCOLS; ++idx) {
503                         for (uint8_t ring_idx = 0; ring_idx < starg->core_task_set[idx].n_elems; ++ring_idx) {
504                                 PROX_ASSERT(ring_idx < MAX_WT_PER_LB);
505                                 PROX_ASSERT(starg->tot_n_txrings_inited < MAX_RINGS_PER_TASK);
506
507                                 struct core_task ct = starg->core_task_set[idx].core_task[ring_idx];
508                                 init_ring_between_tasks(lconf, starg, ct, ring_idx, idx, &ris);
509                         }
510                 }
511         }
512
513         plog_info("\tInitialized %d rings:\n"
514                   "\t\tNumber of packet rings: %u\n"
515                   "\t\tNumber of control rings: %u\n"
516                   "\t\tNumber of optimized rings: %u\n",
517                   ring_init_stats_total(&ris),
518                   ris.n_pkt_rings,
519                   ris.n_ctrl_rings,
520                   ris.n_opt_rings);
521
522         lconf = NULL;
523         struct prox_port_cfg *port;
524         while (core_targ_next(&lconf, &starg, 1) == 0) {
525                 if ((starg->task_init) && (starg->task_init->flag_features & TASK_FEATURE_L3)) {
526                         struct core_task ct;
527                         ct.core = prox_cfg.master;
528                         ct.task = 0;
529                         ct.type = CTRL_TYPE_PKT;
530                         struct rte_ring *rx_ring = init_ring_between_tasks(lconf, starg, ct, 0, 0, &ris);
531
532                         ct.core = lconf->id;
533                         ct.task = starg->id;;
534                         struct rte_ring *tx_ring = init_ring_between_tasks(lcore_cfg, lcore_cfg[prox_cfg.master].targs, ct, 0, 0, &ris);
535                 }
536         }
537 }
538
/* Drain every mbuf from the mempool and return them in random order,
 * randomizing the order in which subsequent allocations hand out mbufs.
 *
 * Fix: the index was computed as "rand() % nb_mbuf - 1", which by
 * operator precedence is (rand() % nb_mbuf) - 1.  That can yield -1
 * (out-of-bounds read of pkts[-1], undefined behavior) and can never
 * select index nb_mbuf - 1, so the loop could spin forever when that
 * slot held the last mbuf to return.  The correct range is
 * rand() % nb_mbuf. */
static void shuffle_mempool(struct rte_mempool* mempool, uint32_t nb_mbuf)
{
	struct rte_mbuf** pkts = prox_zmalloc(nb_mbuf * sizeof(*pkts), rte_socket_id());
	uint64_t got = 0;

	/* Pull out every available mbuf, one at a time. */
	while (rte_mempool_get_bulk(mempool, (void**)(pkts + got), 1) == 0)
		++got;

	/* Return them in random order; slots already returned (or never
	   filled) are NULL — prox_zmalloc zero-initializes — and are
	   skipped by the inner loop. */
	while (got) {
		int idx;
		do {
			idx = rand() % nb_mbuf;
		} while (pkts[idx] == 0);

		rte_mempool_put_bulk(mempool, (void**)&pkts[idx], 1);
		pkts[idx] = 0;
		--got;
	}
	prox_free(pkts);
}
559
/* Create one mempool per NUMA socket, shared by every receiving task on
 * that socket.  All tasks on a socket must therefore agree on mbuf size
 * and cache size (panics otherwise).  Finally, point each receiving
 * task at its socket's pool. */
static void setup_mempools_unique_per_socket(void)
{
	uint32_t flags = 0;
	char name[64];
	struct lcore_cfg *lconf = NULL;
	struct task_args *targ;

	/* Per-socket accumulators filled by the first pass. */
	struct rte_mempool     *pool[MAX_SOCKETS];
	uint32_t mbuf_count[MAX_SOCKETS] = {0};
	uint32_t nb_cache_mbuf[MAX_SOCKETS] = {0};
	uint32_t mbuf_size[MAX_SOCKETS] = {0};

	/* Pass 1: sum up mbuf counts per socket and validate that cache
	   and mbuf sizes are uniform within each socket. */
	while (core_targ_next_early(&lconf, &targ, 0) == 0) {
		PROX_PANIC(targ->task_init == NULL, "task_init = NULL, is mode specified for core %d, task %d ?\n", lconf->id, targ->id);
		uint8_t socket = rte_lcore_to_socket_id(lconf->id);
		PROX_ASSERT(socket < MAX_SOCKETS);

		if (targ->mbuf_size_set_explicitely)
			flags = MEMPOOL_F_NO_SPREAD;	/* explicit size: disable object spreading */
		if ((!targ->mbuf_size_set_explicitely) && (targ->task_init->mbuf_size != 0)) {
			/* mbuf size not forced in the config file: take the mode's value */
			targ->mbuf_size = targ->task_init->mbuf_size;
		}
		if (targ->rx_port_queue[0].port != OUT_DISCARD) {
			struct prox_port_cfg* port_cfg = &prox_port_cfg[targ->rx_port_queue[0].port];
			PROX_ASSERT(targ->nb_mbuf != 0);
			mbuf_count[socket] += targ->nb_mbuf;
			if (nb_cache_mbuf[socket] == 0)
				nb_cache_mbuf[socket] = targ->nb_cache_mbuf;
			else {
				PROX_PANIC(nb_cache_mbuf[socket] != targ->nb_cache_mbuf,
					   "all mbuf_cache must have the same size if using a unique mempool per socket\n");
			}
			if (mbuf_size[socket] == 0)
				mbuf_size[socket] = targ->mbuf_size;
			else {
				PROX_PANIC(mbuf_size[socket] != targ->mbuf_size,
					   "all mbuf_size must have the same size if using a unique mempool per socket\n");
			}
			/* vmxnet3 needs room for a full frame plus headroom,
			   unless the size was set explicitly. */
			if ((!targ->mbuf_size_set_explicitely) && (strcmp(port_cfg->short_name, "vmxnet3") == 0)) {
				if (mbuf_size[socket] < MBUF_SIZE + RTE_PKTMBUF_HEADROOM)
					mbuf_size[socket] = MBUF_SIZE + RTE_PKTMBUF_HEADROOM;
			}
		}
	}
	/* Pass 2: create one pool per socket that has consumers. */
	for (int i = 0 ; i < MAX_SOCKETS; i++) {
		if (mbuf_count[i] != 0) {
			sprintf(name, "socket_%u_pool", i);
			pool[i] = rte_mempool_create(name,
						     mbuf_count[i] - 1, mbuf_size[i],
						     nb_cache_mbuf[i],
						     sizeof(struct rte_pktmbuf_pool_private),
						     rte_pktmbuf_pool_init, NULL,
						     prox_pktmbuf_init, NULL,
						     i, flags);
			PROX_PANIC(pool[i] == NULL, "\t\tError: cannot create mempool for socket %u\n", i);
			plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", pool[i],
				  mbuf_count[i], mbuf_size[i], nb_cache_mbuf[i], i);

			if (prox_cfg.flags & DSF_SHUFFLE) {
				shuffle_mempool(pool[i], mbuf_count[i]);
			}
		}
	}

	/* Pass 3: hand each receiving task its socket's pool. */
	lconf = NULL;
	while (core_targ_next_early(&lconf, &targ, 0) == 0) {
		uint8_t socket = rte_lcore_to_socket_id(lconf->id);

		if (targ->rx_port_queue[0].port != OUT_DISCARD) {
			/* use this pool for the interface that the core is receiving from */
			/* If one core receives from multiple ports, all the ports use the same mempool */
			targ->pool = pool[socket];
			/* Set the number of mbuf to the number of the unique mempool, so that the used and free work */
			targ->nb_mbuf = mbuf_count[socket];
			plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", targ->pool,
				  targ->nb_mbuf, mbuf_size[socket], targ->nb_cache_mbuf, socket);
		}
	}
}
639
640 static void setup_mempool_for_rx_task(struct lcore_cfg *lconf, struct task_args *targ)
641 {
642         const uint8_t socket = rte_lcore_to_socket_id(lconf->id);
643         struct prox_port_cfg *port_cfg = &prox_port_cfg[targ->rx_port_queue[0].port];
644         const struct rte_memzone *mz;
645         struct rte_mempool *mp = NULL;
646         uint32_t flags = 0;
647         char memzone_name[64];
648         char name[64];
649
650         /* mbuf size can be set
651          *  - from config file (highest priority, overwriting any other config) - should only be used as workaround
652          *  - through each 'mode', overwriting the default mbuf_size
653          *  - defaulted to MBUF_SIZE i.e. 1518 Bytes
654          * Except is set expliciteky, ensure that size is big enough for vmxnet3 driver
655          */
656         if (targ->mbuf_size_set_explicitely) {
657                 flags = MEMPOOL_F_NO_SPREAD;
658                 /* targ->mbuf_size already set */
659         }
660         else if (targ->task_init->mbuf_size != 0) {
661                 /* mbuf_size not set through config file but set through mode */
662                 targ->mbuf_size = targ->task_init->mbuf_size;
663         }
664         else if (strcmp(port_cfg->short_name, "vmxnet3") == 0) {
665                 if (targ->mbuf_size < MBUF_SIZE + RTE_PKTMBUF_HEADROOM)
666                         targ->mbuf_size = MBUF_SIZE + RTE_PKTMBUF_HEADROOM;
667         }
668
669         /* allocate memory pool for packets */
670         PROX_ASSERT(targ->nb_mbuf != 0);
671
672         if (targ->pool_name[0] == '\0') {
673                 sprintf(name, "core_%u_port_%u_pool", lconf->id, targ->id);
674         }
675
676         snprintf(memzone_name, sizeof(memzone_name)-1, "MP_%s", targ->pool_name);
677         mz = rte_memzone_lookup(memzone_name);
678
679         if (mz != NULL) {
680                 mp = (struct rte_mempool*)mz->addr;
681
682                 targ->nb_mbuf = mp->size;
683                 targ->pool = mp;
684         }
685
686 #ifdef RTE_LIBRTE_IVSHMEM_FALSE
687         if (mz != NULL && mp != NULL && mp->phys_addr != mz->ioremap_addr) {
688                 /* Init mbufs with ioremap_addr for dma */
689                 mp->phys_addr = mz->ioremap_addr;
690                 mp->elt_pa[0] = mp->phys_addr + (mp->elt_va_start - (uintptr_t)mp);
691
692                 struct prox_pktmbuf_reinit_args init_args;
693                 init_args.mp = mp;
694                 init_args.lconf = lconf;
695
696                 uint32_t elt_sz = mp->elt_size + mp->header_size + mp->trailer_size;
697                 rte_mempool_obj_iter((void*)mp->elt_va_start, mp->size, elt_sz, 1,
698                                      mp->elt_pa, mp->pg_num, mp->pg_shift, prox_pktmbuf_reinit, &init_args);
699         }
700 #endif
701
702         /* Use this pool for the interface that the core is
703            receiving from if one core receives from multiple
704            ports, all the ports use the same mempool */
705         if (targ->pool == NULL) {
706                 plog_info("\t\tCreating mempool with name '%s'\n", name);
707                 targ->pool = rte_mempool_create(name,
708                                                 targ->nb_mbuf - 1, targ->mbuf_size,
709                                                 targ->nb_cache_mbuf,
710                                                 sizeof(struct rte_pktmbuf_pool_private),
711                                                 rte_pktmbuf_pool_init, NULL,
712                                                 prox_pktmbuf_init, lconf,
713                                                 socket, flags);
714         }
715
716         PROX_PANIC(targ->pool == NULL,
717                    "\t\tError: cannot create mempool for core %u port %u: %s\n", lconf->id, targ->id, rte_strerror(rte_errno));
718
719         plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", targ->pool,
720                   targ->nb_mbuf, targ->mbuf_size, targ->nb_cache_mbuf, socket);
721         if (prox_cfg.flags & DSF_SHUFFLE) {
722                 shuffle_mempool(targ->pool, targ->nb_mbuf);
723         }
724 }
725
726 static void setup_mempools_multiple_per_socket(void)
727 {
728         struct lcore_cfg *lconf = NULL;
729         struct task_args *targ;
730
731         while (core_targ_next_early(&lconf, &targ, 0) == 0) {
732                 PROX_PANIC(targ->task_init == NULL, "task_init = NULL, is mode specified for core %d, task %d ?\n", lconf->id, targ->id);
733                 if (targ->rx_port_queue[0].port == OUT_DISCARD)
734                         continue;
735                 setup_mempool_for_rx_task(lconf, targ);
736         }
737 }
738
739 static void setup_mempools(void)
740 {
741         if (prox_cfg.flags & UNIQUE_MEMPOOL_PER_SOCKET)
742                 setup_mempools_unique_per_socket();
743         else
744                 setup_mempools_multiple_per_socket();
745 }
746
747 static void set_task_lconf(void)
748 {
749         struct lcore_cfg *lconf;
750         uint32_t lcore_id = -1;
751
752         while(prox_core_next(&lcore_id, 1) == 0) {
753                 lconf = &lcore_cfg[lcore_id];
754                 for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
755                         lconf->targs[task_id].lconf = lconf;
756                 }
757         }
758 }
759
760 static void set_dest_threads(void)
761 {
762         struct lcore_cfg *lconf = NULL;
763         struct task_args *targ;
764
765         while (core_targ_next(&lconf, &targ, 0) == 0) {
766                 for (uint8_t idx = 0; idx < MAX_PROTOCOLS; ++idx) {
767                         for (uint8_t ring_idx = 0; ring_idx < targ->core_task_set[idx].n_elems; ++ring_idx) {
768                                 struct core_task ct = targ->core_task_set[idx].core_task[ring_idx];
769
770                                 struct task_args *dest_task = core_targ_get(ct.core, ct.task);
771                                 dest_task->prev_tasks[dest_task->n_prev_tasks++] = targ;
772                         }
773                 }
774         }
775 }
776
777 static void setup_all_task_structs_early_init(void)
778 {
779         struct lcore_cfg *lconf = NULL;
780         struct task_args *targ;
781
782         plog_info("\t*** Calling early init on all tasks ***\n");
783         while (core_targ_next(&lconf, &targ, 0) == 0) {
784                 if (targ->task_init->early_init) {
785                         targ->task_init->early_init(targ);
786                 }
787         }
788 }
789
790 static void setup_all_task_structs(void)
791 {
792         struct lcore_cfg *lconf;
793         uint32_t lcore_id = -1;
794         struct task_base *tmaster = NULL;
795
796         while(prox_core_next(&lcore_id, 1) == 0) {
797                 lconf = &lcore_cfg[lcore_id];
798                 for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
799                         if (task_is_master(&lconf->targs[task_id])) {
800                                 plog_info("\tInitializing MASTER struct for core %d task %d\n", lcore_id, task_id);
801                                 lconf->tasks_all[task_id] = init_task_struct(&lconf->targs[task_id]);
802                                 tmaster = lconf->tasks_all[task_id];
803                         }
804                 }
805         }
806         PROX_PANIC(tmaster == NULL, "Can't initialize master task\n");
807         lcore_id = -1;
808
809         while(prox_core_next(&lcore_id, 1) == 0) {
810                 lconf = &lcore_cfg[lcore_id];
811                 plog_info("\tInitializing struct for core %d with %d task\n", lcore_id, lconf->n_tasks_all);
812                 for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
813                         if (!task_is_master(&lconf->targs[task_id])) {
814                                 plog_info("\tInitializing struct for core %d task %d\n", lcore_id, task_id);
815                                 lconf->targs[task_id].tmaster = tmaster;
816                                 lconf->tasks_all[task_id] = init_task_struct(&lconf->targs[task_id]);
817                         }
818                 }
819         }
820 }
821
822 static void init_port_activate(void)
823 {
824         struct lcore_cfg *lconf = NULL;
825         struct task_args *targ;
826         uint8_t port_id = 0;
827
828         while (core_targ_next_early(&lconf, &targ, 0) == 0) {
829                 for (int i = 0; i < targ->nb_rxports; i++) {
830                         port_id = targ->rx_port_queue[i].port;
831                         prox_port_cfg[port_id].active = 1;
832                 }
833
834                 for (int i = 0; i < targ->nb_txports; i++) {
835                         port_id = targ->tx_port_queue[i].port;
836                         prox_port_cfg[port_id].active = 1;
837                 }
838         }
839 }
840
841 /* Initialize cores and allocate mempools */
842 static void init_lcores(void)
843 {
844         struct lcore_cfg *lconf = 0;
845         uint32_t lcore_id = -1;
846
847         while(prox_core_next(&lcore_id, 0) == 0) {
848                 uint8_t socket = rte_lcore_to_socket_id(lcore_id);
849                 PROX_PANIC(socket + 1 > MAX_SOCKETS, "Can't configure core %u (on socket %u). MAX_SOCKET is set to %d\n", lcore_id, socket, MAX_SOCKETS);
850         }
851
852         /* need to allocate mempools as the first thing to use the lowest possible address range */
853         plog_info("=== Initializing mempools ===\n");
854         setup_mempools();
855
856         lcore_cfg_alloc_hp();
857
858         set_dest_threads();
859         set_task_lconf();
860
861         plog_info("=== Initializing port addresses ===\n");
862         init_port_addr();
863
864         plog_info("=== Initializing queue numbers on cores ===\n");
865         configure_if_queues();
866
867         plog_info("=== Initializing rings on cores ===\n");
868         init_rings();
869
870         plog_info("=== Checking configuration consistency ===\n");
871         check_cfg_consistent();
872
873         plog_all_rings();
874
875         setup_all_task_structs_early_init();
876         plog_info("=== Initializing tasks ===\n");
877         setup_all_task_structs();
878 }
879
880 static int setup_prox(int argc, char **argv)
881 {
882         if (prox_read_config_file() != 0 ||
883             prox_setup_rte(argv[0]) != 0) {
884                 return -1;
885         }
886
887         if (prox_cfg.flags & DSF_CHECK_SYNTAX) {
888                 plog_info("=== Configuration file syntax has been checked ===\n\n");
889                 exit(EXIT_SUCCESS);
890         }
891
892         init_port_activate();
893         plog_info("=== Initializing rte devices ===\n");
894         if (!(prox_cfg.flags & DSF_USE_DUMMY_DEVICES))
895                 init_rte_ring_dev();
896         init_rte_dev(prox_cfg.flags & DSF_USE_DUMMY_DEVICES);
897         plog_info("=== Calibrating TSC overhead ===\n");
898         clock_init();
899         plog_info("\tTSC running at %"PRIu64" Hz\n", rte_get_tsc_hz());
900
901         init_lcores();
902         plog_info("=== Initializing ports ===\n");
903         init_port_all();
904
905         if (prox_cfg.logbuf_size) {
906                 prox_cfg.logbuf = prox_zmalloc(prox_cfg.logbuf_size, rte_socket_id());
907                 PROX_PANIC(prox_cfg.logbuf == NULL, "Failed to allocate memory for logbuf with size = %d\n", prox_cfg.logbuf_size);
908         }
909
910         if (prox_cfg.flags & DSF_CHECK_INIT) {
911                 plog_info("=== Initialization sequence completed ===\n\n");
912                 exit(EXIT_SUCCESS);
913         }
914
915         /* Current way that works to disable DPDK logging */
916         FILE *f = fopen("/dev/null", "r");
917         rte_openlog_stream(f);
918         plog_info("=== PROX started ===\n");
919         return 0;
920 }
921
/* Child->parent setup-result flag in daemon mode: SIGUSR1 = success,
 * SIGUSR2 (or any other signal routed here) = failure.
 * Fix: declared volatile sig_atomic_t — the only object type the C
 * standard guarantees can be safely written from a signal handler and
 * read from the main flow. */
static volatile sig_atomic_t success = 0;

/* Record whether the daemon child reported successful setup. */
static void siguser_handler(int signal)
{
	if (signal == SIGUSR1)
		success = 1;
	else
		success = 0;
}
930
/* Last-resort termination path (SIGABRT/SIGPIPE, or a second Ctrl-C via
 * sigterm_handler).  Cancels and joins all worker threads, tears down
 * ncurses and the ports, then abort()s with default disposition restored
 * so a core dump can be produced.
 * NOTE(review): this calls non-async-signal-safe functions (pthread_*,
 * plog_info, ...) from signal context; tolerated here because the process
 * is being torn down anyway. */
static void sigabrt_handler(__attribute__((unused)) int signum)
{
	/* restore default disposition for SIGABRT and SIGPIPE */
	signal(SIGABRT, SIG_DFL);
	signal(SIGPIPE, SIG_DFL);

	/* ignore further Ctrl-C */
	signal(SIGINT, SIG_IGN);

	/* more drastic exit on tedious termination signal */
	plog_info("Aborting...\n");
	if (lcore_cfg != NULL) {
		uint32_t lcore_id;
		pthread_t thread_id, tid0, tid = pthread_self();
		/* all-zero pthread_t used as "thread never started" sentinel */
		memset(&tid0, 0, sizeof(tid0));

		/* cancel all threads except current one */
		lcore_id = -1;
		while (prox_core_next(&lcore_id, 1) == 0) {
			thread_id = lcore_cfg[lcore_id].thread_id;
			if (pthread_equal(thread_id, tid0))
				continue;	/* never started */
			if (pthread_equal(thread_id, tid))
				continue;	/* don't cancel ourselves */
			pthread_cancel(thread_id);
		}

		/* wait for cancelled threads to terminate */
		lcore_id = -1;
		while (prox_core_next(&lcore_id, 1) == 0) {
			thread_id = lcore_cfg[lcore_id].thread_id;
			if (pthread_equal(thread_id, tid0))
				continue;
			if (pthread_equal(thread_id, tid))
				continue;
			pthread_join(thread_id, NULL);
		}
	}

	/* close ncurses */
	display_end();

	/* close ports on termination signal */
	close_ports_atexit();

	/* terminate now */
	abort();
}
979
980 static void sigterm_handler(int signum)
981 {
982         /* abort on second Ctrl-C */
983         if (signum == SIGINT)
984                 signal(SIGINT, sigabrt_handler);
985
986         /* gracefully quit on harmless termination signal */
987         /* ports will subsequently get closed at resulting exit */
988         quit();
989 }
990
/* Entry point.  Parses arguments, installs signal handlers, then either
 * runs PROX in the foreground or forks a daemon child.  In daemon mode
 * the parent blocks until the child signals setup success (SIGUSR1) or
 * failure (SIGUSR2) and exits with the matching status. */
int main(int argc, char **argv)
{
	/* set en_US locale to print big numbers with ',' */
	setlocale(LC_NUMERIC, "en_US.utf-8");

	if (prox_parse_args(argc, argv) != 0){
		prox_usage(argv[0]);
	}

	plog_init(prox_cfg.log_name, prox_cfg.log_name_pid);
	plog_info("=== " PROGRAM_NAME " " VERSION_STR " ===\n");
	plog_info("\tUsing DPDK %s\n", rte_version() + sizeof(RTE_VER_PREFIX));
	read_rdt_info();

	if (prox_cfg.flags & DSF_LIST_TASK_MODES) {
		/* list supported task modes and exit */
		tasks_list();
		return EXIT_SUCCESS;
	}

	/* close ports at normal exit */
	atexit(close_ports_atexit);
	/* gracefully quit on harmless termination signals */
	signal(SIGHUP, sigterm_handler);
	signal(SIGINT, sigterm_handler);
	signal(SIGQUIT, sigterm_handler);
	signal(SIGTERM, sigterm_handler);
	signal(SIGUSR1, sigterm_handler);
	signal(SIGUSR2, sigterm_handler);
	/* more drastic exit on tedious termination signals */
	signal(SIGABRT, sigabrt_handler);
	signal(SIGPIPE, sigabrt_handler);

	if (prox_cfg.flags & DSF_DAEMON) {
		/* in daemon mode SIGUSR1/2 are repurposed as the
		   child->parent setup-result channel (overrides above) */
		signal(SIGUSR1, siguser_handler);
		signal(SIGUSR2, siguser_handler);
		plog_info("=== Running in Daemon mode ===\n");
		plog_info("\tForking child and waiting for setup completion\n");

		pid_t ppid = getpid();
		pid_t pid = fork();
		if (pid < 0) {
			plog_err("Failed to fork process to run in daemon mode\n");
			return EXIT_FAILURE;
		}

		if (pid == 0) {
			/* child: detach from the terminal, run setup, report
			   the outcome to the parent, then enter the run loop */
			fclose(stdin);
			fclose(stdout);
			fclose(stderr);
			if (setsid() < 0) {
				kill(ppid, SIGUSR2);
				return EXIT_FAILURE;
			}
			if (setup_prox(argc, argv) != 0) {
				kill(ppid, SIGUSR2);
				return EXIT_FAILURE;
			}
			else {
				kill(ppid, SIGUSR1);
				run(prox_cfg.flags);
				return EXIT_SUCCESS;
			}
		}
		else {
			/* Before exiting the parent, wait until the
			   child process has finished setting up */
			pause();
			if (prox_cfg.logbuf) {
				file_print(prox_cfg.logbuf);
			}
			return success? EXIT_SUCCESS : EXIT_FAILURE;
		}
	}

	/* foreground mode */
	if (setup_prox(argc, argv) != 0)
		return EXIT_FAILURE;
	run(prox_cfg.flags);
	return EXIT_SUCCESS;
}