Merge "VNF_Catalogue Codebase"
[samplevnf.git] / VNFs / DPPD-PROX / main.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <string.h>
18 #include <locale.h>
19 #include <unistd.h>
20 #include <signal.h>
21
22 #include <rte_cycles.h>
23 #include <rte_atomic.h>
24 #include <rte_table_hash.h>
25 #include <rte_memzone.h>
26 #include <rte_errno.h>
27
28 #include "prox_malloc.h"
29 #include "run.h"
30 #include "main.h"
31 #include "log.h"
32 #include "quit.h"
33 #include "clock.h"
34 #include "defines.h"
35 #include "version.h"
36 #include "prox_args.h"
37 #include "prox_assert.h"
38 #include "prox_cfg.h"
39 #include "prox_shared.h"
40 #include "prox_port_cfg.h"
41 #include "toeplitz.h"
42 #include "hash_utils.h"
43 #include "handle_lb_net.h"
44 #include "prox_cksum.h"
45 #include "thread_nop.h"
46 #include "thread_generic.h"
47 #include "thread_pipeline.h"
48 #include "cqm.h"
49
50 #if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0)
51 #define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE
52 #endif
53
54 uint8_t lb_nb_txrings = 0xff;
55 struct rte_ring *ctrl_rings[RTE_MAX_LCORE*MAX_TASKS_PER_CORE];
56
57 static void __attribute__((noreturn)) prox_usage(const char *prgname)
58 {
59         plog_info("\nUsage: %s [-f CONFIG_FILE] [-a|-e] [-m|-s|-i] [-w DEF] [-u] [-t]\n"
60                   "\t-f CONFIG_FILE : configuration file to load, ./prox.cfg by default\n"
61                   "\t-l LOG_FILE : log file name, ./prox.log by default\n"
62                   "\t-p : include PID in log file name if default log file is used\n"
63                   "\t-o DISPLAY: Set display to use, can be 'curses' (default), 'cli' or 'none'\n"
64                   "\t-v verbosity : initial logging verbosity\n"
65                   "\t-a : autostart all cores (by default)\n"
66                   "\t-e : don't autostart\n"
67                   "\t-n : Create NULL devices instead of using PCI devices, useful together with -i\n"
68                   "\t-m : list supported task modes and exit\n"
69                   "\t-s : check configuration file syntax and exit\n"
70                   "\t-i : check initialization sequence and exit\n"
71                   "\t-u : Listen on UDS /tmp/prox.sock\n"
72                   "\t-t : Listen on TCP port 8474\n"
73                   "\t-q : Pass argument to Lua interpreter, useful to define variables\n"
74                   "\t-w : define variable using syntax varname=value\n"
75                   "\t     takes precedence over variables defined in CONFIG_FILE\n"
76                   "\t-k : Log statistics to file \"stats_dump\" in current directory\n"
77                   "\t-d : Run as daemon, the parent process will block until PROX is not initialized\n"
78                   "\t-z : Ignore CPU topology, implies -i\n"
79                   "\t-r : Change initial screen refresh rate. If set to a lower than 0.001 seconds,\n"
80                   "\t     screen refreshing will be disabled\n"
81                   , prgname);
82         exit(EXIT_FAILURE);
83 }
84
85 static void check_mixed_normal_pipeline(void)
86 {
87         struct lcore_cfg *lconf = NULL;
88         uint32_t lcore_id = -1;
89
90         while (prox_core_next(&lcore_id, 0) == 0) {
91                 lconf = &lcore_cfg[lcore_id];
92
93                 int all_thread_nop = 1;
94                 int generic = 0;
95                 int pipeline = 0;
96                 for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
97                         struct task_args *targ = &lconf->targs[task_id];
98                         all_thread_nop = all_thread_nop &&
99                                 targ->task_init->thread_x == thread_nop;
100
101                         pipeline = pipeline || targ->task_init->thread_x == thread_pipeline;
102                         generic = generic || targ->task_init->thread_x == thread_generic;
103                 }
104                 PROX_PANIC(generic && pipeline, "Can't run both pipeline and normal thread on same core\n");
105
106                 if (all_thread_nop)
107                         lconf->thread_x = thread_nop;
108                 else {
109                         lconf->thread_x = thread_generic;
110                 }
111         }
112 }
113
114 static void check_missing_rx(void)
115 {
116         struct lcore_cfg *lconf = NULL;
117         struct task_args *targ;
118
119         while (core_targ_next(&lconf, &targ, 0) == 0) {
120                 PROX_PANIC((targ->flags & TASK_ARG_RX_RING) && targ->rx_rings[0] == 0 && !targ->tx_opt_ring_task,
121                            "Configuration Error - Core %u task %u Receiving from ring, but nobody xmitting to this ring\n", lconf->id, targ->id);
122                 if (targ->nb_rxports == 0 && targ->nb_rxrings == 0) {
123                         PROX_PANIC(!task_init_flag_set(targ->task_init, TASK_FEATURE_NO_RX),
124                                    "\tCore %u task %u: no rx_ports and no rx_rings configured while required by mode %s\n", lconf->id, targ->id, targ->task_init->mode_str);
125                 }
126         }
127 }
128
129 static void check_cfg_consistent(void)
130 {
131         check_missing_rx();
132         check_mixed_normal_pipeline();
133 }
134
135 static void plog_all_rings(void)
136 {
137         struct lcore_cfg *lconf = NULL;
138         struct task_args *targ;
139
140         while (core_targ_next(&lconf, &targ, 0) == 0) {
141                 for (uint8_t ring_idx = 0; ring_idx < targ->nb_rxrings; ++ring_idx) {
142                         plog_info("\tCore %u, task %u, rx_ring[%u] %p\n", lconf->id, targ->id, ring_idx, targ->rx_rings[ring_idx]);
143                 }
144         }
145 }
146
147 static int chain_flag_state(struct task_args *targ, uint64_t flag, int is_set)
148 {
149         if (task_init_flag_set(targ->task_init, flag) == is_set)
150                 return 1;
151
152         int ret = 0;
153
154         for (uint32_t i = 0; i < targ->n_prev_tasks; ++i) {
155                 ret = chain_flag_state(targ->prev_tasks[i], flag, is_set);
156                 if (ret)
157                         return 1;
158         }
159         return 0;
160 }
161
162 static void configure_if_tx_queues(struct task_args *targ, uint8_t socket)
163 {
164         uint8_t if_port;
165
166         for (uint8_t i = 0; i < targ->nb_txports; ++i) {
167                 if_port = targ->tx_port_queue[i].port;
168
169                 PROX_PANIC(if_port == OUT_DISCARD, "port misconfigured, exiting\n");
170
171                 PROX_PANIC(!prox_port_cfg[if_port].active, "\tPort %u not used, skipping...\n", if_port);
172
173                 int dsocket = prox_port_cfg[if_port].socket;
174                 if (dsocket != -1 && dsocket != socket) {
175                         plog_warn("TX core on socket %d while device on socket %d\n", socket, dsocket);
176                 }
177
178                 if (prox_port_cfg[if_port].tx_ring[0] == '\0') {  // Rings-backed port can use single queue
179                         targ->tx_port_queue[i].queue = prox_port_cfg[if_port].n_txq;
180                         prox_port_cfg[if_port].n_txq++;
181                 } else {
182                         prox_port_cfg[if_port].n_txq = 1;
183                         targ->tx_port_queue[i].queue = 0;
184                 }
185                 /* Set the ETH_TXQ_FLAGS_NOREFCOUNT flag if none of
186                    the tasks up to the task transmitting to the port
187                    does not use refcnt. */
188                 if (!chain_flag_state(targ, TASK_FEATURE_TXQ_FLAGS_REFCOUNT, 1)) {
189                         prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOREFCOUNT;
190                         plog_info("\t\tEnabling No refcnt on port %d\n", if_port);
191                 }
192                 else {
193                         plog_info("\t\tRefcnt used on port %d\n", if_port);
194                 }
195
196                 /* By default OFFLOAD is enabled, but if the whole
197                    chain has NOOFFLOADS set all the way until the
198                    first task that receives from a port, it will be
199                    disabled for the destination port. */
200                 if (chain_flag_state(targ, TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS, 1)) {
201                         prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOOFFLOADS;
202                         plog_info("\t\tDisabling TX offloads on port %d\n", if_port);
203                 } else {
204                         plog_info("\t\tEnabling TX offloads on port %d\n", if_port);
205                 }
206
207                 /* By default NOMULTSEGS is disabled, as drivers/NIC might split packets on RX
208                    It should only be enabled when we know for sure that the RX does not split packets.
209                    Set the ETH_TXQ_FLAGS_NOMULTSEGS flag if none of the tasks up to the task
210                    transmitting to the port does not use multsegs. */
211                 if (!chain_flag_state(targ, TASK_FEATURE_TXQ_FLAGS_NOMULTSEGS, 0)) {
212                         prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
213                         plog_info("\t\tEnabling No MultiSegs on port %d\n", if_port);
214                 }
215                 else {
216                         plog_info("\t\tMultiSegs used on port %d\n", if_port);
217                 }
218         }
219 }
220
221 static void configure_if_rx_queues(struct task_args *targ, uint8_t socket)
222 {
223         for (int i = 0; i < targ->nb_rxports; i++) {
224                 uint8_t if_port = targ->rx_port_queue[i].port;
225
226                 if (if_port == OUT_DISCARD) {
227                         return;
228                 }
229
230                 PROX_PANIC(!prox_port_cfg[if_port].active, "Port %u not used, aborting...\n", if_port);
231
232                 if(prox_port_cfg[if_port].rx_ring[0] != '\0') {
233                         prox_port_cfg[if_port].n_rxq = 0;
234                 }
235
236                 targ->rx_port_queue[i].queue = prox_port_cfg[if_port].n_rxq;
237                 prox_port_cfg[if_port].pool[targ->rx_port_queue[i].queue] = targ->pool;
238                 prox_port_cfg[if_port].pool_size[targ->rx_port_queue[i].queue] = targ->nb_mbuf - 1;
239                 prox_port_cfg[if_port].n_rxq++;
240
241                 int dsocket = prox_port_cfg[if_port].socket;
242                 if (dsocket != -1 && dsocket != socket) {
243                         plog_warn("RX core on socket %d while device on socket %d\n", socket, dsocket);
244                 }
245         }
246 }
247
248 static void configure_if_queues(void)
249 {
250         struct lcore_cfg *lconf = NULL;
251         struct task_args *targ;
252         uint8_t socket;
253
254         while (core_targ_next(&lconf, &targ, 0) == 0) {
255                 socket = rte_lcore_to_socket_id(lconf->id);
256
257                 configure_if_tx_queues(targ, socket);
258                 configure_if_rx_queues(targ, socket);
259         }
260 }
261
262 static const char *gen_ring_name(void)
263 {
264         static char retval[] = "XX";
265         static const char* ring_names =
266                 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
267                 "abcdefghijklmnopqrstuvwxyz"
268                 "[\\]^_`!\"#$%&'()*+,-./:;<="
269                 ">?@{|}0123456789";
270         static int idx2 = 0;
271
272         int idx = idx2;
273
274         retval[0] = ring_names[idx % strlen(ring_names)];
275         idx /= strlen(ring_names);
276         retval[1] = idx ? ring_names[(idx - 1) % strlen(ring_names)] : 0;
277
278         idx2++;
279
280         return retval;
281 }
282
283 static int task_is_master(struct task_args *targ)
284 {
285         return !targ->lconf;
286 }
287
288 struct ring_init_stats {
289         uint32_t n_pkt_rings;
290         uint32_t n_ctrl_rings;
291         uint32_t n_opt_rings;
292 };
293
294 static uint32_t ring_init_stats_total(const struct ring_init_stats *ris)
295 {
296         return ris->n_pkt_rings + ris->n_ctrl_rings + ris->n_opt_rings;
297 }
298
299 static uint32_t count_incoming_tasks(uint32_t lcore_worker, uint32_t dest_task)
300 {
301         struct lcore_cfg *lconf = NULL;
302         struct task_args *targ;
303         uint32_t ret = 0;
304         struct core_task ct;
305
306         while (core_targ_next(&lconf, &targ, 0) == 0) {
307                 for (uint8_t idxx = 0; idxx < MAX_PROTOCOLS; ++idxx) {
308                         for (uint8_t ridx = 0; ridx < targ->core_task_set[idxx].n_elems; ++ridx) {
309                                 ct = targ->core_task_set[idxx].core_task[ridx];
310
311                                 if (dest_task == ct.task && lcore_worker == ct.core)
312                                         ret++;
313                         }
314                 }
315         }
316         return ret;
317 }
318
319 static struct rte_ring *get_existing_ring(uint32_t lcore_id, uint32_t task_id)
320 {
321         if (!prox_core_active(lcore_id, 0))
322                 return NULL;
323
324         struct lcore_cfg *lconf = &lcore_cfg[lcore_id];
325
326         if (task_id >= lconf->n_tasks_all)
327                 return NULL;
328
329         if (lconf->targs[task_id].nb_rxrings == 0)
330                 return NULL;
331
332         return lconf->targs[task_id].rx_rings[0];
333 }
334
335 static void init_ring_between_tasks(struct lcore_cfg *lconf, struct task_args *starg,
336                                     const struct core_task ct, uint8_t ring_idx, int idx,
337                                     struct ring_init_stats *ris)
338 {
339         uint8_t socket;
340         struct rte_ring *ring = NULL;
341         struct lcore_cfg *lworker;
342         struct task_args *dtarg;
343
344         PROX_ASSERT(prox_core_active(ct.core, 0));
345         lworker = &lcore_cfg[ct.core];
346
347         /* socket used is the one that the sending core resides on */
348         socket = rte_lcore_to_socket_id(lconf->id);
349
350         plog_info("\t\tCreating ring on socket %u with size %u\n"
351                   "\t\t\tsource core, task and socket = %u, %u, %u\n"
352                   "\t\t\tdestination core, task and socket = %u, %u, %u\n"
353                   "\t\t\tdestination worker id = %u\n",
354                   socket, starg->ring_size,
355                   lconf->id, starg->id, socket,
356                   ct.core, ct.task, rte_lcore_to_socket_id(ct.core),
357                   ring_idx);
358
359         if (ct.type) {
360                 struct rte_ring **dring = NULL;
361
362                 if (ct.type == CTRL_TYPE_MSG)
363                         dring = &lworker->ctrl_rings_m[ct.task];
364                 else if (ct.type == CTRL_TYPE_PKT) {
365                         dring = &lworker->ctrl_rings_p[ct.task];
366                         starg->flags |= TASK_ARG_CTRL_RINGS_P;
367                 }
368
369                 if (*dring == NULL)
370                         ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SC_DEQ);
371                 else
372                         ring = *dring;
373                 PROX_PANIC(ring == NULL, "Cannot create ring to connect I/O core %u with worker core %u\n", lconf->id, ct.core);
374
375                 starg->tx_rings[starg->tot_n_txrings_inited] = ring;
376                 starg->tot_n_txrings_inited++;
377                 *dring = ring;
378                 if (lconf->id == prox_cfg.master) {
379                         ctrl_rings[ct.core*MAX_TASKS_PER_CORE + ct.task] = ring;
380                 }
381
382                 plog_info("\t\tCore %u task %u to -> core %u task %u ctrl_ring %s %p %s\n",
383                           lconf->id, starg->id, ct.core, ct.task, ct.type == CTRL_TYPE_PKT?
384                           "pkt" : "msg", ring, ring->name);
385                 ris->n_ctrl_rings++;
386                 return;
387         }
388
389         dtarg = &lworker->targs[ct.task];
390         lworker->targs[ct.task].worker_thread_id = ring_idx;
391         PROX_ASSERT(dtarg->flags & TASK_ARG_RX_RING);
392         PROX_ASSERT(ct.task < lworker->n_tasks_all);
393
394         /* If all the following conditions are met, the ring can be
395            optimized away. */
396         if (!task_is_master(starg) && starg->lconf->id == dtarg->lconf->id &&
397             starg->nb_txrings == 1 && idx == 0 && dtarg->task &&
398             dtarg->tot_rxrings == 1 && starg->task == dtarg->task - 1) {
399                 plog_info("\t\tOptimizing away ring on core %u from task %u to task %u\n",
400                           dtarg->lconf->id, starg->task, dtarg->task);
401                 /* No need to set up ws_mbuf. */
402                 starg->tx_opt_ring = 1;
403                 /* During init of destination task, the buffer in the
404                    source task will be initialized. */
405                 dtarg->tx_opt_ring_task = starg;
406                 ris->n_opt_rings++;
407                 ++dtarg->nb_rxrings;
408                 return;
409         }
410
411         int ring_created = 1;
412         /* Only create multi-producer rings if configured to do so AND
413            there is only one task sending to the task */
414         if ((prox_cfg.flags & DSF_MP_RINGS && count_incoming_tasks(ct.core, ct.task) > 1)
415                 || (prox_cfg.flags & DSF_ENABLE_BYPASS)) {
416                 ring = get_existing_ring(ct.core, ct.task);
417
418                 if (ring) {
419                         plog_info("\t\tCore %u task %u creatign MP ring %p to core %u task %u\n",
420                                   lconf->id, starg->id, ring, ct.core, ct.task);
421                         ring_created = 0;
422                 }
423                 else {
424                         ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SC_DEQ);
425                         plog_info("\t\tCore %u task %u using MP ring %p from core %u task %u\n",
426                                   lconf->id, starg->id, ring, ct.core, ct.task);
427                 }
428         }
429         else
430                 ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SP_ENQ | RING_F_SC_DEQ);
431
432         PROX_PANIC(ring == NULL, "Cannot create ring to connect I/O core %u with worker core %u\n", lconf->id, ct.core);
433
434         starg->tx_rings[starg->tot_n_txrings_inited] = ring;
435         starg->tot_n_txrings_inited++;
436
437         if (ring_created) {
438                 PROX_ASSERT(dtarg->nb_rxrings < MAX_RINGS_PER_TASK);
439                 dtarg->rx_rings[dtarg->nb_rxrings] = ring;
440                 ++dtarg->nb_rxrings;
441         }
442         dtarg->nb_slave_threads = starg->core_task_set[idx].n_elems;
443         dtarg->lb_friend_core = lconf->id;
444         dtarg->lb_friend_task = starg->id;
445         plog_info("\t\tWorker thread %d has core %d, task %d as a lb friend\n", ct.core, lconf->id, starg->id);
446         plog_info("\t\tCore %u task %u tx_ring[%u] -> core %u task %u rx_ring[%u] %p %s %u WT\n",
447                   lconf->id, starg->id, ring_idx, ct.core, ct.task, dtarg->nb_rxrings, ring, ring->name,
448                   dtarg->nb_slave_threads);
449         ++ris->n_pkt_rings;
450 }
451
452 static void init_rings(void)
453 {
454         struct lcore_cfg *lconf = NULL;
455         struct task_args *starg;
456         struct ring_init_stats ris = {0};
457
458         while (core_targ_next(&lconf, &starg, 1) == 0) {
459                 plog_info("\t*** Initializing rings on core %u, task %u ***\n", lconf->id, starg->id);
460                 for (uint8_t idx = 0; idx < MAX_PROTOCOLS; ++idx) {
461                         for (uint8_t ring_idx = 0; ring_idx < starg->core_task_set[idx].n_elems; ++ring_idx) {
462                                 PROX_ASSERT(ring_idx < MAX_WT_PER_LB);
463                                 PROX_ASSERT(starg->tot_n_txrings_inited < MAX_RINGS_PER_TASK);
464
465                                 struct core_task ct = starg->core_task_set[idx].core_task[ring_idx];
466                                 init_ring_between_tasks(lconf, starg, ct, ring_idx, idx, &ris);
467                         }
468                 }
469         }
470
471         plog_info("\tInitialized %d rings:\n"
472                   "\t\tNumber of packet rings: %u\n"
473                   "\t\tNumber of control rings: %u\n"
474                   "\t\tNumber of optimized rings: %u\n",
475                   ring_init_stats_total(&ris),
476                   ris.n_pkt_rings,
477                   ris.n_ctrl_rings,
478                   ris.n_opt_rings);
479 }
480
481 static void shuffle_mempool(struct rte_mempool* mempool, uint32_t nb_mbuf)
482 {
483         struct rte_mbuf** pkts = prox_zmalloc(nb_mbuf * sizeof(*pkts), rte_socket_id());
484         uint64_t got = 0;
485
486         while (rte_mempool_get_bulk(mempool, (void**)(pkts + got), 1) == 0)
487                 ++got;
488
489         while (got) {
490                 int idx;
491                 do {
492                         idx = rand() % nb_mbuf - 1;
493                 } while (pkts[idx] == 0);
494
495                 rte_mempool_put_bulk(mempool, (void**)&pkts[idx], 1);
496                 pkts[idx] = 0;
497                 --got;
498         };
499         prox_free(pkts);
500 }
501
502 static void setup_mempools_unique_per_socket(void)
503 {
504         uint32_t flags = 0;
505         char name[64];
506         struct lcore_cfg *lconf = NULL;
507         struct task_args *targ;
508
509         struct rte_mempool     *pool[MAX_SOCKETS];
510         uint32_t mbuf_count[MAX_SOCKETS] = {0};
511         uint32_t nb_cache_mbuf[MAX_SOCKETS] = {0};
512         uint32_t mbuf_size[MAX_SOCKETS] = {0};
513
514         while (core_targ_next_early(&lconf, &targ, 0) == 0) {
515                 PROX_PANIC(targ->task_init == NULL, "task_init = NULL, is mode specified for core %d, task %d ?\n", lconf->id, targ->id);
516                 uint8_t socket = rte_lcore_to_socket_id(lconf->id);
517                 PROX_ASSERT(socket < MAX_SOCKETS);
518
519                 if (targ->mbuf_size_set_explicitely)
520                         flags = MEMPOOL_F_NO_SPREAD;
521                 if ((!targ->mbuf_size_set_explicitely) && (targ->task_init->mbuf_size != 0)) {
522                         targ->mbuf_size = targ->task_init->mbuf_size;
523                 }
524                 if (targ->rx_port_queue[0].port != OUT_DISCARD) {
525                         struct prox_port_cfg* port_cfg = &prox_port_cfg[targ->rx_port_queue[0].port];
526                         PROX_ASSERT(targ->nb_mbuf != 0);
527                         mbuf_count[socket] += targ->nb_mbuf;
528                         if (nb_cache_mbuf[socket] == 0)
529                                 nb_cache_mbuf[socket] = targ->nb_cache_mbuf;
530                         else {
531                                 PROX_PANIC(nb_cache_mbuf[socket] != targ->nb_cache_mbuf,
532                                            "all mbuf_cache must have the same size if using a unique mempool per socket\n");
533                         }
534                         if (mbuf_size[socket] == 0)
535                                 mbuf_size[socket] = targ->mbuf_size;
536                         else {
537                                 PROX_PANIC(mbuf_size[socket] != targ->mbuf_size,
538                                            "all mbuf_size must have the same size if using a unique mempool per socket\n");
539                         }
540                         if ((!targ->mbuf_size_set_explicitely) && (strcmp(port_cfg->short_name, "vmxnet3") == 0)) {
541                                 if (mbuf_size[socket] < MBUF_SIZE + RTE_PKTMBUF_HEADROOM)
542                                         mbuf_size[socket] = MBUF_SIZE + RTE_PKTMBUF_HEADROOM;
543                         }
544                 }
545         }
546         for (int i = 0 ; i < MAX_SOCKETS; i++) {
547                 if (mbuf_count[i] != 0) {
548                         sprintf(name, "socket_%u_pool", i);
549                         pool[i] = rte_mempool_create(name,
550                                                      mbuf_count[i] - 1, mbuf_size[i],
551                                                      nb_cache_mbuf[i],
552                                                      sizeof(struct rte_pktmbuf_pool_private),
553                                                      rte_pktmbuf_pool_init, NULL,
554                                                      prox_pktmbuf_init, NULL,
555                                                      i, flags);
556                         PROX_PANIC(pool[i] == NULL, "\t\tError: cannot create mempool for socket %u\n", i);
557                         plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", pool[i],
558                                   mbuf_count[i], mbuf_size[i], nb_cache_mbuf[i], i);
559
560                         if (prox_cfg.flags & DSF_SHUFFLE) {
561                                 shuffle_mempool(pool[i], mbuf_count[i]);
562                         }
563                 }
564         }
565
566         lconf = NULL;
567         while (core_targ_next_early(&lconf, &targ, 0) == 0) {
568                 uint8_t socket = rte_lcore_to_socket_id(lconf->id);
569
570                 if (targ->rx_port_queue[0].port != OUT_DISCARD) {
571                         /* use this pool for the interface that the core is receiving from */
572                         /* If one core receives from multiple ports, all the ports use the same mempool */
573                         targ->pool = pool[socket];
574                         /* Set the number of mbuf to the number of the unique mempool, so that the used and free work */
575                         targ->nb_mbuf = mbuf_count[socket];
576                         plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", targ->pool,
577                                   targ->nb_mbuf, mbuf_size[socket], targ->nb_cache_mbuf, socket);
578                 }
579         }
580 }
581
582 static void setup_mempool_for_rx_task(struct lcore_cfg *lconf, struct task_args *targ)
583 {
584         const uint8_t socket = rte_lcore_to_socket_id(lconf->id);
585         struct prox_port_cfg *port_cfg = &prox_port_cfg[targ->rx_port_queue[0].port];
586         const struct rte_memzone *mz;
587         struct rte_mempool *mp = NULL;
588         uint32_t flags = 0;
589         char memzone_name[64];
590         char name[64];
591
592         /* mbuf size can be set
593          *  - from config file (highest priority, overwriting any other config) - should only be used as workaround
594          *  - through each 'mode', overwriting the default mbuf_size
595          *  - defaulted to MBUF_SIZE i.e. 1518 Bytes
596          * Except is set expliciteky, ensure that size is big enough for vmxnet3 driver
597          */
598         if (targ->mbuf_size_set_explicitely) {
599                 flags = MEMPOOL_F_NO_SPREAD;
600                 /* targ->mbuf_size already set */
601         }
602         else if (targ->task_init->mbuf_size != 0) {
603                 /* mbuf_size not set through config file but set through mode */
604                 targ->mbuf_size = targ->task_init->mbuf_size;
605         }
606         else if (strcmp(port_cfg->short_name, "vmxnet3") == 0) {
607                 if (targ->mbuf_size < MBUF_SIZE + RTE_PKTMBUF_HEADROOM)
608                         targ->mbuf_size = MBUF_SIZE + RTE_PKTMBUF_HEADROOM;
609         }
610
611         /* allocate memory pool for packets */
612         PROX_ASSERT(targ->nb_mbuf != 0);
613
614         if (targ->pool_name[0] == '\0') {
615                 sprintf(name, "core_%u_port_%u_pool", lconf->id, targ->id);
616         }
617
618         snprintf(memzone_name, sizeof(memzone_name)-1, "MP_%s", targ->pool_name);
619         mz = rte_memzone_lookup(memzone_name);
620
621         if (mz != NULL) {
622                 mp = (struct rte_mempool*)mz->addr;
623
624                 targ->nb_mbuf = mp->size;
625                 targ->pool = mp;
626         }
627
628 #ifdef RTE_LIBRTE_IVSHMEM_FALSE
629         if (mz != NULL && mp != NULL && mp->phys_addr != mz->ioremap_addr) {
630                 /* Init mbufs with ioremap_addr for dma */
631                 mp->phys_addr = mz->ioremap_addr;
632                 mp->elt_pa[0] = mp->phys_addr + (mp->elt_va_start - (uintptr_t)mp);
633
634                 struct prox_pktmbuf_reinit_args init_args;
635                 init_args.mp = mp;
636                 init_args.lconf = lconf;
637
638                 uint32_t elt_sz = mp->elt_size + mp->header_size + mp->trailer_size;
639                 rte_mempool_obj_iter((void*)mp->elt_va_start, mp->size, elt_sz, 1,
640                                      mp->elt_pa, mp->pg_num, mp->pg_shift, prox_pktmbuf_reinit, &init_args);
641         }
642 #endif
643
644         /* Use this pool for the interface that the core is
645            receiving from if one core receives from multiple
646            ports, all the ports use the same mempool */
647         if (targ->pool == NULL) {
648                 plog_info("\t\tCreating mempool with name '%s'\n", name);
649                 targ->pool = rte_mempool_create(name,
650                                                 targ->nb_mbuf - 1, targ->mbuf_size,
651                                                 targ->nb_cache_mbuf,
652                                                 sizeof(struct rte_pktmbuf_pool_private),
653                                                 rte_pktmbuf_pool_init, NULL,
654                                                 prox_pktmbuf_init, lconf,
655                                                 socket, flags);
656         }
657
658         PROX_PANIC(targ->pool == NULL,
659                    "\t\tError: cannot create mempool for core %u port %u: %s\n", lconf->id, targ->id, rte_strerror(rte_errno));
660
661         plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", targ->pool,
662                   targ->nb_mbuf, targ->mbuf_size, targ->nb_cache_mbuf, socket);
663         if (prox_cfg.flags & DSF_SHUFFLE) {
664                 shuffle_mempool(targ->pool, targ->nb_mbuf);
665         }
666 }
667
668 static void setup_mempools_multiple_per_socket(void)
669 {
670         struct lcore_cfg *lconf = NULL;
671         struct task_args *targ;
672
673         while (core_targ_next_early(&lconf, &targ, 0) == 0) {
674                 PROX_PANIC(targ->task_init == NULL, "task_init = NULL, is mode specified for core %d, task %d ?\n", lconf->id, targ->id);
675                 if (targ->rx_port_queue[0].port == OUT_DISCARD)
676                         continue;
677                 setup_mempool_for_rx_task(lconf, targ);
678         }
679 }
680
681 static void setup_mempools(void)
682 {
683         if (prox_cfg.flags & UNIQUE_MEMPOOL_PER_SOCKET)
684                 setup_mempools_unique_per_socket();
685         else
686                 setup_mempools_multiple_per_socket();
687 }
688
689 static void set_task_lconf(void)
690 {
691         struct lcore_cfg *lconf;
692         uint32_t lcore_id = -1;
693
694         while(prox_core_next(&lcore_id, 0) == 0) {
695                 lconf = &lcore_cfg[lcore_id];
696                 for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
697                         lconf->targs[task_id].lconf = lconf;
698                 }
699         }
700 }
701
702 static void set_dest_threads(void)
703 {
704         struct lcore_cfg *lconf = NULL;
705         struct task_args *targ;
706
707         while (core_targ_next(&lconf, &targ, 0) == 0) {
708                 for (uint8_t idx = 0; idx < MAX_PROTOCOLS; ++idx) {
709                         for (uint8_t ring_idx = 0; ring_idx < targ->core_task_set[idx].n_elems; ++ring_idx) {
710                                 struct core_task ct = targ->core_task_set[idx].core_task[ring_idx];
711
712                                 struct task_args *dest_task = core_targ_get(ct.core, ct.task);
713                                 dest_task->prev_tasks[dest_task->n_prev_tasks++] = targ;
714                         }
715                 }
716         }
717 }
718
719 static void setup_all_task_structs_early_init(void)
720 {
721         struct lcore_cfg *lconf = NULL;
722         struct task_args *targ;
723
724         plog_info("\t*** Calling early init on all tasks ***\n");
725         while (core_targ_next(&lconf, &targ, 0) == 0) {
726                 if (targ->task_init->early_init) {
727                         targ->task_init->early_init(targ);
728                 }
729         }
730 }
731
732 static void setup_all_task_structs(void)
733 {
734         struct lcore_cfg *lconf;
735         uint32_t lcore_id = -1;
736
737         while(prox_core_next(&lcore_id, 0) == 0) {
738                 lconf = &lcore_cfg[lcore_id];
739                 for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
740                         lconf->tasks_all[task_id] = init_task_struct(&lconf->targs[task_id]);
741                 }
742         }
743 }
744
745 static void init_port_activate(void)
746 {
747         struct lcore_cfg *lconf = NULL;
748         struct task_args *targ;
749         uint8_t port_id = 0;
750
751         while (core_targ_next_early(&lconf, &targ, 0) == 0) {
752                 for (int i = 0; i < targ->nb_rxports; i++) {
753                         port_id = targ->rx_port_queue[i].port;
754                         prox_port_cfg[port_id].active = 1;
755                 }
756
757                 for (int i = 0; i < targ->nb_txports; i++) {
758                         port_id = targ->tx_port_queue[i].port;
759                         prox_port_cfg[port_id].active = 1;
760                 }
761         }
762 }
763
764 /* Initialize cores and allocate mempools */
765 static void init_lcores(void)
766 {
767         struct lcore_cfg *lconf = 0;
768         uint32_t lcore_id = -1;
769
770         while(prox_core_next(&lcore_id, 0) == 0) {
771                 uint8_t socket = rte_lcore_to_socket_id(lcore_id);
772                 PROX_PANIC(socket + 1 > MAX_SOCKETS, "Can't configure core %u (on socket %u). MAX_SOCKET is set to %d\n", lcore_id, socket, MAX_SOCKETS);
773         }
774
775         /* need to allocate mempools as the first thing to use the lowest possible address range */
776         plog_info("=== Initializing mempools ===\n");
777         setup_mempools();
778
779         lcore_cfg_alloc_hp();
780
781         set_dest_threads();
782         set_task_lconf();
783
784         plog_info("=== Initializing port addresses ===\n");
785         init_port_addr();
786
787         plog_info("=== Initializing queue numbers on cores ===\n");
788         configure_if_queues();
789
790         plog_info("=== Initializing rings on cores ===\n");
791         init_rings();
792
793         plog_info("=== Checking configuration consistency ===\n");
794         check_cfg_consistent();
795
796         plog_all_rings();
797
798         setup_all_task_structs_early_init();
799         plog_info("=== Initializing tasks ===\n");
800         setup_all_task_structs();
801 }
802
803 static int setup_prox(int argc, char **argv)
804 {
805         if (prox_read_config_file() != 0 ||
806             prox_setup_rte(argv[0]) != 0) {
807                 return -1;
808         }
809
810         if (prox_cfg.flags & DSF_CHECK_SYNTAX) {
811                 plog_info("=== Configuration file syntax has been checked ===\n\n");
812                 exit(EXIT_SUCCESS);
813         }
814
815         init_port_activate();
816         plog_info("=== Initializing rte devices ===\n");
817         if (!(prox_cfg.flags & DSF_USE_DUMMY_DEVICES))
818                 init_rte_ring_dev();
819         init_rte_dev(prox_cfg.flags & DSF_USE_DUMMY_DEVICES);
820         plog_info("=== Calibrating TSC overhead ===\n");
821         clock_init();
822         plog_info("\tTSC running at %"PRIu64" Hz\n", rte_get_tsc_hz());
823
824         init_lcores();
825         plog_info("=== Initializing ports ===\n");
826         init_port_all();
827
828         if (prox_cfg.logbuf_size) {
829                 prox_cfg.logbuf = prox_zmalloc(prox_cfg.logbuf_size, rte_socket_id());
830                 PROX_PANIC(prox_cfg.logbuf == NULL, "Failed to allocate memory for logbuf with size = %d\n", prox_cfg.logbuf_size);
831         }
832
833         if (prox_cfg.flags & DSF_CHECK_INIT) {
834                 plog_info("=== Initialization sequence completed ===\n\n");
835                 exit(EXIT_SUCCESS);
836         }
837
838         /* Current way that works to disable DPDK logging */
839         FILE *f = fopen("/dev/null", "r");
840         rte_openlog_stream(f);
841         plog_info("=== PROX started ===\n");
842         return 0;
843 }
844
845 static int success = 0;
846 static void siguser_handler(int signal)
847 {
848         if (signal == SIGUSR1)
849                 success = 1;
850         else
851                 success = 0;
852 }
853
854 static void sigabrt_handler(__attribute__((unused)) int signum)
855 {
856         /* restore default disposition for SIGABRT and SIGPIPE */
857         signal(SIGABRT, SIG_DFL);
858         signal(SIGPIPE, SIG_DFL);
859
860         /* ignore further Ctrl-C */
861         signal(SIGINT, SIG_IGN);
862
863         /* more drastic exit on tedious termination signal */
864         plog_info("Aborting...\n");
865         if (lcore_cfg != NULL) {
866                 uint32_t lcore_id;
867                 pthread_t thread_id, tid0, tid = pthread_self();
868                 memset(&tid0, 0, sizeof(tid0));
869
870                 /* cancel all threads except current one */
871                 lcore_id = -1;
872                 while (prox_core_next(&lcore_id, 1) == 0) {
873                         thread_id = lcore_cfg[lcore_id].thread_id;
874                         if (pthread_equal(thread_id, tid0))
875                                 continue;
876                         if (pthread_equal(thread_id, tid))
877                                 continue;
878                         pthread_cancel(thread_id);
879                 }
880
881                 /* wait for cancelled threads to terminate */
882                 lcore_id = -1;
883                 while (prox_core_next(&lcore_id, 1) == 0) {
884                         thread_id = lcore_cfg[lcore_id].thread_id;
885                         if (pthread_equal(thread_id, tid0))
886                                 continue;
887                         if (pthread_equal(thread_id, tid))
888                                 continue;
889                         pthread_join(thread_id, NULL);
890                 }
891         }
892
893         /* close ncurses */
894         display_end();
895
896         /* close ports on termination signal */
897         close_ports_atexit();
898
899         /* terminate now */
900         abort();
901 }
902
903 static void sigterm_handler(int signum)
904 {
905         /* abort on second Ctrl-C */
906         if (signum == SIGINT)
907                 signal(SIGINT, sigabrt_handler);
908
909         /* gracefully quit on harmless termination signal */
910         /* ports will subsequently get closed at resulting exit */
911         quit();
912 }
913
914 int main(int argc, char **argv)
915 {
916         /* set en_US locale to print big numbers with ',' */
917         setlocale(LC_NUMERIC, "en_US.utf-8");
918
919         if (prox_parse_args(argc, argv) != 0){
920                 prox_usage(argv[0]);
921         }
922
923         plog_init(prox_cfg.log_name, prox_cfg.log_name_pid);
924         plog_info("=== " PROGRAM_NAME " " VERSION_STR " ===\n");
925         plog_info("\tUsing DPDK %s\n", rte_version() + sizeof(RTE_VER_PREFIX));
926         read_rdt_info();
927
928         if (prox_cfg.flags & DSF_LIST_TASK_MODES) {
929                 /* list supported task modes and exit */
930                 tasks_list();
931                 return EXIT_SUCCESS;
932         }
933
934         /* close ports at normal exit */
935         atexit(close_ports_atexit);
936         /* gracefully quit on harmless termination signals */
937         signal(SIGHUP, sigterm_handler);
938         signal(SIGINT, sigterm_handler);
939         signal(SIGQUIT, sigterm_handler);
940         signal(SIGTERM, sigterm_handler);
941         signal(SIGUSR1, sigterm_handler);
942         signal(SIGUSR2, sigterm_handler);
943         /* more drastic exit on tedious termination signals */
944         signal(SIGABRT, sigabrt_handler);
945         signal(SIGPIPE, sigabrt_handler);
946
947         if (prox_cfg.flags & DSF_DAEMON) {
948                 signal(SIGUSR1, siguser_handler);
949                 signal(SIGUSR2, siguser_handler);
950                 plog_info("=== Running in Daemon mode ===\n");
951                 plog_info("\tForking child and waiting for setup completion\n");
952
953                 pid_t ppid = getpid();
954                 pid_t pid = fork();
955                 if (pid < 0) {
956                         plog_err("Failed to fork process to run in daemon mode\n");
957                         return EXIT_FAILURE;
958                 }
959
960                 if (pid == 0) {
961                         fclose(stdin);
962                         fclose(stdout);
963                         fclose(stderr);
964                         if (setsid() < 0) {
965                                 kill(ppid, SIGUSR2);
966                                 return EXIT_FAILURE;
967                         }
968                         if (setup_prox(argc, argv) != 0) {
969                                 kill(ppid, SIGUSR2);
970                                 return EXIT_FAILURE;
971                         }
972                         else {
973                                 kill(ppid, SIGUSR1);
974                                 run(prox_cfg.flags);
975                                 return EXIT_SUCCESS;
976                         }
977                 }
978                 else {
979                         /* Before exiting the parent, wait until the
980                            child process has finished setting up */
981                         pause();
982                         if (prox_cfg.logbuf) {
983                                 file_print(prox_cfg.logbuf);
984                         }
985                         return success? EXIT_SUCCESS : EXIT_FAILURE;
986                 }
987         }
988
989         if (setup_prox(argc, argv) != 0)
990                 return EXIT_FAILURE;
991         run(prox_cfg.flags);
992         return EXIT_SUCCESS;
993 }