2 // Copyright (c) 2010-2017 Intel Corporation
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 #include <rte_cycles.h>
18 #include <rte_port_ethdev.h>
19 #include <rte_port_ring.h>
20 #include <rte_version.h>
24 #include "thread_pipeline.h"
28 /* Helper function: create pipeline, input ports and output ports */
29 void init_pipe_create_in_out(struct task_pipe *tpipe, struct task_args *targ)
31 struct task_base *tbase = (struct task_base *)tpipe;
32 const char *name = targ->lconf->name;
33 const char *mode = targ->task_init->mode_str;
34 uint8_t lcore_id = targ->lconf->id;
35 uint8_t task_id = targ->task;
39 struct rte_pipeline_params pipeline_params = {
41 .socket_id = rte_lcore_to_socket_id(lcore_id),
43 tpipe->p = rte_pipeline_create(&pipeline_params);
44 PROX_PANIC(tpipe->p == NULL,
45 "Failed to create %s pipeline on core %u task %u\n",
46 mode, lcore_id, task_id);
48 /* create pipeline input ports */
49 if (targ->nb_rxrings != 0) {
50 for (uint8_t i = 0; i < tbase->rx_params_sw.nb_rxrings; ++i) {
51 struct rte_port_ring_reader_params port_ring_params = {
52 .ring = tbase->rx_params_sw.rx_rings[i],
54 struct rte_pipeline_port_in_params port_params = {
55 .ops = &rte_port_ring_reader_ops,
56 .arg_create = &port_ring_params,
57 .f_action = NULL, //TODO: fill metadata
59 .burst_size = MAX_RING_BURST,
61 err = rte_pipeline_port_in_create(tpipe->p,
62 &port_params, &tpipe->port_in_id[i]);
63 PROX_PANIC(err != 0, "Failed to create SW input port %u "
64 "for %s pipeline on core %u task %u: "
66 i, mode, lcore_id, task_id, err);
68 tpipe->n_ports_in = tbase->rx_params_sw.nb_rxrings;
71 for (uint8_t i = 0; i < tbase->rx_params_hw.nb_rxports; ++i) {
72 struct rte_port_ethdev_reader_params port_ethdev_params = {
73 .port_id = tbase->rx_params_hw.rx_pq[i].port,
74 .queue_id = tbase->rx_params_hw.rx_pq[i].queue,
76 struct rte_pipeline_port_in_params port_params = {
77 .ops = &rte_port_ethdev_reader_ops,
78 .arg_create = &port_ethdev_params,
79 .f_action = NULL, //TODO: fill metadata
81 .burst_size = MAX_PKT_BURST,
83 err = rte_pipeline_port_in_create(tpipe->p,
84 &port_params, &tpipe->port_in_id[0]);
85 PROX_PANIC(err != 0, "Failed to create HW input port "
86 "for %s pipeline on core %u task %u: "
88 mode, lcore_id, task_id, err);
90 tpipe->n_ports_in = tbase->rx_params_hw.nb_rxports;
92 PROX_PANIC(tpipe->n_ports_in < 1, "No input port created "
93 "for %s pipeline on core %u task %u\n",
94 mode, lcore_id, task_id);
96 /* create pipeline output ports */
97 if (targ->nb_txrings != 0) {
98 for (uint8_t i = 0; i < tbase->tx_params_sw.nb_txrings; ++i) {
99 struct rte_port_ring_writer_params port_ring_params = {
100 .ring = tbase->tx_params_sw.tx_rings[i],
101 .tx_burst_sz = MAX_RING_BURST,
103 struct rte_pipeline_port_out_params port_params = {
104 .ops = &rte_port_ring_writer_ops,
105 .arg_create = &port_ring_params,
106 .f_action = NULL, //TODO
107 #if RTE_VERSION < RTE_VERSION_NUM(16,4,0,0)
108 .f_action_bulk = NULL, //TODO
112 err = rte_pipeline_port_out_create(tpipe->p,
113 &port_params, &tpipe->port_out_id[i]);
114 PROX_PANIC(err != 0, "Failed to create SW output port %u "
115 "for %s pipeline on core %u task %u: "
117 i, mode, lcore_id, task_id, err);
119 tpipe->n_ports_out = tbase->tx_params_sw.nb_txrings;
122 for (uint8_t i = 0; i < tbase->tx_params_hw.nb_txports; ++i) {
123 struct rte_port_ethdev_writer_params port_ethdev_params = {
124 .port_id = tbase->tx_params_hw.tx_port_queue[i].port,
125 .queue_id = tbase->tx_params_hw.tx_port_queue[i].queue,
126 .tx_burst_sz = MAX_PKT_BURST,
128 struct rte_pipeline_port_out_params port_params = {
129 .ops = &rte_port_ethdev_writer_ops,
130 .arg_create = &port_ethdev_params,
131 .f_action = NULL, //TODO
132 #if RTE_VERSION < RTE_VERSION_NUM(16,4,0,0)
133 .f_action_bulk = NULL, //TODO
137 err = rte_pipeline_port_out_create(tpipe->p,
138 &port_params, &tpipe->port_out_id[i]);
139 PROX_PANIC(err != 0, "Failed to create HW output port %u "
140 "for %s pipeline on core %u task %u: "
142 i, mode, lcore_id, task_id, err);
144 tpipe->n_ports_out = tbase->tx_params_hw.nb_txports;
146 PROX_PANIC(tpipe->n_ports_out < 1, "No output port created "
147 "for %s pipeline on core %u task %u\n",
148 mode, lcore_id, task_id);
151 /* Helper function: connect pipeline input ports to one pipeline table */
152 void init_pipe_connect_one(struct task_pipe *tpipe, struct task_args *targ,
155 const char *mode = targ->task_init->mode_str;
156 uint8_t lcore_id = targ->lconf->id;
157 uint8_t task_id = targ->task;
160 for (uint8_t i = 0; i < tpipe->n_ports_in; ++i) {
161 err = rte_pipeline_port_in_connect_to_table(tpipe->p,
162 tpipe->port_in_id[i], table_id);
163 PROX_PANIC(err != 0, "Failed to connect input port %u to table id %u "
164 "for %s pipeline on core %u task %u: "
166 i, table_id, mode, lcore_id, task_id, err);
170 /* Helper function: connect pipeline input ports to all pipeline tables */
171 void init_pipe_connect_all(struct task_pipe *tpipe, struct task_args *targ)
173 const char *mode = targ->task_init->mode_str;
174 uint8_t lcore_id = targ->lconf->id;
175 uint8_t task_id = targ->task;
178 PROX_PANIC(tpipe->n_tables < tpipe->n_ports_in,
179 "Not enough tables (%u) to connect %u input ports "
180 "for %s pipeline on core %u task %u\n",
181 tpipe->n_tables, tpipe->n_ports_in,
182 mode, lcore_id, task_id);
184 for (uint8_t i = 0; i < tpipe->n_ports_in; ++i) {
185 err = rte_pipeline_port_in_connect_to_table(tpipe->p,
186 tpipe->port_in_id[i], tpipe->table_id[i]);
187 PROX_PANIC(err != 0, "Failed to connect input port %u to table id %u "
188 "for %s pipeline on core %u task %u: "
190 i, tpipe->table_id[i], mode, lcore_id, task_id, err);
194 /* Helper function: enable pipeline input ports */
195 void init_pipe_enable(struct task_pipe *tpipe, struct task_args *targ)
197 const char *mode = targ->task_init->mode_str;
198 uint8_t lcore_id = targ->lconf->id;
199 uint8_t task_id = targ->task;
202 for (uint8_t i = 0; i < tpipe->n_ports_in; ++i) {
203 err = rte_pipeline_port_in_enable(tpipe->p, tpipe->port_in_id[i]);
204 PROX_PANIC(err != 0, "Failed to enable input port %u "
205 "for %s pipeline on core %u task %u: "
207 i, mode, lcore_id, task_id, err);
211 /* Helper function: check pipeline consistency */
212 void init_pipe_check(struct task_pipe *tpipe, struct task_args *targ)
214 const char *mode = targ->task_init->mode_str;
215 uint8_t lcore_id = targ->lconf->id;
216 uint8_t task_id = targ->task;
219 err = rte_pipeline_check(tpipe->p);
220 PROX_PANIC(err != 0, "Failed consistency check "
221 "for %s pipeline on core %u task %u: "
223 mode, lcore_id, task_id, err);
226 /* This function will panic on purpose: tasks based on Packet Framework
227 pipelines should not be invoked via the usual task_base.handle_bulk method */
228 int handle_pipe(struct task_base *tbase,
229 __attribute__((unused)) struct rte_mbuf **mbufs,
230 __attribute__((unused)) uint16_t n_pkts)
232 uint32_t lcore_id = rte_lcore_id();
233 struct lcore_cfg *lconf = &lcore_cfg[lcore_id];
235 for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
236 struct task_args *targ = &lconf->targs[task_id];
237 if (lconf->tasks_all[task_id] == tbase) {
238 PROX_PANIC(1, "Error on core %u task %u: cannot run "
239 "%s pipeline and other non-PF tasks\n",
240 lcore_id, task_id, targ->task_init->mode_str);
243 PROX_PANIC(1, "Error: cannot find task on core %u\n", lcore_id);
247 int thread_pipeline(struct lcore_cfg *lconf)
249 struct task_pipe *pipes[MAX_TASKS_PER_CORE];
250 uint64_t cur_tsc = rte_rdtsc();
251 uint64_t term_tsc = cur_tsc + TERM_TIMEOUT;
252 uint64_t drain_tsc = cur_tsc + DRAIN_TIMEOUT;
253 const uint8_t nb_tasks = lconf->n_tasks_all;
255 for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
256 //TODO: solve other mutually exclusive thread/tasks
257 struct task_args *targ = &lconf->targs[task_id];
258 PROX_PANIC(targ->task_init->thread_x != thread_pipeline,
259 "Invalid task %u '%s' on core %u: %s() can only "
260 "run tasks based on Packet Framework pipelines\n",
261 targ->task, targ->task_init->mode_str,
262 targ->lconf->id, __func__);
264 pipes[task_id] = (struct task_pipe *)lconf->tasks_all[task_id];
267 lconf->flags |= LCONF_FLAG_RUNNING;
269 cur_tsc = rte_rdtsc();
270 if (cur_tsc > drain_tsc) {
271 drain_tsc = cur_tsc + DRAIN_TIMEOUT;
273 if (cur_tsc > term_tsc) {
274 term_tsc = cur_tsc + TERM_TIMEOUT;
275 if (lconf->msg.req && lconf->msg.type == LCONF_MSG_STOP) {
276 lconf->flags &= ~LCONF_FLAG_RUNNING;
279 if (!lconf_is_req(lconf)) {
280 lconf_unset_req(lconf);
281 plog_warn("Command ignored (lconf functions not supported in Packet Framework pipelines)\n");
285 for (uint8_t task_id = 0; task_id < nb_tasks; ++task_id) {
286 rte_pipeline_flush(pipes[task_id]->p);
290 for (uint8_t task_id = 0; task_id < nb_tasks; ++task_id) {
291 rte_pipeline_run(pipes[task_id]->p);