2 // Copyright (c) 2010-2017 Intel Corporation
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
#include <rte_table_hash.h>
#include <rte_byteorder.h>
#include <rte_version.h>

#include "prox_malloc.h"
#include "handle_lb_net.h"
#include "task_base.h"
#include "hash_utils.h"
#include "flow_iter.h"

#if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0)
#define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE
#endif
46 struct task_base base;
49 uint8_t nb_worker_threads;
50 uint8_t worker_byte_offset_ipv4;
51 uint8_t worker_byte_offset_ipv6;
52 uint8_t runtime_flags;
55 struct task_lb_net_lut {
56 struct task_base base;
57 uint8_t nb_worker_threads;
58 uint8_t runtime_flags;
59 struct rte_table_hash *worker_hash_table;
62 struct rte_mbuf *fake_packets[64];
/* Forward declarations: per-packet classification helpers defined further down. */
static inline uint8_t handle_lb_net(struct task_lb_net *task, struct rte_mbuf *mbuf);
static inline int extract_gre_key(struct task_lb_net_lut *task, uint32_t *key, struct rte_mbuf *mbuf);
68 static struct rte_table_hash *setup_gre_to_wt_lookup(struct task_args *targ, uint8_t n_workers, int socket_id)
73 struct rte_table_hash *ret;
76 for (int i = 0; i < n_workers; ++i) {
77 struct core_task ct = targ->core_task_set[0].core_task[i];
78 struct task_args *t = core_targ_get(ct.core, ct.task);
80 struct flow_iter *it = &t->task_init->flow_iter;
82 PROX_PANIC(t->task_init->flow_iter.beg == NULL,
83 "Load distributor can't find flows owned by destination worker %d\n", i);
85 for (it->beg(it, t); !it->is_end(it, t); it->next(it, t)) {
90 struct rte_table_hash_ext_params table_hash_params = {
94 .n_buckets_ext = count >> 1,
97 .signature_offset = HASH_METADATA_OFFSET(0),
98 .key_offset = HASH_METADATA_OFFSET(0),
101 ret = rte_table_hash_ext_dosig_ops.f_create(&table_hash_params, socket_id, sizeof(uint8_t));
103 for (int i = 0; i < n_workers; ++i) {
104 struct core_task ct = targ->core_task_set[0].core_task[i];
105 struct task_args *t = core_targ_get(ct.core, ct.task);
107 PROX_PANIC(t->task_init->flow_iter.beg == NULL,
108 "Load distributor can't find flows owned by destination worker %d\n", i);
110 struct flow_iter *it = &t->task_init->flow_iter;
112 for (it->beg(it, t); !it->is_end(it, t); it->next(it, t)) {
113 uint32_t gre_id = it->get_gre_id(it, t);
116 r = rte_table_hash_ext_dosig_ops.f_add(ret, &gre_id, &dst, &key_found, &entry_in_hash);
118 plog_err("Failed to add gre_id = %x, dest worker = %u\n", gre_id, i);
121 plog_dbg("Core %u added: gre_id %x, dest woker = %u\n", targ->lconf->id, gre_id, i);
128 static uint8_t *setup_wt_indexed_table(struct task_args *targ, uint8_t n_workers, int socket_id)
130 uint32_t gre_id, rss;
131 uint32_t max_gre_id = 0;
137 for (int i = 0; i < n_workers; ++i) {
138 struct core_task ct = targ->core_task_set[0].core_task[i];
139 struct task_args *t = core_targ_get(ct.core, ct.task);
141 struct flow_iter *it = &t->task_init->flow_iter;
143 PROX_PANIC(t->task_init->flow_iter.beg == NULL,
144 "Load distributor can't find flows owned by destination worker %d\n", i);
146 for (it->beg(it, t); !it->is_end(it, t); it->next(it, t)) {
147 uint32_t gre_id = it->get_gre_id(it, t);
148 if (gre_id > max_gre_id)
153 PROX_PANIC(max_gre_id == 0, "Failed to get maximum GRE ID from workers");
155 ret = prox_zmalloc(1 + max_gre_id, socket_id);
156 PROX_PANIC(ret == NULL, "Failed to allocate worker_lut\n");
158 for (int i = 0; i < n_workers; ++i) {
159 struct core_task ct = targ->core_task_set[0].core_task[i];
160 struct task_args *t = core_targ_get(ct.core, ct.task);
162 PROX_PANIC(t->task_init->flow_iter.beg == NULL,
163 "Load distributor can't find flows owned by destination worker %d\n", i);
165 struct flow_iter *it = &t->task_init->flow_iter;
167 for (it->beg(it, t); !it->is_end(it, t); it->next(it, t)) {
168 uint32_t gre_id = it->get_gre_id(it, t);
177 static void init_task_lb_net(struct task_base *tbase, struct task_args *targ)
179 struct task_lb_net *task = (struct task_lb_net *)tbase;
181 task->qinq_tag = targ->qinq_tag;
182 task->runtime_flags = targ->runtime_flags;
183 task->worker_byte_offset_ipv6 = 23;
184 task->worker_byte_offset_ipv4 = 15;
185 task->nb_worker_threads = targ->nb_worker_threads;
186 /* The optimal configuration is when the number of worker threads
187 is a power of 2. In that case, a bit_mask can be used. Setting
188 the bitmask to 0xff disables the "optimal" usage of bitmasks
189 and the actual number of worker threads will be used instead. */
190 task->bit_mask = rte_is_power_of_2(targ->nb_worker_threads) ? targ->nb_worker_threads - 1 : 0xff;
193 static void init_task_lb_net_lut(struct task_base *tbase, struct task_args *targ)
195 struct task_lb_net_lut *task = (struct task_lb_net_lut *)tbase;
196 const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
198 task->runtime_flags = targ->runtime_flags;
199 task->nb_worker_threads = targ->nb_worker_threads;
200 for (uint32_t i = 0; i < 64; ++i) {
201 task->fake_packets[i] = (struct rte_mbuf*)((uint8_t*)&task->keys[i] - sizeof (struct rte_mbuf));
204 task->worker_hash_table = setup_gre_to_wt_lookup(targ, task->nb_worker_threads, socket_id);
207 static void init_task_lb_net_indexed_table(struct task_base *tbase, struct task_args *targ)
209 struct task_lb_net_lut *task = (struct task_lb_net_lut *)tbase;
210 const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
212 task->runtime_flags = targ->runtime_flags;
213 task->nb_worker_threads = targ->nb_worker_threads;
215 task->worker_lut = setup_wt_indexed_table(targ, task->nb_worker_threads, socket_id);
218 static int handle_lb_net_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
220 struct task_lb_net *task = (struct task_lb_net *)tbase;
221 uint8_t out[MAX_PKT_BURST];
224 prefetch_first(mbufs, n_pkts);
226 for (j = 0; j + PREFETCH_OFFSET < n_pkts; ++j) {
227 #ifdef PROX_PREFETCH_OFFSET
228 PREFETCH0(mbufs[j + PREFETCH_OFFSET]);
229 PREFETCH0(rte_pktmbuf_mtod(mbufs[j + PREFETCH_OFFSET - 1], void *));
231 out[j] = handle_lb_net(task, mbufs[j]);
233 #ifdef PROX_PREFETCH_OFFSET
234 PREFETCH0(rte_pktmbuf_mtod(mbufs[n_pkts - 1], void *));
236 for (; j < n_pkts; ++j) {
237 out[j] = handle_lb_net(task, mbufs[j]);
240 return task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
243 static int handle_lb_net_lut_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
245 struct task_lb_net_lut *task = (struct task_lb_net_lut *)tbase;
246 uint16_t not_dropped = 0;
247 uint8_t out[MAX_PKT_BURST];
248 // process packet, i.e. decide if the packet has to be dropped or not and where the packet has to go
250 prefetch_first(mbufs, n_pkts);
252 uint64_t pkts_mask = RTE_LEN2MASK(n_pkts, uint64_t);
253 uint8_t *wt[MAX_PKT_BURST];
254 uint64_t lookup_hit_mask = 0;
255 for (j = 0; j + PREFETCH_OFFSET < n_pkts; ++j) {
256 #ifdef PROX_PREFETCH_OFFSET
257 PREFETCH0(mbufs[j + PREFETCH_OFFSET]);
258 PREFETCH0(rte_pktmbuf_mtod(mbufs[j + PREFETCH_OFFSET - 1], void *));
260 if (extract_gre_key(task, &task->keys[j], mbufs[j])) {
261 // Packet will be dropped after lookup
262 pkts_mask &= ~(1 << j);
263 out[j] = OUT_DISCARD;
266 #ifdef PROX_PREFETCH_OFFSET
267 PREFETCH0(rte_pktmbuf_mtod(mbufs[n_pkts - 1], void *));
268 for (; j < n_pkts; ++j) {
269 if (extract_gre_key(task, &task->keys[j], mbufs[j])) {
270 pkts_mask &= ~(1 << j);
271 out[j] = OUT_DISCARD;
272 rte_prefetch0(RTE_MBUF_METADATA_UINT8_PTR(mbufs[j], 0));
276 // keys have been extracted for all packets, now do the lookup
277 rte_table_hash_ext_dosig_ops.f_lookup(task->worker_hash_table, task->fake_packets, pkts_mask, &lookup_hit_mask, (void**)wt);
278 /* mbufs now contains the packets that have not been dropped */
279 if (likely(lookup_hit_mask == RTE_LEN2MASK(n_pkts, uint64_t))) {
280 for (j = 0; j < n_pkts; ++j) {
285 for (j = 0; j < n_pkts; ++j) {
286 if (unlikely(!((lookup_hit_mask >> j) & 0x1))) {
287 plog_warn("Packet %d keys %x can not be sent to worker thread => dropped\n", j, task->keys[j]);
288 out[j] = OUT_DISCARD;
295 return task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
298 static int handle_lb_net_indexed_table_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
300 struct task_lb_net_lut *task = (struct task_lb_net_lut *)tbase;
301 uint8_t out[MAX_PKT_BURST];
302 // process packet, i.e. decide if the packet has to be dropped or not and where the packet has to go
305 prefetch_first(mbufs, n_pkts);
307 uint64_t pkts_mask = RTE_LEN2MASK(n_pkts, uint64_t);
308 for (j = 0; j + PREFETCH_OFFSET < n_pkts; ++j) {
309 #ifdef PROX_PREFETCH_OFFSET
310 PREFETCH0(mbufs[j + PREFETCH_OFFSET]);
311 PREFETCH0(rte_pktmbuf_mtod(mbufs[j + PREFETCH_OFFSET - 1], void *));
313 if (extract_gre_key(task, &gre_id, mbufs[j])) {
314 // Packet will be dropped after lookup
315 pkts_mask &= ~(1 << j);
316 out[j] = OUT_DISCARD;
318 out[j] = task->worker_lut[rte_bswap32(gre_id)];
321 #ifdef PROX_PREFETCH_OFFSET
322 PREFETCH0(rte_pktmbuf_mtod(mbufs[n_pkts - 1], void *));
323 for (; j < n_pkts; ++j) {
324 if (extract_gre_key(task, &gre_id, mbufs[j])) {
325 pkts_mask &= ~(1 << j);
326 out[j] = OUT_DISCARD;
328 out[j] = task->worker_lut[rte_bswap32(gre_id)];
332 return task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
335 static inline uint8_t worker_from_mask(struct task_lb_net *task, uint32_t val)
337 if (task->bit_mask != 0xff) {
338 return val & task->bit_mask;
341 return val % task->nb_worker_threads;
345 static inline int extract_gre_key(struct task_lb_net_lut *task, uint32_t *key, struct rte_mbuf *mbuf)
347 // For all packets, one by one, remove MPLS tag if any and fills in keys used by "fake" packets
348 struct ether_hdr *peth = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
349 // Check for MPLS TAG
351 if (peth->ether_type == ETYPE_MPLSU) {
352 struct mpls_hdr *mpls = (struct mpls_hdr *)(peth + 1);
353 uint32_t mpls_len = 0;
354 while (!(mpls->bytes & 0x00010000)) {
356 mpls_len += sizeof(struct mpls_hdr);
358 mpls_len += sizeof(struct mpls_hdr);
359 ip = (struct ipv4_hdr *)(mpls + 1);
360 switch (ip->version_ihl >> 4) {
362 // Remove MPLS Tag if requested
363 if (task->runtime_flags & TASK_MPLS_TAGGING) {
364 peth = (struct ether_hdr *)rte_pktmbuf_adj(mbuf, mpls_len);
365 peth->ether_type = ETYPE_IPv4;
369 plog_warn("IPv6 not supported in this mode\n");
372 plog_warn("Unexpected IP version %d\n", ip->version_ihl >> 4);
377 ip = (struct ipv4_hdr *)(peth + 1);
379 // Entry point for the packet => check for packet validity
380 // => do not use extract_key_core(mbufs[j], &task->keys[j]);
382 if (likely(ip->next_proto_id == IPPROTO_GRE)) {
383 struct gre_hdr *pgre = (struct gre_hdr *)(ip + 1);
384 if (likely(pgre->bits & GRE_KEY_PRESENT)) {
386 if (pgre->bits & (GRE_CRC_PRESENT | GRE_ROUTING_PRESENT)) {
387 // gre_id = *((uint32_t *)((uint8_t *)pgre + 8));
388 *key = *(uint32_t *)((uint8_t *)pgre + 8);
391 // gre_id = *((uint32_t *)((uint8_t *)pgre + 4));
392 *key = *(uint32_t *)((uint8_t *)pgre + 4);
396 plog_warn("Key not present\n");
401 plog_warn("Invalid protocol: GRE xas expected, got 0x%x\n", ip->next_proto_id);
407 static inline uint8_t lb_ip4(struct task_lb_net *task, struct ipv4_hdr *ip)
409 if (unlikely(ip->version_ihl >> 4 != 4)) {
410 plog_warn("Expected to receive IPv4 packet but IP version was %d\n",
411 ip->version_ihl >> 4);
415 if (ip->next_proto_id == IPPROTO_GRE) {
416 struct gre_hdr *pgre = (struct gre_hdr *)(ip + 1);
418 if (pgre->bits & GRE_KEY_PRESENT) {
420 if (pgre->bits & (GRE_CRC_PRESENT | GRE_ROUTING_PRESENT)) {
421 gre_id = *((uint32_t *)((uint8_t *)pgre + 8));
424 gre_id = *((uint32_t *)((uint8_t *)pgre + 4));
427 gre_id = rte_be_to_cpu_32(gre_id) & 0xFFFFFFF;
428 uint8_t worker = worker_from_mask(task, gre_id);
429 plogx_dbg("gre_id = %u worker = %u\n", gre_id, worker);
430 return worker + task->nb_worker_threads * IPV4;
433 plog_warn("Key not present\n");
437 else if (ip->next_proto_id == IPPROTO_UDP) {
438 uint8_t worker = worker_from_mask(task, rte_bswap32(ip->dst_addr));
439 return worker + task->nb_worker_threads * IPV4;
444 static inline uint8_t lb_ip6(struct task_lb_net *task, struct ipv6_hdr *ip)
446 if (unlikely((*(uint8_t*)ip) >> 4 != 6)) {
447 plog_warn("Expected to receive IPv6 packet but IP version was %d\n",
452 uint8_t worker = worker_from_mask(task, *((uint8_t *)ip + task->worker_byte_offset_ipv6));
453 return worker + task->nb_worker_threads * IPV6;
456 static inline uint8_t lb_mpls(struct task_lb_net *task, struct ether_hdr *peth, struct rte_mbuf *mbuf)
458 struct mpls_hdr *mpls = (struct mpls_hdr *)(peth + 1);
459 uint32_t mpls_len = 0;
460 while (!(mpls->bytes & 0x00010000)) {
462 mpls_len += sizeof(struct mpls_hdr);
464 mpls_len += sizeof(struct mpls_hdr);
465 struct ipv4_hdr *ip = (struct ipv4_hdr *)(mpls + 1);
467 switch (ip->version_ihl >> 4) {
469 if (task->runtime_flags & TASK_MPLS_TAGGING) {
470 peth = (struct ether_hdr *)rte_pktmbuf_adj(mbuf, mpls_len);
471 peth->ether_type = ETYPE_IPv4;
473 return lb_ip4(task, ip);
475 if (task->runtime_flags & TASK_MPLS_TAGGING) {
476 peth = (struct ether_hdr *)rte_pktmbuf_adj(mbuf, mpls_len);
477 peth->ether_type = ETYPE_IPv6;
479 return lb_ip6(task, (struct ipv6_hdr *)ip);
481 plogd_warn(mbuf, "Failed Decoding MPLS Packet - neither IPv4 neither IPv6: version %u for packet : \n", ip->version_ihl);
486 static inline uint8_t lb_qinq(struct task_lb_net *task, struct qinq_hdr *qinq)
488 if (qinq->cvlan.eth_proto != ETYPE_VLAN) {
489 plog_warn("Unexpected proto in QinQ = %#04x\n", qinq->cvlan.eth_proto);
492 uint32_t qinq_tags = rte_bswap16(qinq->cvlan.vlan_tci & 0xFF0F);
493 return worker_from_mask(task, qinq_tags);
496 static inline uint8_t handle_lb_net(struct task_lb_net *task, struct rte_mbuf *mbuf)
498 struct ether_hdr *peth = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
499 const uint16_t len = rte_pktmbuf_pkt_len(mbuf);
501 plogd_warn(mbuf, "Unexpected frame len = %d for packet : \n", len);
505 switch (peth->ether_type) {
507 return lb_mpls(task, peth, mbuf);
509 return lb_qinq(task, (struct qinq_hdr *)peth);
511 return lb_ip4(task, (struct ipv4_hdr *)(peth + 1));
513 return lb_ip6(task, (struct ipv6_hdr *)(peth + 1));
517 if (peth->ether_type == task->qinq_tag)
518 return lb_qinq(task, (struct qinq_hdr *)peth);
519 plogd_warn(mbuf, "Unexpected frame Ether type = %#06x for packet : \n", peth->ether_type);
526 static struct task_init task_init_lb_net = {
527 .mode_str = "lbnetwork",
528 .init = init_task_lb_net,
529 .handle = handle_lb_net_bulk,
530 .size = sizeof(struct task_lb_net),
531 .flag_features = TASK_FEATURE_GRE_ID
534 static struct task_init task_init_lb_net_lut_qinq_rss = {
535 .mode_str = "lbnetwork",
536 .sub_mode_str = "lut_qinq_rss",
537 .init = init_task_lb_net_lut,
538 .handle = handle_lb_net_lut_bulk,
539 .size = sizeof(struct task_lb_net_lut),
540 .flag_features = TASK_FEATURE_LUT_QINQ_RSS
543 static struct task_init task_init_lb_net_lut_qinq_hash = {
544 .mode_str = "lbnetwork",
545 .sub_mode_str = "lut_qinq_hash",
546 .init = init_task_lb_net_lut,
547 .handle = handle_lb_net_lut_bulk,
548 .size = sizeof(struct task_lb_net_lut),
549 .flag_features = TASK_FEATURE_LUT_QINQ_HASH
552 static struct task_init task_init_lb_net_indexed_table_rss = {
553 .mode_str = "lbnetwork",
554 .sub_mode_str = "indexed_table_rss",
555 .init = init_task_lb_net_indexed_table,
556 .handle = handle_lb_net_indexed_table_bulk,
557 .size = sizeof(struct task_lb_net_lut),
558 .flag_features = TASK_FEATURE_LUT_QINQ_RSS
561 static struct task_init task_init_lb_net_indexed_table_hash = {
562 .mode_str = "lbnetwork",
563 .sub_mode_str = "indexed_table_hash",
564 .init = init_task_lb_net_indexed_table,
565 .handle = handle_lb_net_indexed_table_bulk,
566 .size = sizeof(struct task_lb_net_lut),
567 .flag_features = TASK_FEATURE_LUT_QINQ_HASH
570 __attribute__((constructor)) static void reg_task_lb_net(void)
572 reg_task(&task_init_lb_net);
573 reg_task(&task_init_lb_net_lut_qinq_rss);
574 reg_task(&task_init_lb_net_lut_qinq_hash);
575 reg_task(&task_init_lb_net_indexed_table_rss);
576 reg_task(&task_init_lb_net_indexed_table_hash);