// Copyright (c) 2010-2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <pthread.h>
#include <sched.h>
#include <string.h>

#include <rte_cycles.h>
#include <rte_table_hash.h>

/* Project headers; lconf.h, log.h and defines.h are assumed to provide the
   lcore_cfg/lconf_* helpers, plog_* logging and timeout macros used below. */
#include "thread_generic.h"
#include "lconf.h"
#include "log.h"
#include "defines.h"
#include "hash_entry_types.h"
#include "hash_utils.h"

struct tsc_task {
	uint64_t tsc;
	uint64_t (* tsc_task)(struct lcore_cfg *lconf);
};
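
/*
 * Housekeeping handlers driven by thread_generic(): each one runs once its
 * deadline in tsc_tasks[] has passed and returns the number of TSC cycles
 * until it should run again.
 *   - tsc_drain:  flush the partially filled TX queues of every task
 *   - tsc_term:   handle pending requests for this lcore (returns -2 when
 *                 the set of running tasks changed)
 *   - tsc_period: invoke the optional user-supplied periodic function
 *   - tsc_ctrl:   drain the per-task control rings (messages and mbufs)
 */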
static uint64_t tsc_drain(struct lcore_cfg *lconf)
{
	lconf_flush_all_queues(lconf);
	return DRAIN_TIMEOUT;
}

static uint64_t tsc_term(struct lcore_cfg *lconf)
{
	if (lconf_is_req(lconf) && lconf_do_flags(lconf)) {
		lconf_flush_all_queues(lconf);
		/* -2 tells the main loop that the set of running tasks changed */
		return -2;
	}
	return TERM_TIMEOUT;
}
static uint64_t tsc_period(struct lcore_cfg *lconf)
{
	lconf->period_func(lconf->period_data);
	return lconf->period_timeout;
}
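
/*
 * Note: rte_ring_sc_dequeue_burst() gained an extra "available" output
 * parameter in DPDK 17.05, hence the RTE_VERSION checks below.
 */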
static uint64_t tsc_ctrl(struct lcore_cfg *lconf)
{
	const uint8_t n_tasks_all = lconf->n_tasks_all;
	void *msgs[MAX_RING_BURST];
	uint16_t n_msgs;

	for (uint8_t task_id = 0; task_id < n_tasks_all; ++task_id) {
		if (lconf->ctrl_rings_m[task_id] && lconf->ctrl_func_m[task_id]) {
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
			n_msgs = rte_ring_sc_dequeue_burst(lconf->ctrl_rings_m[task_id], msgs, MAX_RING_BURST);
#else
			n_msgs = rte_ring_sc_dequeue_burst(lconf->ctrl_rings_m[task_id], msgs, MAX_RING_BURST, NULL);
#endif
			if (n_msgs) {
				lconf->ctrl_func_m[task_id](lconf->tasks_all[task_id], msgs, n_msgs);
			}
		}
		if (lconf->ctrl_rings_p[task_id] && lconf->ctrl_func_p[task_id]) {
#if RTE_VERSION < RTE_VERSION_NUM(17,5,0,1)
			n_msgs = rte_ring_sc_dequeue_burst(lconf->ctrl_rings_p[task_id], msgs, MAX_RING_BURST);
#else
			n_msgs = rte_ring_sc_dequeue_burst(lconf->ctrl_rings_p[task_id], msgs, MAX_RING_BURST, NULL);
#endif
			if (n_msgs) {
				lconf->ctrl_func_p[task_id](lconf->tasks_all[task_id], (struct rte_mbuf **)msgs, n_msgs);
			}
		}
	}
	return lconf->ctrl_timeout;
}
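
/*
 * Switch the current thread to the given scheduling policy at that policy's
 * maximum priority; used below when the lcore is configured with
 * LCONF_FLAG_SCHED_RR.
 */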
static void set_thread_policy(int policy)
{
	struct sched_param p;
	int ret, old_policy, old_priority;

	memset(&p, 0, sizeof(p));
	ret = pthread_getschedparam(pthread_self(), &old_policy, &p);
	if (ret) {
		plog_err("Failed getting thread policy: %d\n", ret);
		return;
	}
	old_priority = p.sched_priority;
	p.sched_priority = sched_get_priority_max(policy);
	ret = pthread_setschedparam(pthread_self(), policy, &p);
	if (ret) {
		plog_err("Failed setting thread priority: %d\n", ret);
	} else
		plog_info("Thread policy/priority changed from %d/%d to %d/%d\n", old_policy, old_priority, policy, p.sched_priority);
}
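
/*
 * Main loop of a generic worker lcore: housekeeping (drain, termination
 * requests, the optional periodic function, control rings) is scheduled
 * through the small tsc_tasks[] deadline table, and in between every running
 * task is polled through its rx_pkt()/handle_bulk() callbacks.
 */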
int thread_generic(struct lcore_cfg *lconf)
{
	struct task_base *tasks[MAX_TASKS_PER_CORE];
	int next[MAX_TASKS_PER_CORE] = {0};
	struct rte_mbuf **mbufs;
	uint64_t cur_tsc = rte_rdtsc();
	uint8_t zero_rx[MAX_TASKS_PER_CORE] = {0};
	struct tsc_task tsc_tasks[] = {
		{.tsc = cur_tsc, .tsc_task = tsc_term},
		{.tsc = cur_tsc + DRAIN_TIMEOUT, .tsc_task = tsc_drain},
		{.tsc = -1},
		{.tsc = -1},
		{.tsc = -1},
	};
	uint8_t n_tasks_run = lconf->n_tasks_run;
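
	/* Slots [2] and [3] are armed below only when a periodic function or
	   control rings are configured; unused slots keep tsc = -1 (i.e. never)
	   and end up at the back once the table is sorted. */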
	if (lconf->flags & LCONF_FLAG_SCHED_RR)
		set_thread_policy(SCHED_RR);

	if (lconf->period_func) {
		tsc_tasks[2].tsc = cur_tsc + lconf->period_timeout;
		tsc_tasks[2].tsc_task = tsc_period;
	}
	for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
		if (lconf->ctrl_func_m[task_id]) {
			tsc_tasks[3].tsc = cur_tsc + lconf->ctrl_timeout;
			tsc_tasks[3].tsc_task = tsc_ctrl;
			break;
		}
		if (lconf->ctrl_func_p[task_id]) {
			tsc_tasks[3].tsc = cur_tsc + lconf->ctrl_timeout;
			tsc_tasks[3].tsc_task = tsc_ctrl;
			break;
		}
	}
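
	/* One-shot sort of the deadline table, earliest deadline first; the main
	   loop below keeps it sorted as handlers are rescheduled. */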
	for (size_t i = 0; i < sizeof(tsc_tasks)/sizeof(tsc_tasks[0]); ++i) {
		for (size_t j = i + 1; j < sizeof(tsc_tasks)/sizeof(tsc_tasks[0]); ++j) {
			if (tsc_tasks[i].tsc > tsc_tasks[j].tsc) {
				struct tsc_task tmp = tsc_tasks[i];
				tsc_tasks[i] = tsc_tasks[j];
				tsc_tasks[j] = tmp;
			}
		}
	}
	struct tsc_task next_tsc = tsc_tasks[0];

	for (;;) {
		cur_tsc = rte_rdtsc();
		/* Keep tsc_tasks[] sorted by deadline, earliest first. After a
		   handler runs, its new deadline is re-inserted with a linear
		   scan that shifts earlier-scheduled entries to the front. In
		   most configurations one high-frequency handler dominates, so
		   the entry that just ran usually ends up at the front again
		   and the scan is short. If many tsc_tasks were ever needed,
		   this should be replaced with a priority queue (heap). */
		if (unlikely(cur_tsc >= next_tsc.tsc)) {
			uint64_t resched_diff = tsc_tasks[0].tsc_task(lconf);

			if (resched_diff == (uint64_t)-2) {
				n_tasks_run = lconf->n_tasks_run;
				if (!n_tasks_run)
					return 0;
				for (int i = 0; i < lconf->n_tasks_run; ++i) {
					tasks[i] = lconf->tasks_run[i];

					uint8_t task_id = lconf_get_task_id(lconf, tasks[i]);
					if (lconf->targs[task_id].task_init->flag_features & TASK_FEATURE_ZERO_RX)
						zero_rx[task_id] = 1;
				}
			}

			uint64_t new_tsc = tsc_tasks[0].tsc + resched_diff;
			tsc_tasks[0].tsc = new_tsc;
			next_tsc.tsc = new_tsc;

			for (size_t i = 1; i < sizeof(tsc_tasks)/sizeof(tsc_tasks[0]); ++i) {
				if (new_tsc < tsc_tasks[i].tsc) {
					if (i > 1) {
						tsc_tasks[i - 1] = next_tsc;
						next_tsc = tsc_tasks[0];
					}
					break;
				}
				else
					tsc_tasks[i - 1] = tsc_tasks[i];
			}
		}
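
		/* Poll every running task: rx_pkt() fetches a burst of packets and
		   handle_bulk() processes it. Tasks flagged TASK_FEATURE_ZERO_RX are
		   called even when the burst is empty. */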
		uint16_t nb_rx;
		for (uint8_t task_id = 0; task_id < n_tasks_run; ++task_id) {
			struct task_base *t = tasks[task_id];
			struct task_args *targ = &lconf->targs[task_id];
			// Do not skip a task receiving packets from an optimized ring,
			// as the transmitting task expects such a receiving task to
			// always run and consume the transmitted packets.
			if (unlikely(next[task_id] && (targ->tx_opt_ring_task == NULL))) {
				// plogx_info("task %d is too busy\n", task_id);
				next[task_id] = 0;
			} else {
				nb_rx = t->rx_pkt(t, &mbufs);
				if (likely(nb_rx || zero_rx[task_id])) {
					next[task_id] = t->handle_bulk(t, mbufs, nb_rx);
				}
			}
		}
	}
	return 0;
}