2 // Copyright (c) 2010-2020 Intel Corporation
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
23 #include <rte_cycles.h>
24 #include <rte_atomic.h>
25 #include <rte_table_hash.h>
26 #include <rte_memzone.h>
27 #include <rte_errno.h>
29 #include "prox_malloc.h"
37 #include "prox_args.h"
38 #include "prox_assert.h"
40 #include "prox_shared.h"
41 #include "prox_port_cfg.h"
43 #include "hash_utils.h"
44 #include "handle_lb_net.h"
45 #include "prox_cksum.h"
46 #include "thread_nop.h"
47 #include "thread_generic.h"
48 #include "thread_pipeline.h"
50 #include "handle_master.h"
// Compatibility shim: DPDK < 1.8.0 named this constant CACHE_LINE_SIZE.
52 #if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0)
53 #define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE
// Number of TX rings for load-balancer tasks; 0xff presumably means "not yet configured" — TODO confirm
56 uint8_t lb_nb_txrings = 0xff;
// Control rings indexed per (core, task): core * MAX_TASKS_PER_CORE + task
// (filled in by init_ring_between_tasks when the sender is the master core).
57 struct rte_ring *ctrl_rings[RTE_MAX_LCORE*MAX_TASKS_PER_CORE];
// Print command-line usage for PROX and terminate (noreturn; the exit call
// is presumably in lines not shown here — TODO confirm).
59 static void __attribute__((noreturn)) prox_usage(const char *prgname)
61 plog_info("\nUsage: %s [-f CONFIG_FILE] [-a|-e] [-m|-s|-i] [-w DEF] [-u] [-t]\n"
62 "\t-f CONFIG_FILE : configuration file to load, ./prox.cfg by default\n"
63 "\t-l LOG_FILE : log file name, ./prox.log by default\n"
64 "\t-p : include PID in log file name if default log file is used\n"
65 "\t-o DISPLAY: Set display to use, can be 'curses' (default), 'cli' or 'none'\n"
66 "\t-v verbosity : initial logging verbosity\n"
67 "\t-a : autostart all cores (by default)\n"
68 "\t-e : don't autostart\n"
69 "\t-n : Create NULL devices instead of using PCI devices, useful together with -i\n"
70 "\t-m : list supported task modes and exit\n"
71 "\t-s : check configuration file syntax and exit\n"
72 "\t-i : check initialization sequence and exit\n"
73 "\t-u : Listen on UDS /tmp/prox.sock\n"
74 "\t-t : Listen on TCP port 8474\n"
75 "\t-q : Pass argument to Lua interpreter, useful to define variables\n"
76 "\t-w : define variable using syntax varname=value\n"
77 "\t takes precedence over variables defined in CONFIG_FILE\n"
78 "\t-k : Log statistics to file \"stats_dump\" in current directory\n"
79 "\t-d : Run as daemon, the parent process will block until PROX is not initialized\n"
80 "\t-z : Ignore CPU topology, implies -i\n"
81 "\t-r : Change initial screen refresh rate. If set to a lower than 0.001 seconds,\n"
82 "\t screen refreshing will be disabled\n"
// For each active core, decide which thread entry point (thread_x) the core
// should run, and panic if pipeline-style and normal tasks are mixed on the
// same core. NOTE(review): declarations of l3/pipeline/generic are not
// visible in this excerpt — presumably declared in omitted lines.
87 static void check_mixed_normal_pipeline(void)
89 struct lcore_cfg *lconf = NULL;
90 uint32_t lcore_id = -1;
92 while (prox_core_next(&lcore_id, 0) == 0) {
93 lconf = &lcore_cfg[lcore_id];
95 int all_thread_nop = 1;
99 for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
100 struct task_args *targ = &lconf->targs[task_id];
// l3 sub-mode forces the generic thread even if the task's own thread is nop.
101 l3 = !strcmp("l3", targ->sub_mode_str);
102 all_thread_nop = all_thread_nop && !l3 &&
103 targ->task_init->thread_x == thread_nop;
105 pipeline = pipeline || targ->task_init->thread_x == thread_pipeline;
106 generic = generic || targ->task_init->thread_x == thread_generic || l3;
// Pipeline and generic threads have incompatible run loops — cannot share a core.
108 PROX_PANIC(generic && pipeline, "Can't run both pipeline and normal thread on same core\n");
// Pick the cheapest thread that can service all tasks on this core.
111 lconf->thread_x = thread_nop;
113 lconf->thread_x = thread_generic;
// Sanity check: panic if a task has rx_ports configured although its mode
// declares TASK_FEATURE_NO_RX (i.e. the mode never reads from ports).
118 static void check_zero_rx(void)
120 struct lcore_cfg *lconf = NULL;
121 struct task_args *targ;
123 while (core_targ_next(&lconf, &targ, 0) == 0) {
124 if (targ->nb_rxports != 0) {
125 PROX_PANIC(task_init_flag_set(targ->task_init, TASK_FEATURE_NO_RX),
126 "\tCore %u task %u: rx_ports configured while mode %s does not use it\n", lconf->id, targ->id, targ->task_init->mode_str);
// Warn when a task's mempool (nb_mbuf) is likely too small to cover the RX
// and TX descriptor rings plus the mbuf cache and one packet burst.
// NOTE(review): port_id declaration is not visible in this excerpt; only the
// last visited port's n_txd/n_rxd appear to be used in the check — verify
// against the full source.
131 static void check_nb_mbuf(void)
133 struct lcore_cfg *lconf = NULL;
134 struct task_args *targ = NULL;
136 int n_txd = 0, n_rxd = 0;
138 while (core_targ_next(&lconf, &targ, 0) == 0) {
139 for (uint8_t i = 0; i < targ->nb_txports; ++i) {
140 port_id = targ->tx_port_queue[i].port;
141 n_txd = prox_port_cfg[port_id].n_txd;
143 for (uint8_t i = 0; i < targ->nb_rxports; ++i) {
144 port_id = targ->rx_port_queue[i].port;
145 n_rxd = prox_port_cfg[port_id].n_rxd;
147 if (targ->nb_mbuf <= n_rxd + n_txd + targ->nb_cache_mbuf + MAX_PKT_BURST) {
148 plog_warn("Core %d, task %d might not have enough mbufs (%d) to support %d txd, %d rxd and %d cache_mbuf\n",
149 lconf->id, targ->id, targ->nb_mbuf, n_txd, n_rxd, targ->nb_cache_mbuf);
// Cross-validate task wiring:
//  1) a task receiving from a ring must have some task transmitting into it;
//  2) tasks needing RX must have rx_ports or rx_rings (unless TASK_FEATURE_NO_RX);
//  3) for L3/NDP sub-mode tasks, every RX port must have a matching L3/NDP
//     transmitter, and every TX port a matching L3/NDP receiver.
154 static void check_missing_rx(void)
156 struct lcore_cfg *lconf = NULL, *rx_lconf = NULL, *tx_lconf = NULL;
157 struct task_args *targ, *rx_targ = NULL, *tx_targ = NULL;
158 uint8_t port_id, rx_port_id, ok, l3, ndp;
160 while (core_targ_next(&lconf, &targ, 0) == 0) {
161 PROX_PANIC((targ->flags & TASK_ARG_RX_RING) && targ->rx_rings[0] == 0 && !targ->tx_opt_ring_task,
162 "Configuration Error - Core %u task %u Receiving from ring, but nobody xmitting to this ring\n", lconf->id, targ->id);
163 if (targ->nb_rxports == 0 && targ->nb_rxrings == 0) {
164 PROX_PANIC(!task_init_flag_set(targ->task_init, TASK_FEATURE_NO_RX),
165 "\tCore %u task %u: no rx_ports and no rx_rings configured while required by mode %s\n", lconf->id, targ->id, targ->task_init->mode_str);
// Second pass: only L3/NDP sub-mode tasks are checked below.
170 while (core_targ_next(&lconf, &targ, 0) == 0) {
172 if (strcmp(targ->sub_mode_str, "l3") == 0)
174 else if (strcmp(targ->sub_mode_str, "ndp") == 0)
179 PROX_PANIC((targ->nb_rxports == 0) && (targ->nb_txports == 0), "L3/NDP task must have a RX or a TX port\n");
180 // If the L3/NDP sub_mode receives from a port, check that there is at least one core/task
181 // transmitting to this port in L3/NDP sub_mode
182 for (uint8_t i = 0; i < targ->nb_rxports; ++i) {
183 rx_port_id = targ->rx_port_queue[i].port;
186 while (core_targ_next(&tx_lconf, &tx_targ, 0) == 0) {
// Only the first TX queue's port is considered for the transmitter match.
187 if ((port_id = tx_targ->tx_port_queue[0].port) == OUT_DISCARD)
189 if ((rx_port_id == port_id) &&
190 ( ((tx_targ->flags & TASK_ARG_L3) && l3) ||
191 ((tx_targ->flags & TASK_ARG_NDP) && ndp) ) ) {
196 PROX_PANIC(ok == 0, "RX %s sub mode for port %d on core %d task %d, but no core/task transmitting on that port\n", l3 ? "l3":"ndp", rx_port_id, lconf->id, targ->id);
199 // If the L3/NDP sub_mode transmits to a port, check that there is at least one core/task
200 // receiving from that port in L3/NDP sub_mode.
201 if ((port_id = targ->tx_port_queue[0].port) == OUT_DISCARD)
205 plog_info("\tCore %d task %d transmitting to port %d in %s submode\n", lconf->id, targ->id, port_id, l3 ? "l3":"ndp");
206 while (core_targ_next(&rx_lconf, &rx_targ, 0) == 0) {
207 for (uint8_t i = 0; i < rx_targ->nb_rxports; ++i) {
208 rx_port_id = rx_targ->rx_port_queue[i].port;
209 if ((rx_port_id == port_id) &&
210 ( ((rx_targ->flags & TASK_ARG_L3) && l3) ||
211 ((rx_targ->flags & TASK_ARG_NDP) && ndp) ) ){
217 plog_info("\tCore %d task %d has found core %d task %d receiving from port %d in %s submode\n", lconf->id, targ->id, rx_lconf->id, rx_targ->id, port_id,
218 ((rx_targ->flags & TASK_ARG_L3) && l3) ? "l3":"ndp");
222 PROX_PANIC(ok == 0, "%s sub mode for port %d on core %d task %d, but no core/task receiving on that port\n", l3 ? "l3":"ndp", port_id, lconf->id, targ->id);
// Run all configuration consistency checks. Only the mixed-pipeline check is
// visible here; the other check_* helpers are presumably invoked in omitted
// lines — TODO confirm.
226 static void check_cfg_consistent(void)
231 check_mixed_normal_pipeline();
// Debug helper: log every RX ring pointer per core/task.
234 static void plog_all_rings(void)
236 struct lcore_cfg *lconf = NULL;
237 struct task_args *targ;
239 while (core_targ_next(&lconf, &targ, 0) == 0) {
240 for (uint8_t ring_idx = 0; ring_idx < targ->nb_rxrings; ++ring_idx) {
241 plog_info("\tCore %u, task %u, rx_ring[%u] %p\n", lconf->id, targ->id, ring_idx, targ->rx_rings[ring_idx]);
// Recursively test whether `flag` has state `is_set` anywhere in this task's
// chain of predecessors (the task itself plus all prev_tasks transitively).
// NOTE(review): the declaration of `ret` and the return statements are in
// lines not shown here.
246 static int chain_flag_state(struct task_args *targ, uint64_t flag, int is_set)
248 if (task_init_flag_set(targ->task_init, flag) == is_set)
253 for (uint32_t i = 0; i < targ->n_prev_tasks; ++i) {
254 ret = chain_flag_state(targ->prev_tasks[i], flag, is_set);
// True iff `flag` is set for every task in the chain (no task has it unset).
261 static int chain_flag_always_set(struct task_args *targ, uint64_t flag)
263 return (!chain_flag_state(targ, flag, 0));
// True iff `flag` is unset for every task in the chain (no task has it set).
266 static int chain_flag_never_set(struct task_args *targ, uint64_t flag)
268 return (!chain_flag_state(targ, flag, 1));
// True iff at least one task in the chain has `flag` set.
271 static int chain_flag_sometimes_set(struct task_args *targ, uint64_t flag)
273 return (chain_flag_state(targ, flag, 1));
// Assign a TX queue on each destination port for every TX port of this task,
// and downgrade TX offloads when the whole upstream task chain declares it
// does not need them. NOTE(review): `if_port` declaration not visible here.
276 static void configure_if_tx_queues(struct task_args *targ, uint8_t socket)
280 for (uint8_t i = 0; i < targ->nb_txports; ++i) {
281 if_port = targ->tx_port_queue[i].port;
283 PROX_PANIC(if_port == OUT_DISCARD, "port misconfigured, exiting\n");
285 PROX_PANIC(!prox_port_cfg[if_port].active, "\tPort %u not used, skipping...\n", if_port);
// Cross-socket TX works but costs QPI/UPI bandwidth — warn, don't fail.
287 int dsocket = prox_port_cfg[if_port].socket;
288 if (dsocket != -1 && dsocket != socket) {
289 plog_warn("TX core on socket %d while device on socket %d\n", socket, dsocket);
292 if (prox_port_cfg[if_port].tx_ring[0] == '\0') { // Rings-backed port can use single queue
// Hardware port: give each task its own TX queue (queue index = running count).
293 targ->tx_port_queue[i].queue = prox_port_cfg[if_port].n_txq;
294 prox_port_cfg[if_port].n_txq++;
296 prox_port_cfg[if_port].n_txq = 1;
297 targ->tx_port_queue[i].queue = 0;
299 /* By default OFFLOAD is enabled, but if the whole
300 chain has NOOFFLOADS set all the way until the
301 first task that receives from a port, it will be
302 disabled for the destination port. */
303 #if RTE_VERSION < RTE_VERSION_NUM(18,8,0,1)
304 if (chain_flag_always_set(targ, TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS)) {
305 prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOOFFLOADS;
// DPDK >= 18.08: offloads are expressed as per-port capability bits instead.
308 if (chain_flag_always_set(targ, TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS)) {
309 prox_port_cfg[if_port].requested_tx_offload &= ~(DEV_TX_OFFLOAD_IPV4_CKSUM | DEV_TX_OFFLOAD_UDP_CKSUM);
// Configure the RX queues of one task: optionally expand a single port
// reference into all of that port's RX queues (all_rx_queues), then assign a
// queue index, mempool and multi-segment requirements per RX port.
// NOTE(review): `index` declaration/increment not visible in this excerpt.
315 static void configure_if_rx_queues(struct task_args *targ, uint8_t socket)
317 struct prox_port_cfg *port;
318 uint8_t port_used_counter[PROX_MAX_PORTS] = {0};
319 bool multiple_port_reference = false;
320 uint8_t total_number_of_queues = 0;
321 // Check how many times a port is referenced for this task
322 for (uint8_t i = 0; i < targ->nb_rxports; i++) {
323 uint8_t if_port = targ->rx_port_queue[i].port;
324 port_used_counter[if_port]++;
325 if (port_used_counter[if_port] > 1) {
326 multiple_port_reference = true;
327 port = &prox_port_cfg[if_port];
// Explicit multi-queue config and all_rx_queues are mutually exclusive.
328 PROX_PANIC((port->all_rx_queues), "Multiple queues defined in rx port, but all_rx_queues also set for port %s\n", port->names[0]);
331 // If only referenced once, it is possible that we want to use all queues
332 // Therefore we will check all_rx_queues for that port
333 if (!multiple_port_reference) {
334 for (uint8_t i = 0; i < PROX_MAX_PORTS; i++) {
335 uint8_t if_port = targ->rx_port_queue[i].port;
336 if (port_used_counter[if_port]) {
337 port = &prox_port_cfg[if_port];
338 if (port->all_rx_queues) {
// Expand: this task will consume every RX queue of the port.
339 port_used_counter[if_port] = port->max_rxq;
340 total_number_of_queues += port->max_rxq;
341 plog_info("\tall_rx_queues for Port %s: %u rx_queues will be applied\n", port->names[0], port_used_counter[if_port]);
// Rebuild the task's rx_port_queue list with one entry per expanded queue.
346 if (total_number_of_queues) {
347 PROX_PANIC((total_number_of_queues > PROX_MAX_PORTS), "%u queues using the all_rx_queues. PROX_MAX_PORTS is set to %u\n", total_number_of_queues, PROX_MAX_PORTS);
349 for (uint8_t i = 0; i < PROX_MAX_PORTS; i++) {
350 if (port_used_counter[i]) {
351 for (uint8_t j = 0; j < port_used_counter[i]; j++) {
352 targ->rx_port_queue[index].port = i;
355 port = &prox_port_cfg[i];
356 plog_info("\t\tConfiguring task to use port %s with %u rx_queues\n", port->names[0], port_used_counter[i]);
359 targ->nb_rxports = index;
361 for (int i = 0; i < targ->nb_rxports; i++) {
362 uint8_t if_port = targ->rx_port_queue[i].port;
364 if (if_port == OUT_DISCARD) {
368 port = &prox_port_cfg[if_port];
369 PROX_PANIC(!port->active, "Port %u not used, aborting...\n", if_port);
371 if(port->rx_ring[0] != '\0') {
375 // If the mbuf size (of the rx task) is not big enough, we might receive multiple segments
376 // This is usually the case when setting a big mtu size i.e. enabling jumbo frames.
377 // If the packets get transmitted, then multi segments will have to be enabled on the TX port
378 uint16_t max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE;
379 if (max_frame_size + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM > targ->mbuf_size) {
380 targ->task_init->flag_features |= TASK_FEATURE_TXQ_FLAGS_MULTSEGS;
// Claim the next free RX queue on the port and register this task's mempool for it.
382 targ->rx_port_queue[i].queue = port->n_rxq;
383 port->pool[targ->rx_port_queue[i].queue] = targ->pool;
384 port->pool_size[targ->rx_port_queue[i].queue] = targ->nb_mbuf - 1;
387 int dsocket = port->socket;
388 if (dsocket != -1 && dsocket != socket) {
389 plog_warn("RX core on socket %d while device on socket %d\n", socket, dsocket);
// Configure RX then TX queues for every core/task pair.
// NOTE(review): `socket` declaration not visible in this excerpt.
394 static void configure_if_queues(void)
396 struct lcore_cfg *lconf = NULL;
397 struct task_args *targ;
400 while (core_targ_next(&lconf, &targ, 0) == 0) {
401 socket = rte_lcore_to_socket_id(lconf->id);
403 configure_if_rx_queues(targ, socket);
404 configure_if_tx_queues(targ, socket);
// Enable refcount-free TX fast paths on ports whose whole upstream task chain
// never uses mbuf refcounting. NOTE(review): `socket` and `if_port`
// declarations not visible in this excerpt.
408 static void configure_tx_queue_flags(void)
410 struct lcore_cfg *lconf = NULL;
411 struct task_args *targ;
415 while (core_targ_next(&lconf, &targ, 0) == 0) {
416 socket = rte_lcore_to_socket_id(lconf->id);
417 for (uint8_t i = 0; i < targ->nb_txports; ++i) {
418 if_port = targ->tx_port_queue[i].port;
419 #if RTE_VERSION < RTE_VERSION_NUM(18,8,0,1)
420 /* Set the ETH_TXQ_FLAGS_NOREFCOUNT flag if none of
421 the tasks up to the task transmitting to the port
423 if (chain_flag_never_set(targ, TASK_FEATURE_TXQ_FLAGS_REFCOUNT)) {
424 prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOREFCOUNT;
427 /* Set the DEV_TX_OFFLOAD_MBUF_FAST_FREE flag if none of
428 the tasks up to the task transmitting to the port
429 use refcnt and per-queue all mbufs comes from the same mempool. */
430 if (chain_flag_never_set(targ, TASK_FEATURE_TXQ_FLAGS_REFCOUNT)) {
431 if (chain_flag_never_set(targ, TASK_FEATURE_TXQ_FLAGS_MULTIPLE_MEMPOOL))
432 prox_port_cfg[if_port].requested_tx_offload |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
// Propagate the multi-segment requirement from task chains to their TX ports.
// NOTE(review): `if_port` declaration not visible in this excerpt.
439 static void configure_multi_segments(void)
441 struct lcore_cfg *lconf = NULL;
442 struct task_args *targ;
445 while (core_targ_next(&lconf, &targ, 0) == 0) {
446 for (uint8_t i = 0; i < targ->nb_txports; ++i) {
447 if_port = targ->tx_port_queue[i].port;
448 // Multi segment is disabled for most tasks. It is only enabled for tasks requiring big packets.
449 #if RTE_VERSION < RTE_VERSION_NUM(18,8,0,1)
450 // We can only enable "no multi segment" if no such task exists in the chain of tasks.
451 if (chain_flag_never_set(targ, TASK_FEATURE_TXQ_FLAGS_MULTSEGS)) {
452 prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
455 // We enable "multi segment" if at least one task requires it in the chain of tasks.
456 if (chain_flag_sometimes_set(targ, TASK_FEATURE_TXQ_FLAGS_MULTSEGS)) {
457 prox_port_cfg[if_port].requested_tx_offload |= DEV_TX_OFFLOAD_MULTI_SEGS;
// Generate a short (1-2 char) unique ring name from a printable alphabet.
// Returns a pointer to static storage; not thread-safe. NOTE(review): the
// static counter `idx` and its increment are in lines not shown here.
464 static const char *gen_ring_name(void)
466 static char retval[] = "XX";
467 static const char* ring_names =
468 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
469 "abcdefghijklmnopqrstuvwxyz"
470 "[\\]^_`!\"#$%&'()*+,-./:;<="
// Base-N encoding of idx; second char is 0 (string terminator) for idx < N.
476 retval[0] = ring_names[idx % strlen(ring_names)];
477 idx /= strlen(ring_names);
478 retval[1] = idx ? ring_names[(idx - 1) % strlen(ring_names)] : 0;
// Counters accumulated while wiring up inter-task rings (see init_rings).
485 struct ring_init_stats {
486 uint32_t n_pkt_rings;
487 uint32_t n_ctrl_rings;
488 uint32_t n_opt_rings;
// Total number of rings accounted for across all three categories.
491 static uint32_t ring_init_stats_total(const struct ring_init_stats *ris)
493 return ris->n_pkt_rings + ris->n_ctrl_rings + ris->n_opt_rings;
// Count how many tasks (across all cores) transmit to the given
// (lcore_worker, dest_task) pair — used to decide whether an MP ring is
// needed. NOTE(review): declarations of `ct` and the counter are not visible
// in this excerpt.
496 static uint32_t count_incoming_tasks(uint32_t lcore_worker, uint32_t dest_task)
498 struct lcore_cfg *lconf = NULL;
499 struct task_args *targ;
503 while (core_targ_next(&lconf, &targ, 0) == 0) {
504 for (uint8_t idxx = 0; idxx < MAX_PROTOCOLS; ++idxx) {
505 for (uint8_t ridx = 0; ridx < targ->core_task_set[idxx].n_elems; ++ridx) {
506 ct = targ->core_task_set[idxx].core_task[ridx];
508 if (dest_task == ct.task && lcore_worker == ct.core)
// Return the first RX ring already created for (lcore_id, task_id), or
// (presumably) NULL when the core is inactive, the task does not exist, or
// no RX ring was set up yet — the early-return values are in omitted lines.
516 static struct rte_ring *get_existing_ring(uint32_t lcore_id, uint32_t task_id)
518 if (!prox_core_active(lcore_id, 0))
521 struct lcore_cfg *lconf = &lcore_cfg[lcore_id];
523 if (task_id >= lconf->n_tasks_all)
526 if (lconf->targs[task_id].nb_rxrings == 0)
529 return lconf->targs[task_id].rx_rings[0];
// Create (or reuse) the ring connecting a source task (lconf/starg) to a
// destination (ct = core/task/type). Handles three cases:
//  - control rings (CTRL_TYPE_MSG / CTRL_TYPE_PKT) registered on the worker;
//  - "optimized away" rings when source and destination are adjacent tasks on
//    the same core;
//  - regular packet rings, single- or multi-producer depending on config.
// NOTE(review): `socket` declaration and several control-flow lines are not
// visible in this excerpt.
532 static struct rte_ring *init_ring_between_tasks(struct lcore_cfg *lconf, struct task_args *starg,
533 const struct core_task ct, uint8_t ring_idx, int idx,
534 struct ring_init_stats *ris)
537 struct rte_ring *ring = NULL;
538 struct lcore_cfg *lworker;
539 struct task_args *dtarg;
541 PROX_ASSERT(prox_core_active(ct.core, 0));
542 lworker = &lcore_cfg[ct.core];
544 /* socket used is the one that the sending core resides on */
545 socket = rte_lcore_to_socket_id(lconf->id);
547 plog_info("\t\tCreating ring on socket %u with size %u\n"
548 "\t\t\tsource core, task and socket = %u, %u, %u\n"
549 "\t\t\tdestination core, task and socket = %u, %u, %u\n"
550 "\t\t\tdestination worker id = %u\n",
551 socket, starg->ring_size,
552 lconf->id, starg->id, socket,
553 ct.core, ct.task, rte_lcore_to_socket_id(ct.core),
// --- Control ring path (MSG or PKT control traffic to a worker task) ---
557 struct rte_ring **dring = NULL;
559 if (ct.type == CTRL_TYPE_MSG)
560 dring = &lworker->ctrl_rings_m[ct.task];
561 else if (ct.type == CTRL_TYPE_PKT) {
562 dring = &lworker->ctrl_rings_p[ct.task];
563 starg->flags |= TASK_ARG_CTRL_RINGS_P;
567 ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SC_DEQ);
570 PROX_PANIC(ring == NULL, "Cannot create ring to connect I/O core %u with worker core %u\n", lconf->id, ct.core);
572 starg->tx_rings[starg->tot_n_txrings_inited] = ring;
573 starg->tot_n_txrings_inited++;
// Master-originated control rings are published in the global ctrl_rings[].
575 if (lconf->id == prox_cfg.master) {
576 ctrl_rings[ct.core*MAX_TASKS_PER_CORE + ct.task] = ring;
577 } else if (ct.core == prox_cfg.master) {
578 starg->ctrl_plane_ring = ring;
581 plog_info("\t\t\tCore %u task %u to -> core %u task %u ctrl_ring %s %p %s\n",
582 lconf->id, starg->id, ct.core, ct.task, ct.type == CTRL_TYPE_PKT?
583 "pkt" : "msg", ring, ring->name);
// --- Regular packet ring path ---
588 dtarg = &lworker->targs[ct.task];
589 lworker->targs[ct.task].worker_thread_id = ring_idx;
590 PROX_ASSERT(dtarg->flags & TASK_ARG_RX_RING);
591 PROX_ASSERT(ct.task < lworker->n_tasks_all);
593 /* If all the following conditions are met, the ring can be
// optimized away: same core, adjacent tasks, single TX/RX ring each.
595 if (!task_is_master(starg) && !task_is_master(dtarg) && starg->lconf->id == dtarg->lconf->id &&
596 starg->nb_txrings == 1 && idx == 0 && dtarg->task &&
597 dtarg->tot_rxrings == 1 && starg->task == dtarg->task - 1) {
598 plog_info("\t\tOptimizing away ring on core %u from task %u to task %u\n",
599 dtarg->lconf->id, starg->task, dtarg->task);
600 /* No need to set up ws_mbuf. */
601 starg->tx_opt_ring = 1;
602 /* During init of destination task, the buffer in the
603 source task will be initialized. */
604 dtarg->tx_opt_ring_task = starg;
610 int ring_created = 1;
611 /* Only create multi-producer rings if configured to do so AND
612 there is only one task sending to the task */
613 if ((prox_cfg.flags & DSF_MP_RINGS && count_incoming_tasks(ct.core, ct.task) > 1)
614 || (prox_cfg.flags & DSF_ENABLE_BYPASS)) {
// Reuse the destination's existing ring when possible (shared MP ring).
615 ring = get_existing_ring(ct.core, ct.task);
// NOTE(review): typo "creatign" in the log message below — fix in a code change,
// not here (doc-only edit must not alter runtime strings).
618 plog_info("\t\tCore %u task %u creatign MP ring %p to core %u task %u\n",
619 lconf->id, starg->id, ring, ct.core, ct.task);
623 ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SC_DEQ);
624 plog_info("\t\tCore %u task %u using MP ring %p from core %u task %u\n",
625 lconf->id, starg->id, ring, ct.core, ct.task);
// Default path: single-producer / single-consumer ring.
629 ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SP_ENQ | RING_F_SC_DEQ);
631 PROX_PANIC(ring == NULL, "Cannot create ring to connect I/O core %u with worker core %u\n", lconf->id, ct.core);
633 starg->tx_rings[starg->tot_n_txrings_inited] = ring;
634 starg->tot_n_txrings_inited++;
637 PROX_ASSERT(dtarg->nb_rxrings < MAX_RINGS_PER_TASK);
638 dtarg->rx_rings[dtarg->nb_rxrings] = ring;
// Multiple RX rings imply mbufs from multiple mempools — disables FAST_FREE.
640 if (dtarg->nb_rxrings > 1)
641 dtarg->task_init->flag_features |= TASK_FEATURE_TXQ_FLAGS_MULTIPLE_MEMPOOL;
643 dtarg->nb_slave_threads = starg->core_task_set[idx].n_elems;
644 dtarg->lb_friend_core = lconf->id;
645 dtarg->lb_friend_task = starg->id;
646 plog_info("\t\tWorker thread %d has core %d, task %d as a lb friend\n", ct.core, lconf->id, starg->id);
647 plog_info("\t\tCore %u task %u tx_ring[%u] -> core %u task %u rx_ring[%u] %p %s %u WT\n",
648 lconf->id, starg->id, ring_idx, ct.core, ct.task, dtarg->nb_rxrings, ring, ring->name,
649 dtarg->nb_slave_threads);
// Create all inter-task rings declared in each task's core_task_set, then add
// the control-plane rings between the master task and every L3/NDP task.
654 static void init_rings(void)
656 struct lcore_cfg *lconf = NULL;
657 struct task_args *starg;
658 struct ring_init_stats ris = {0};
660 while (core_targ_next(&lconf, &starg, 1) == 0) {
661 plog_info("\t*** Initializing rings on core %u, task %u ***\n", lconf->id, starg->id);
662 for (uint8_t idx = 0; idx < MAX_PROTOCOLS; ++idx) {
663 for (uint8_t ring_idx = 0; ring_idx < starg->core_task_set[idx].n_elems; ++ring_idx) {
664 PROX_ASSERT(ring_idx < MAX_WT_PER_LB);
665 PROX_ASSERT(starg->tot_n_txrings_inited < MAX_RINGS_PER_TASK);
667 struct core_task ct = starg->core_task_set[idx].core_task[ring_idx];
668 init_ring_between_tasks(lconf, starg, ct, ring_idx, idx, &ris);
673 plog_info("\tInitialized %d rings:\n"
674 "\t\tNumber of packet rings: %u\n"
675 "\t\tNumber of control rings: %u\n"
676 "\t\tNumber of optimized rings: %u\n"
677 ring_init_stats_total(&ris),
// Second pass: wire L3/NDP tasks to the master task (RX and TX control rings).
683 struct prox_port_cfg *port;
684 while (core_targ_next(&lconf, &starg, 1) == 0) {
685 if ((starg->task_init) && (starg->flags & (TASK_ARG_L3|TASK_ARG_NDP))) {
687 ct.core = prox_cfg.master;
689 ct.type = CTRL_TYPE_PKT;
690 struct rte_ring *rx_ring = init_ring_between_tasks(lconf, starg, ct, 0, 0, &ris);
// NOTE(review): stray double semicolon below — harmless, but worth cleaning
// up in a code change.
693 ct.task = starg->id;;
694 struct rte_ring *tx_ring = init_ring_between_tasks(&lcore_cfg[prox_cfg.master], lcore_cfg[prox_cfg.master].targs, ct, 0, 0, &ris);
// Randomize physical ordering of mbufs in a mempool (DSF_SHUFFLE): drain the
// pool one mbuf at a time, then put them back in random order. Used to expose
// cache/NUMA effects. NOTE(review): loop structure around the rand() pick is
// partially outside this excerpt.
699 static void shuffle_mempool(struct rte_mempool* mempool, uint32_t nb_mbuf)
701 struct rte_mbuf** pkts = prox_zmalloc(nb_mbuf * sizeof(*pkts), rte_socket_id());
704 while ((got < nb_mbuf) && (rte_mempool_get_bulk(mempool, (void**)(pkts + got), 1) == 0))
711 idx = rand() % nb_mbuf;
712 } while (pkts[idx] == 0);
714 rte_mempool_put_bulk(mempool, (void**)&pkts[idx], 1);
// Compute the mbuf size for a task: default MBUF_SIZE, raised when an i40e RX
// port needs larger buffers to fit max_frame_size within its 5-descriptor
// chaining limit. Warns when a port's minimum RX buffer size exceeds the
// chosen mbuf size.
721 static void set_mbuf_size(struct task_args *targ)
723 /* mbuf size can be set
724 * - from config file (highest priority, overwriting any other config) - should only be used as workaround
725 * - defaulted to MBUF_SIZE.
726 * Except if set explicitely, ensure that size is big enough for vmxnet3 driver
731 targ->mbuf_size = MBUF_SIZE;
732 struct prox_port_cfg *port;
733 uint16_t max_frame_size = 0, min_buffer_size = 0;
735 for (int i = 0; i < targ->nb_rxports; i++) {
736 uint8_t if_port = targ->rx_port_queue[i].port;
738 if (if_port == OUT_DISCARD) {
741 port = &prox_port_cfg[if_port];
// Track the largest frame (MTU + L2 header + CRC + double VLAN) over all RX ports.
742 if (max_frame_size < port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE)
743 max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE;
744 if (min_buffer_size < port->min_rx_bufsize)
745 min_buffer_size = port->min_rx_bufsize;
747 // Check whether we receive from i40e. This driver have extra mbuf size requirements
748 if (strcmp(port->short_name, "i40e") == 0)
752 // i40e supports a maximum of 5 descriptors chained
753 uint16_t required_mbuf_size = RTE_ALIGN(max_frame_size / 5, 128) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
754 if (required_mbuf_size > targ->mbuf_size) {
755 targ->mbuf_size = required_mbuf_size;
756 plog_info("\t\tSetting mbuf_size to %u to support frame_size %u\n", targ->mbuf_size, max_frame_size);
759 if (min_buffer_size > targ->mbuf_size) {
760 plog_warn("Mbuf size might be too small. This might result in packet segmentation and memory leak\n");
// Create one shared mempool per NUMA socket sized to the sum of all tasks'
// nb_mbuf on that socket; all tasks receiving from ports then share their
// socket's pool. Panics when tasks on the same socket disagree on
// nb_cache_mbuf or mbuf_size (a shared pool requires uniform values).
// NOTE(review): `name` buffer declaration not visible in this excerpt.
765 static void setup_mempools_unique_per_socket(void)
769 struct lcore_cfg *lconf = NULL;
770 struct task_args *targ;
772 struct rte_mempool *pool[MAX_SOCKETS];
773 uint32_t mbuf_count[MAX_SOCKETS] = {0};
774 uint32_t nb_cache_mbuf[MAX_SOCKETS] = {0};
775 uint32_t mbuf_size[MAX_SOCKETS] = {0};
777 while (core_targ_next_early(&lconf, &targ, 0) == 0) {
778 PROX_PANIC(targ->task_init == NULL, "task_init = NULL, is mode specified for core %d, task %d ?\n", lconf->id, targ->id);
779 uint8_t socket = rte_lcore_to_socket_id(lconf->id);
780 PROX_ASSERT(socket < MAX_SOCKETS);
783 if (targ->rx_port_queue[0].port != OUT_DISCARD) {
784 struct prox_port_cfg* port_cfg = &prox_port_cfg[targ->rx_port_queue[0].port];
785 PROX_ASSERT(targ->nb_mbuf != 0);
786 mbuf_count[socket] += targ->nb_mbuf;
787 if (nb_cache_mbuf[socket] == 0)
788 nb_cache_mbuf[socket] = targ->nb_cache_mbuf;
790 PROX_PANIC(nb_cache_mbuf[socket] != targ->nb_cache_mbuf,
791 "all mbuf_cache must have the same size if using a unique mempool per socket\n");
793 if (mbuf_size[socket] == 0)
794 mbuf_size[socket] = targ->mbuf_size;
796 PROX_PANIC(mbuf_size[socket] != targ->mbuf_size,
797 "all mbuf_size must have the same size if using a unique mempool per socket\n");
// Create (or look up, if already created) one pool per socket with mbufs.
801 for (int i = 0 ; i < MAX_SOCKETS; i++) {
802 if (mbuf_count[i] != 0) {
803 sprintf(name, "socket_%u_pool", i);
804 if ((pool[i] = rte_mempool_lookup(name)) == NULL) {
805 pool[i] = rte_mempool_create(name,
806 mbuf_count[i] - 1, mbuf_size[i],
808 sizeof(struct rte_pktmbuf_pool_private),
809 rte_pktmbuf_pool_init, NULL,
810 prox_pktmbuf_init, NULL,
812 PROX_PANIC(pool[i] == NULL, "\t\tError: cannot create mempool for socket %u\n", i);
813 plog_info("\tMempool %p size = %u * %u cache %u, socket %d\n", pool[i],
814 mbuf_count[i], mbuf_size[i], nb_cache_mbuf[i], i);
816 if (prox_cfg.flags & DSF_SHUFFLE) {
817 shuffle_mempool(pool[i], mbuf_count[i]);
// Point every RX task at its socket's shared pool.
824 while (core_targ_next_early(&lconf, &targ, 0) == 0) {
825 uint8_t socket = rte_lcore_to_socket_id(lconf->id);
827 if (targ->rx_port_queue[0].port != OUT_DISCARD) {
828 /* use this pool for the interface that the core is receiving from */
829 /* If one core receives from multiple ports, all the ports use the same mempool */
830 targ->pool = pool[socket];
831 /* Set the number of mbuf to the number of the unique mempool, so that the used and free work */
832 targ->nb_mbuf = mbuf_count[socket];
833 plog_info("\tMempool %p size = %u * %u cache %u, socket %d\n", targ->pool,
834 targ->nb_mbuf, mbuf_size[socket], targ->nb_cache_mbuf, socket);
// Create (or reuse via memzone lookup, when pool_name is set) a dedicated
// packet mempool for one RX task on the task's NUMA socket.
// NOTE(review): `name` buffer declaration not visible in this excerpt.
839 static void setup_mempool_for_rx_task(struct lcore_cfg *lconf, struct task_args *targ)
841 const uint8_t socket = rte_lcore_to_socket_id(lconf->id);
842 struct prox_port_cfg *port_cfg = &prox_port_cfg[targ->rx_port_queue[0].port];
843 const struct rte_memzone *mz;
844 struct rte_mempool *mp = NULL;
846 char memzone_name[64];
851 /* allocate memory pool for packets */
852 PROX_ASSERT(targ->nb_mbuf != 0);
854 if (targ->pool_name[0] == '\0') {
855 sprintf(name, "core_%u_task_%u_pool", lconf->id, targ->id);
// Named pool: look it up through its backing memzone and adopt its size.
858 snprintf(memzone_name, sizeof(memzone_name)-1, "MP_%s", targ->pool_name);
859 mz = rte_memzone_lookup(memzone_name);
862 mp = (struct rte_mempool*)mz->addr;
864 targ->nb_mbuf = mp->size;
// Legacy IVSHMEM path (compiled out: _FALSE suffix keeps the code for reference).
868 #ifdef RTE_LIBRTE_IVSHMEM_FALSE
869 if (mz != NULL && mp != NULL && mp->phys_addr != mz->ioremap_addr) {
870 /* Init mbufs with ioremap_addr for dma */
871 mp->phys_addr = mz->ioremap_addr;
872 mp->elt_pa[0] = mp->phys_addr + (mp->elt_va_start - (uintptr_t)mp);
874 struct prox_pktmbuf_reinit_args init_args;
876 init_args.lconf = lconf;
878 uint32_t elt_sz = mp->elt_size + mp->header_size + mp->trailer_size;
879 rte_mempool_obj_iter((void*)mp->elt_va_start, mp->size, elt_sz, 1,
880 mp->elt_pa, mp->pg_num, mp->pg_shift, prox_pktmbuf_reinit, &init_args);
884 /* Use this pool for the interface that the core is
885 receiving from if one core receives from multiple
886 ports, all the ports use the same mempool */
887 if (targ->pool == NULL) {
888 plog_info("\tCreating mempool with name '%s' on socket %d\n", name, socket);
889 targ->pool = rte_mempool_create(name,
890 targ->nb_mbuf - 1, targ->mbuf_size,
892 sizeof(struct rte_pktmbuf_pool_private),
893 rte_pktmbuf_pool_init, NULL,
894 prox_pktmbuf_init, lconf,
898 PROX_PANIC(targ->pool == NULL,
899 "\tError: cannot create mempool for core %u port %u: %s\n", lconf->id, targ->id, rte_strerror(rte_errno));
901 plog_info("\tMempool %p size = %u * %u cache %u, socket %d\n", targ->pool,
902 targ->nb_mbuf, targ->mbuf_size, targ->nb_cache_mbuf, socket);
903 if (prox_cfg.flags & DSF_SHUFFLE) {
904 shuffle_mempool(targ->pool, targ->nb_mbuf);
// Create one mempool per RX task (the non-shared alternative to the
// unique-per-socket strategy). Tasks without an RX port are skipped.
908 static void setup_mempools_multiple_per_socket(void)
910 struct lcore_cfg *lconf = NULL;
911 struct task_args *targ;
913 while (core_targ_next_early(&lconf, &targ, 0) == 0) {
914 PROX_PANIC(targ->task_init == NULL, "task_init = NULL, is mode specified for core %d, task %d ?\n", lconf->id, targ->id);
915 if (targ->rx_port_queue[0].port == OUT_DISCARD)
917 setup_mempool_for_rx_task(lconf, targ);
// Dispatch to the configured mempool strategy (shared-per-socket vs per-task).
921 static void setup_mempools(void)
923 if (prox_cfg.flags & UNIQUE_MEMPOOL_PER_SOCKET)
924 setup_mempools_unique_per_socket();
926 setup_mempools_multiple_per_socket();
// Give every task_args a back-pointer to its owning lcore_cfg.
929 static void set_task_lconf(void)
931 struct lcore_cfg *lconf;
932 uint32_t lcore_id = -1;
934 while(prox_core_next(&lcore_id, 1) == 0) {
935 lconf = &lcore_cfg[lcore_id];
936 for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
937 lconf->targs[task_id].lconf = lconf;
// Populate each destination task's prev_tasks[] with every task that
// transmits to it (inverse of core_task_set), enabling chain_flag_state
// to walk the chain backwards.
942 static void set_dest_threads(void)
944 struct lcore_cfg *lconf = NULL;
945 struct task_args *targ;
947 while (core_targ_next(&lconf, &targ, 0) == 0) {
948 for (uint8_t idx = 0; idx < MAX_PROTOCOLS; ++idx) {
949 for (uint8_t ring_idx = 0; ring_idx < targ->core_task_set[idx].n_elems; ++ring_idx) {
950 struct core_task ct = targ->core_task_set[idx].core_task[ring_idx];
952 struct task_args *dest_task = core_targ_get(ct.core, ct.task);
953 dest_task->prev_tasks[dest_task->n_prev_tasks++] = targ;
// Invoke each task mode's optional early_init hook before full task setup.
959 static void setup_all_task_structs_early_init(void)
961 struct lcore_cfg *lconf = NULL;
962 struct task_args *targ;
964 plog_info("\t*** Calling early init on all tasks ***\n");
965 while (core_targ_next(&lconf, &targ, 0) == 0) {
966 if (targ->task_init->early_init) {
967 targ->task_init->early_init(targ);
// Build every task_base struct. Two passes: first initialize the single
// master task (every other task needs a pointer to it), then initialize all
// remaining tasks with targ->tmaster set.
972 static void setup_all_task_structs(void)
974 struct lcore_cfg *lconf;
975 uint32_t lcore_id = -1;
976 struct task_base *tmaster = NULL;
// Pass 1: find and init the master task.
978 while(prox_core_next(&lcore_id, 1) == 0) {
979 lconf = &lcore_cfg[lcore_id];
980 for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
981 if (task_is_master(&lconf->targs[task_id])) {
982 plog_info("\tInitializing MASTER struct for core %d task %d\n", lcore_id, task_id);
983 lconf->tasks_all[task_id] = init_task_struct(&lconf->targs[task_id]);
984 tmaster = lconf->tasks_all[task_id];
988 PROX_PANIC(tmaster == NULL, "Can't initialize master task\n");
// Pass 2: init all non-master tasks, wiring in the master pointer.
991 while(prox_core_next(&lcore_id, 1) == 0) {
992 lconf = &lcore_cfg[lcore_id];
993 plog_info("\t*** Initializing core %d (%d task) ***\n", lcore_id, lconf->n_tasks_all);
994 for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
995 if (!task_is_master(&lconf->targs[task_id])) {
996 plog_info("\t\tInitializing struct for core %d task %d\n", lcore_id, task_id);
997 lconf->targs[task_id].tmaster = tmaster;
998 lconf->tasks_all[task_id] = init_task_struct(&lconf->targs[task_id]);
// Mark every port referenced by any task's RX or TX configuration as active,
// so later stages only initialize ports that are actually used.
1004 static void init_port_activate(void)
1006 struct lcore_cfg *lconf = NULL;
1007 struct task_args *targ;
1008 uint8_t port_id = 0;
1010 while (core_targ_next_early(&lconf, &targ, 0) == 0) {
1011 for (int i = 0; i < targ->nb_rxports; i++) {
1012 port_id = targ->rx_port_queue[i].port;
1013 prox_port_cfg[port_id].active = 1;
1016 for (int i = 0; i < targ->nb_txports; i++) {
1017 port_id = targ->tx_port_queue[i].port;
1018 prox_port_cfg[port_id].active = 1;
1023 /* Initialize cores and allocate mempools */
1024 static void init_lcores(void)
1026 struct lcore_cfg *lconf = 0;
1027 uint32_t lcore_id = -1;
// Validate that every active core's socket fits in MAX_SOCKETS.
1029 while(prox_core_next(&lcore_id, 0) == 0) {
1030 uint8_t socket = rte_lcore_to_socket_id(lcore_id);
1031 PROX_PANIC(socket + 1 > MAX_SOCKETS, "Can't configure core %u (on socket %u). MAX_SOCKET is set to %d\n", lcore_id, socket, MAX_SOCKETS);
// NOTE(review): the lines below appear to continue the init sequence
// (mempools first to claim low addresses, then ports, queues, rings,
// consistency checks); intermediate lines are not visible in this excerpt —
// verify exact ordering against the full source.
1034 /* need to allocate mempools as the first thing to use the lowest possible address range */
1035 plog_info("=== Initializing mempools ===\n");
1038 lcore_cfg_alloc_hp();
1043 plog_info("=== Initializing port addresses ===\n");
1046 plog_info("=== Initializing queue numbers on cores ===\n");
1047 configure_if_queues();
1049 plog_info("=== Initializing rings on cores ===\n");
1052 configure_multi_segments();
1053 configure_tx_queue_flags();
1055 plog_info("=== Checking configuration consistency ===\n");
1056 check_cfg_consistent();
/* Full one-shot initialization sequence: read the configuration file,
 * set up the DPDK EAL, activate/initialize devices and ports, and build
 * all task structs. Callers treat a non-zero return as failure; the
 * early-exit branches (syntax check, init check) are taken when the
 * corresponding DSF_* flags are set. */
static int setup_prox(int argc, char **argv)
	if (prox_read_config_file() != 0 ||
	    prox_setup_rte(argv[0]) != 0) {
	/* -k mode: only validate the configuration file, then stop. */
	if (prox_cfg.flags & DSF_CHECK_SYNTAX) {
		plog_info("=== Configuration file syntax has been checked ===\n\n");
	/* Mark only the ports actually referenced by tasks as active. */
	init_port_activate();
	plog_info("=== Initializing rte devices ===\n");
	if (!(prox_cfg.flags & DSF_USE_DUMMY_DEVICES))
		init_rte_ring_dev();
	init_rte_dev(prox_cfg.flags & DSF_USE_DUMMY_DEVICES);
	plog_info("=== Calibrating TSC overhead ===\n");
	plog_info("\tTSC running at %"PRIu64" Hz\n", rte_get_tsc_hz());
	plog_info("=== Initializing ports ===\n");
	/* Early task init runs before port/queue resources are committed. */
	setup_all_task_structs_early_init();
	plog_info("=== Initializing tasks ===\n");
	setup_all_task_structs();
	/* Optional in-memory log buffer (used e.g. by daemon mode). */
	if (prox_cfg.logbuf_size) {
		prox_cfg.logbuf = prox_zmalloc(prox_cfg.logbuf_size, rte_socket_id());
		PROX_PANIC(prox_cfg.logbuf == NULL, "Failed to allocate memory for logbuf with size = %d\n", prox_cfg.logbuf_size);
	/* Stop after init when only checking that initialization works. */
	if (prox_cfg.flags & DSF_CHECK_INIT) {
		plog_info("=== Initialization sequence completed ===\n\n");
	/* Current way that works to disable DPDK logging: redirect the DPDK
	 * log stream to /dev/null. NOTE(review): fopen result is not checked;
	 * if it returned NULL, rte_openlog_stream(NULL) would restore the
	 * default stream instead — confirm this is acceptable. */
	FILE *f = fopen("/dev/null", "r");
	rte_openlog_stream(f);
	plog_info("=== PROX started ===\n");
/* Set from siguser_handler in the daemon-mode parent: the child signals
 * SIGUSR1 on successful setup, SIGUSR2 on failure. Read by main() to
 * pick the parent's exit status. */
static int success = 0;
/* Handler for SIGUSR1/SIGUSR2 installed only in daemon mode (parent). */
static void siguser_handler(int signal)
	if (signal == SIGUSR1)
/* Drastic termination path (SIGABRT/SIGPIPE, or a second Ctrl-C routed
 * here by sigterm_handler): cancel and join all per-core worker threads,
 * then close the ports. */
static void sigabrt_handler(__attribute__((unused)) int signum)
	/* restore default disposition for SIGABRT and SIGPIPE */
	signal(SIGABRT, SIG_DFL);
	signal(SIGPIPE, SIG_DFL);

	/* ignore further Ctrl-C */
	signal(SIGINT, SIG_IGN);

	/* more drastic exit on tedious termination signal */
	plog_info("Aborting...\n");
	if (lcore_cfg != NULL) {
		/* tid0 is an all-zero pthread_t, presumably matching threads
		 * that were never started — TODO confirm thread_id is zeroed
		 * for unstarted cores. */
		pthread_t thread_id, tid0, tid = pthread_self();
		memset(&tid0, 0, sizeof(tid0));

		/* cancel all threads except current one */
		while (prox_core_next(&lcore_id, 1) == 0) {
			thread_id = lcore_cfg[lcore_id].thread_id;
			if (pthread_equal(thread_id, tid0))
			if (pthread_equal(thread_id, tid))
			pthread_cancel(thread_id);

		/* wait for cancelled threads to terminate */
		while (prox_core_next(&lcore_id, 1) == 0) {
			thread_id = lcore_cfg[lcore_id].thread_id;
			if (pthread_equal(thread_id, tid0))
			if (pthread_equal(thread_id, tid))
			pthread_join(thread_id, NULL);

	/* close ports on termination signal */
	close_ports_atexit();
/* Graceful termination handler for SIGHUP/SIGINT/SIGQUIT/SIGTERM (and
 * SIGUSR1/SIGUSR2 in non-daemon mode). A second Ctrl-C escalates to the
 * drastic sigabrt_handler path. */
static void sigterm_handler(int signum)
	/* abort on second Ctrl-C */
	if (signum == SIGINT)
		signal(SIGINT, sigabrt_handler);

	/* gracefully quit on harmless termination signal */
	/* ports will subsequently get closed at resulting exit */
/* Work around an ncurses >= 6.1 display issue: when TERM=xterm, the
 * ECMA-48 repeat-character optimization can garble repeated characters
 * on terminal emulators that advertise xterm but lack the feature.
 * In that case force TERM=putty; otherwise leave TERM untouched. */
static void set_term_env(void)
	static const char var[] = "TERM";
	static char str[] = "TERM=putty";
	char *old_value, *new_value;
	int max_ver = 0, min_ver = 0, n;

	old_value = getenv(var);

	/* Parse "ncurses X.Y" out of the runtime ncurses version string. */
	const char *ncurses_version = curses_version();
	n = sscanf(ncurses_version, "ncurses %d.%d", &max_ver, &min_ver);
		plog_info("\tUnable to extract ncurses version from %s. TERM left unchanged to %s\n", ncurses_version, old_value);
	plog_info("\tncurses version = %d.%d (%s)\n", max_ver, min_ver, ncurses_version);

	/* Only override for ncurses >= 6.1 combined with TERM=xterm. */
	if (((max_ver > 6) || ((max_ver == 6) && (min_ver >= 1))) && (strcmp(old_value, "xterm") == 0)) {
		// On recent OSes such as RHEL 8.0, ncurses(6.1) introduced support
		// for ECMA-48 repeat character control.
		// Some terminal emulators use TERM=xterm but do not support this feature.
		// In this case, printing repeating character such as "22000000 Hz" might
		// display as 220 Hz.
		// Other emulators, such as tmux, use TERM=screen, and do not exhibit the issue.
		plog_info("\tChanged TERM from %s ", old_value);
		new_value = getenv(var);
		plog_info("to %s\n", new_value);
		plog_info("\tTERM left unchanged to %s\n", old_value);
/* Entry point: parse arguments, install signal handlers, then either run
 * directly or (in daemon mode) fork a child that performs setup and
 * signals the parent SIGUSR1 on success / SIGUSR2 on failure before the
 * parent exits. */
int main(int argc, char **argv)
	/* set en_US locale to print big numbers with ',' */
	setlocale(LC_NUMERIC, "en_US.utf-8");

	if (prox_parse_args(argc, argv) != 0){
		prox_usage(argv[0]);
	plog_init(prox_cfg.log_name, prox_cfg.log_name_pid);
	plog_info("=== " PROGRAM_NAME " %s ===\n", VERSION_STR());
	/* Skip the "RTE " prefix of the DPDK version string. */
	plog_info("\tUsing DPDK %s\n", rte_version() + sizeof(RTE_VER_PREFIX));

	/* -m mode: print supported task modes and exit immediately. */
	if (prox_cfg.flags & DSF_LIST_TASK_MODES) {
		/* list supported task modes and exit */
		return EXIT_SUCCESS;

	/* close ports at normal exit */
	atexit(close_ports_atexit);
	/* gracefully quit on harmless termination signals */
	signal(SIGHUP, sigterm_handler);
	signal(SIGINT, sigterm_handler);
	signal(SIGQUIT, sigterm_handler);
	signal(SIGTERM, sigterm_handler);
	signal(SIGUSR1, sigterm_handler);
	signal(SIGUSR2, sigterm_handler);
	/* more drastic exit on tedious termination signals */
	signal(SIGABRT, sigabrt_handler);
	signal(SIGPIPE, sigabrt_handler);

	if (prox_cfg.flags & DSF_DAEMON) {
		/* In the parent, SIGUSR1/SIGUSR2 now report child setup
		 * success/failure instead of triggering termination. */
		signal(SIGUSR1, siguser_handler);
		signal(SIGUSR2, siguser_handler);
		plog_info("=== Running in Daemon mode ===\n");
		plog_info("\tForking child and waiting for setup completion\n");

		pid_t ppid = getpid();
			plog_err("Failed to fork process to run in daemon mode\n");
			return EXIT_FAILURE;

			/* Child: run setup, notify the parent, then take over. */
			kill(ppid, SIGUSR2);
			return EXIT_FAILURE;
			if (setup_prox(argc, argv) != 0) {
				kill(ppid, SIGUSR2);
				return EXIT_FAILURE;
			/* Setup complete: release the waiting parent. */
			kill(ppid, SIGUSR1);
			run(prox_cfg.flags);
			return EXIT_SUCCESS;

		/* Before exiting the parent, wait until the
		   child process has finished setting up */
		/* Dump the child's buffered log so daemon setup is visible. */
		if (prox_cfg.logbuf) {
			file_print(prox_cfg.logbuf);
		/* 'success' is set by siguser_handler from the child's signal. */
		return success? EXIT_SUCCESS : EXIT_FAILURE;

	/* Non-daemon mode: set up and run in this process. */
	if (setup_prox(argc, argv) != 0)
		return EXIT_FAILURE;
	run(prox_cfg.flags);

	return EXIT_SUCCESS;