Support packets in flight
[samplevnf.git] / VNFs / DPPD-PROX / thread_pipeline.c
1 /*
2 // Copyright (c) 2010-2017 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <rte_cycles.h>
18 #include <rte_port_ethdev.h>
19 #include <rte_port_ring.h>
20 #include <rte_version.h>
21
22 #include "log.h"
23 #include "quit.h"
24 #include "thread_pipeline.h"
25 #include "lconf.h"
26 #include "defines.h"
27
28 /* Helper function: create pipeline, input ports and output ports */
29 void init_pipe_create_in_out(struct task_pipe *tpipe, struct task_args *targ)
30 {
31         struct task_base *tbase = (struct task_base *)tpipe;
32         const char *name = targ->lconf->name;
33         const char *mode = targ->task_init->mode_str;
34         uint8_t lcore_id = targ->lconf->id;
35         uint8_t task_id = targ->task;
36         int err;
37
38         /* create pipeline */
39         struct rte_pipeline_params pipeline_params = {
40                 .name = name,
41                 .socket_id = rte_lcore_to_socket_id(lcore_id),
42         };
43         tpipe->p = rte_pipeline_create(&pipeline_params);
44         PROX_PANIC(tpipe->p == NULL,
45                         "Failed to create %s pipeline on core %u task %u\n",
46                         mode, lcore_id, task_id);
47
48         /* create pipeline input ports */
49         if (targ->nb_rxrings != 0) {
50                 for (uint8_t i = 0; i < tbase->rx_params_sw.nb_rxrings; ++i) {
51                         struct rte_port_ring_reader_params port_ring_params = {
52                                 .ring = tbase->rx_params_sw.rx_rings[i],
53                         };
54                         struct rte_pipeline_port_in_params port_params = {
55                                 .ops = &rte_port_ring_reader_ops,
56                                 .arg_create = &port_ring_params,
57                                 .f_action = NULL, //TODO: fill metadata
58                                 .arg_ah = NULL,
59                                 .burst_size = MAX_RING_BURST,
60                         };
61                         err = rte_pipeline_port_in_create(tpipe->p,
62                                         &port_params, &tpipe->port_in_id[i]);
63                         PROX_PANIC(err != 0, "Failed to create SW input port %u "
64                                         "for %s pipeline on core %u task %u: "
65                                         "err = %d\n",
66                                         i, mode, lcore_id, task_id, err);
67                 }
68                 tpipe->n_ports_in = tbase->rx_params_sw.nb_rxrings;
69         }
70         else {
71                 for (uint8_t i = 0; i < tbase->rx_params_hw.nb_rxports; ++i) {
72                         struct rte_port_ethdev_reader_params port_ethdev_params = {
73                                 .port_id = tbase->rx_params_hw.rx_pq[i].port,
74                                 .queue_id = tbase->rx_params_hw.rx_pq[i].queue,
75                         };
76                         struct rte_pipeline_port_in_params port_params = {
77                                 .ops = &rte_port_ethdev_reader_ops,
78                                 .arg_create = &port_ethdev_params,
79                                 .f_action = NULL, //TODO: fill metadata
80                                 .arg_ah = NULL,
81                                 .burst_size = MAX_PKT_BURST,
82                         };
83                         err = rte_pipeline_port_in_create(tpipe->p,
84                                         &port_params, &tpipe->port_in_id[0]);
85                         PROX_PANIC(err != 0, "Failed to create HW input port "
86                                         "for %s pipeline on core %u task %u: "
87                                         "err = %d\n",
88                                         mode, lcore_id, task_id, err);
89                 }
90                 tpipe->n_ports_in = tbase->rx_params_hw.nb_rxports;
91         }
92         PROX_PANIC(tpipe->n_ports_in < 1, "No input port created "
93                         "for %s pipeline on core %u task %u\n",
94                         mode, lcore_id, task_id);
95
96         /* create pipeline output ports */
97         if (targ->nb_txrings != 0) {
98                 for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
99                         struct rte_port_ring_writer_params port_ring_params = {
100                                 .ring = tbase->tx_params_sw.tx_rings[i],
101                                 .tx_burst_sz = MAX_RING_BURST,
102                         };
103                         struct rte_pipeline_port_out_params port_params = {
104                                 .ops = &rte_port_ring_writer_ops,
105                                 .arg_create = &port_ring_params,
106                                 .f_action = NULL,       //TODO
107 #if RTE_VERSION < RTE_VERSION_NUM(16,4,0,0)
108                                 .f_action_bulk = NULL,  //TODO
109 #endif
110                                 .arg_ah = NULL,
111                         };
112                         err = rte_pipeline_port_out_create(tpipe->p,
113                                         &port_params, &tpipe->port_out_id[i]);
114                         PROX_PANIC(err != 0, "Failed to create SW output port %u "
115                                         "for %s pipeline on core %u task %u: "
116                                         "err = %d\n",
117                                         i, mode, lcore_id, task_id, err);
118                 }
119                 tpipe->n_ports_out = tbase->tx_params_sw.nb_txrings;
120         }
121         else {
122                 for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
123                         struct rte_port_ethdev_writer_params port_ethdev_params = {
124                                 .port_id = tbase->tx_params_hw.tx_port_queue[i].port,
125                                 .queue_id = tbase->tx_params_hw.tx_port_queue[i].queue,
126                                 .tx_burst_sz = MAX_PKT_BURST,
127                         };
128                         struct rte_pipeline_port_out_params port_params = {
129                                 .ops = &rte_port_ethdev_writer_ops,
130                                 .arg_create = &port_ethdev_params,
131                                 .f_action = NULL,       //TODO
132 #if RTE_VERSION < RTE_VERSION_NUM(16,4,0,0)
133                                 .f_action_bulk = NULL,  //TODO
134 #endif
135                                 .arg_ah = NULL,
136                         };
137                         err = rte_pipeline_port_out_create(tpipe->p,
138                                         &port_params, &tpipe->port_out_id[i]);
139                         PROX_PANIC(err != 0, "Failed to create HW output port %u "
140                                         "for %s pipeline on core %u task %u: "
141                                         "err = %d\n",
142                                         i, mode, lcore_id, task_id, err);
143                 }
144                 tpipe->n_ports_out = tbase->tx_params_hw.nb_txports;
145         }
146         PROX_PANIC(tpipe->n_ports_out < 1, "No output port created "
147                         "for %s pipeline on core %u task %u\n",
148                         mode, lcore_id, task_id);
149 }
150
151 /* Helper function: connect pipeline input ports to one pipeline table */
152 void init_pipe_connect_one(struct task_pipe *tpipe, struct task_args *targ,
153                 uint32_t table_id)
154 {
155         const char *mode = targ->task_init->mode_str;
156         uint8_t lcore_id = targ->lconf->id;
157         uint8_t task_id = targ->task;
158         int err;
159
160         for (uint8_t i = 0; i < tpipe->n_ports_in; ++i) {
161                 err = rte_pipeline_port_in_connect_to_table(tpipe->p,
162                                 tpipe->port_in_id[i], table_id);
163                 PROX_PANIC(err != 0, "Failed to connect input port %u to table id %u "
164                                 "for %s pipeline on core %u task %u: "
165                                 "err = %d\n",
166                                 i, table_id, mode, lcore_id, task_id, err);
167         }
168 }
169
170 /* Helper function: connect pipeline input ports to all pipeline tables */
171 void init_pipe_connect_all(struct task_pipe *tpipe, struct task_args *targ)
172 {
173         const char *mode = targ->task_init->mode_str;
174         uint8_t lcore_id = targ->lconf->id;
175         uint8_t task_id = targ->task;
176         int err;
177
178         PROX_PANIC(tpipe->n_tables < tpipe->n_ports_in,
179                         "Not enough tables (%u) to connect %u input ports "
180                         "for %s pipeline on core %u task %u\n",
181                         tpipe->n_tables, tpipe->n_ports_in,
182                         mode, lcore_id, task_id);
183
184         for (uint8_t i = 0; i < tpipe->n_ports_in; ++i) {
185                 err = rte_pipeline_port_in_connect_to_table(tpipe->p,
186                                 tpipe->port_in_id[i], tpipe->table_id[i]);
187                 PROX_PANIC(err != 0, "Failed to connect input port %u to table id %u "
188                                 "for %s pipeline on core %u task %u: "
189                                 "err = %d\n",
190                                 i, tpipe->table_id[i], mode, lcore_id, task_id, err);
191         }
192 }
193
194 /* Helper function: enable pipeline input ports */
195 void init_pipe_enable(struct task_pipe *tpipe, struct task_args *targ)
196 {
197         const char *mode = targ->task_init->mode_str;
198         uint8_t lcore_id = targ->lconf->id;
199         uint8_t task_id = targ->task;
200         int err;
201
202         for (uint8_t i = 0; i < tpipe->n_ports_in; ++i) {
203                 err = rte_pipeline_port_in_enable(tpipe->p, tpipe->port_in_id[i]);
204                 PROX_PANIC(err != 0, "Failed to enable input port %u "
205                                 "for %s pipeline on core %u task %u: "
206                                 "err = %d\n",
207                                 i, mode, lcore_id, task_id, err);
208         }
209 }
210
211 /* Helper function: check pipeline consistency */
212 void init_pipe_check(struct task_pipe *tpipe, struct task_args *targ)
213 {
214         const char *mode = targ->task_init->mode_str;
215         uint8_t lcore_id = targ->lconf->id;
216         uint8_t task_id = targ->task;
217         int err;
218
219         err = rte_pipeline_check(tpipe->p);
220         PROX_PANIC(err != 0, "Failed consistency check "
221                         "for %s pipeline on core %u task %u: "
222                         "err = %d\n",
223                         mode, lcore_id, task_id, err);
224 }
225
226 /* This function will panic on purpose: tasks based on Packet Framework
227    pipelines should not be invoked via the usual task_base.handle_bulk method */
228 int handle_pipe(struct task_base *tbase,
229                 __attribute__((unused)) struct rte_mbuf **mbufs,
230                 __attribute__((unused)) uint16_t n_pkts)
231 {
232         uint32_t lcore_id = rte_lcore_id();
233         struct lcore_cfg *lconf = &lcore_cfg[lcore_id];
234
235         for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
236                 struct task_args *targ = &lconf->targs[task_id];
237                 if (lconf->tasks_all[task_id] == tbase) {
238                         PROX_PANIC(1, "Error on core %u task %u: cannot run "
239                                         "%s pipeline and other non-PF tasks\n",
240                                         lcore_id, task_id, targ->task_init->mode_str);
241                 }
242         }
243         PROX_PANIC(1, "Error: cannot find task on core %u\n", lcore_id);
244         return 0;
245 }
246
247 int thread_pipeline(struct lcore_cfg *lconf)
248 {
249         struct task_pipe *pipes[MAX_TASKS_PER_CORE];
250         uint64_t cur_tsc = rte_rdtsc();
251         uint64_t term_tsc = cur_tsc + TERM_TIMEOUT;
252         uint64_t drain_tsc = cur_tsc + DRAIN_TIMEOUT;
253         const uint8_t nb_tasks = lconf->n_tasks_all;
254
255         for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
256                 //TODO: solve other mutually exclusive thread/tasks
257                 struct task_args *targ = &lconf->targs[task_id];
258                 PROX_PANIC(targ->task_init->thread_x != thread_pipeline,
259                                 "Invalid task %u '%s' on core %u: %s() can only "
260                                 "run tasks based on Packet Framework pipelines\n",
261                                 targ->task, targ->task_init->mode_str,
262                                 targ->lconf->id, __func__);
263
264                 pipes[task_id] = (struct task_pipe *)lconf->tasks_all[task_id];
265         }
266
267         lconf->flags |= LCONF_FLAG_RUNNING;
268         for (;;) {
269                 cur_tsc = rte_rdtsc();
270                 if (cur_tsc > drain_tsc) {
271                         drain_tsc = cur_tsc + DRAIN_TIMEOUT;
272
273                         if (cur_tsc > term_tsc) {
274                                 term_tsc = cur_tsc + TERM_TIMEOUT;
275                                 if (lconf->msg.req && lconf->msg.type == LCONF_MSG_STOP) {
276                                         lconf->flags &= ~LCONF_FLAG_RUNNING;
277                                         break;
278                                 }
279                                 if (!lconf_is_req(lconf)) {
280                                         lconf_unset_req(lconf);
281                                         plog_warn("Command ignored (lconf functions not supported in Packet Framework pipelines)\n");
282                                 }
283                         }
284
285                         for (uint8_t task_id = 0; task_id < nb_tasks; ++task_id) {
286                                 rte_pipeline_flush(pipes[task_id]->p);
287                         }
288                 }
289
290                 for (uint8_t task_id = 0; task_id < nb_tasks; ++task_id) {
291                         rte_pipeline_run(pipes[task_id]->p);
292                 }
293         }
294         return 0;
295 }