2 // Copyright (c) 2010-2017 Intel Corporation
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
21 #include <rte_table_hash.h>
22 #include <rte_byteorder.h>
23 #include <rte_version.h>
25 #include "prox_malloc.h"
26 #include "handle_lb_net.h"
27 #include "task_base.h"
37 #include "hash_utils.h"
39 #include "flow_iter.h"
40 #include "prox_compat.h"
42 #if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0)
43 #define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE
// NOTE(review): the enclosing "struct task_lb_net {" opening line is not
// visible in this (sampled) chunk; the members below are accessed through
// (struct task_lb_net *) casts in init_task_lb_net() and handle_lb_net().
47 struct task_base base;
// Number of worker tasks that packets are distributed across (from targ).
50 uint8_t nb_worker_threads;
// Byte offset into the packet used to select a worker for IPv4 (set to 15 in init).
51 uint8_t worker_byte_offset_ipv4;
// Byte offset into the packet used to select a worker for IPv6 (set to 23 in init; see lb_ip6()).
52 uint8_t worker_byte_offset_ipv6;
// Copy of targ->runtime_flags (e.g. TASK_MPLS_TAGGING tested in lb_mpls()).
53 uint8_t runtime_flags;
// Task state for the LUT-based load balancer sub-modes ("lut_qinq_*" and
// "indexed_table_*"). NOTE(review): members keys[] and worker_lut, referenced
// by the handlers below, are on lines not visible in this sampled chunk.
56 struct task_lb_net_lut {
57 struct task_base base;
58 uint8_t nb_worker_threads;
59 uint8_t runtime_flags;
// GRE-id -> worker hash table, built by setup_gre_to_wt_lookup().
60 struct rte_table_hash *worker_hash_table;
// Fake mbufs whose metadata area aliases task->keys[]; passed to the bulk
// hash lookup so only 64 cache lines of keys are touched per burst.
63 struct rte_mbuf *fake_packets[64];
// Forward declarations: per-packet classification / key-extraction helpers
// defined later in this file.
66 static inline uint8_t handle_lb_net(struct task_lb_net *task, struct rte_mbuf *mbuf);
67 static inline int extract_gre_key(struct task_lb_net_lut *task, uint32_t *key, struct rte_mbuf *mbuf);
// Build a hash table mapping each GRE id owned by a worker thread to that
// worker's index. Iterates every worker's flow_iter twice: a first pass before
// creating the table, and a second pass adding (gre_id -> worker) entries.
// NOTE(review): this chunk is sampled — some lines (declarations of dst /
// key_found / entry_in_hash / r, closing braces, the return) are not visible.
69 static struct rte_table_hash *setup_gre_to_wt_lookup(struct task_args *targ, uint8_t n_workers, int socket_id)
74 struct rte_table_hash *ret;
// Pass 1: walk every destination worker's flows.
77 for (int i = 0; i < n_workers; ++i) {
78 struct core_task ct = targ->core_task_set[0].core_task[i];
79 struct task_args *t = core_targ_get(ct.core, ct.task);
81 struct flow_iter *it = &t->task_init->flow_iter;
// A worker without a flow iterator is a configuration error: abort.
83 PROX_PANIC(t->task_init->flow_iter.beg == NULL,
84 "Load distributor can't find flows owned by destination worker %d\n", i);
86 for (it->beg(it, t); !it->is_end(it, t); it->next(it, t)) {
// NOTE(review): static buffer + sprintf is not thread-safe and assumes
// lconf->id stays within 3 digits; "lb_hash_table_NNN" fits in 30 bytes.
91 static char hash_name[30];
92 sprintf(hash_name, "lb_hash_table_%03d", targ->lconf->id);
94 // The key offset in the real packets may depend on the packet type; hence we need to extract the
95 // keys and copy them.
96 // The packets will be parsed at runtime and keys will be created and stored in the metadata of fake mbufs.
97 // Then hash functions will be used on the fake mbufs.
98 // Keys are stored in (metadata of) fake mbufs to reduce the memory/cache usage: in this way we use only
99 // 64 cache lines for all keys (we always use the same fake mbufs). If using metadata of real packets/mbufs,
100 // we would use as many cache lines as there are mbufs, which might be very high if QoS is supported for instance.
102 struct prox_rte_table_params table_hash_params = {
107 .f_hash = (rte_table_hash_op_hash)hash_crc32,
// Keys are read from the fake-mbuf metadata area at offset 0.
109 .key_offset = HASH_METADATA_OFFSET(0),
// Entry payload is a single byte: the destination worker index.
113 ret = prox_rte_table_create(&table_hash_params, socket_id, sizeof(uint8_t));
// Pass 2: add one entry per GRE id, value = destination worker index.
115 for (int i = 0; i < n_workers; ++i) {
116 struct core_task ct = targ->core_task_set[0].core_task[i];
117 struct task_args *t = core_targ_get(ct.core, ct.task);
119 PROX_PANIC(t->task_init->flow_iter.beg == NULL,
120 "Load distributor can't find flows owned by destination worker %d\n", i);
122 struct flow_iter *it = &t->task_init->flow_iter;
124 for (it->beg(it, t); !it->is_end(it, t); it->next(it, t)) {
125 uint32_t gre_id = it->get_gre_id(it, t);
128 r = prox_rte_table_add(ret, &gre_id, &dst, &key_found, &entry_in_hash);
130 plog_err("Failed to add gre_id = %x, dest worker = %u\n", gre_id, i);
// NOTE(review): "woker" typo in the runtime log string below — left as-is here
// since a doc-only pass must not alter runtime text.
133 plog_dbg("Core %u added: gre_id %x, dest woker = %u\n", targ->lconf->id, gre_id, i);
// Build a flat byte array indexed by GRE id, mapping gre_id -> worker index.
// Pass 1 finds the maximum GRE id so the table can be sized; pass 2 records
// the owning worker per flow. NOTE(review): sampled chunk — the declaration
// of 'ret', the table-fill assignment and the return are not visible here.
140 static uint8_t *setup_wt_indexed_table(struct task_args *targ, uint8_t n_workers, int socket_id)
142 uint32_t gre_id, rss;
143 uint32_t max_gre_id = 0;
// Pass 1: compute max_gre_id over all workers' flows.
149 for (int i = 0; i < n_workers; ++i) {
150 struct core_task ct = targ->core_task_set[0].core_task[i];
151 struct task_args *t = core_targ_get(ct.core, ct.task);
153 struct flow_iter *it = &t->task_init->flow_iter;
155 PROX_PANIC(t->task_init->flow_iter.beg == NULL,
156 "Load distributor can't find flows owned by destination worker %d\n", i);
158 for (it->beg(it, t); !it->is_end(it, t); it->next(it, t)) {
159 uint32_t gre_id = it->get_gre_id(it, t);
160 if (gre_id > max_gre_id)
165 PROX_PANIC(max_gre_id == 0, "Failed to get maximum GRE ID from workers");
// One byte per possible GRE id (0..max_gre_id), zeroed, on the local socket.
167 ret = prox_zmalloc(1 + max_gre_id, socket_id);
168 PROX_PANIC(ret == NULL, "Failed to allocate worker_lut\n");
// Pass 2: record the owning worker index for each GRE id.
170 for (int i = 0; i < n_workers; ++i) {
171 struct core_task ct = targ->core_task_set[0].core_task[i];
172 struct task_args *t = core_targ_get(ct.core, ct.task);
174 PROX_PANIC(t->task_init->flow_iter.beg == NULL,
175 "Load distributor can't find flows owned by destination worker %d\n", i);
177 struct flow_iter *it = &t->task_init->flow_iter;
179 for (it->beg(it, t); !it->is_end(it, t); it->next(it, t)) {
180 uint32_t gre_id = it->get_gre_id(it, t);
// Init for mode "lbnetwork" (no sub-mode): configure the worker-selection
// byte offsets and the power-of-two bit mask.
189 static void init_task_lb_net(struct task_base *tbase, struct task_args *targ)
191 struct task_lb_net *task = (struct task_lb_net *)tbase;
// QinQ outer tag value used by handle_lb_net() to recognize QinQ frames.
193 task->qinq_tag = targ->qinq_tag;
194 task->runtime_flags = targ->runtime_flags;
// Fixed offsets of the byte used to pick a worker (IPv6 / IPv4 packets).
195 task->worker_byte_offset_ipv6 = 23;
196 task->worker_byte_offset_ipv4 = 15;
197 task->nb_worker_threads = targ->nb_worker_threads;
198 /* The optimal configuration is when the number of worker threads
199 is a power of 2. In that case, a bit_mask can be used. Setting
200 the bitmask to 0xff disables the "optimal" usage of bitmasks
201 and the actual number of worker threads will be used instead. */
202 task->bit_mask = rte_is_power_of_2(targ->nb_worker_threads) ? targ->nb_worker_threads - 1 : 0xff;
// Init for the "lut_qinq_*" sub-modes: wire up the fake mbufs and build the
// GRE->worker hash table on the task's local NUMA socket.
205 static void init_task_lb_net_lut(struct task_base *tbase, struct task_args *targ)
207 struct task_lb_net_lut *task = (struct task_lb_net_lut *)tbase;
208 const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
210 task->runtime_flags = targ->runtime_flags;
211 task->nb_worker_threads = targ->nb_worker_threads;
// Point each fake mbuf so its metadata area lands exactly on task->keys[i];
// the hash lookup then reads the keys straight out of task->keys[].
212 for (uint32_t i = 0; i < 64; ++i) {
213 task->fake_packets[i] = (struct rte_mbuf*)((uint8_t*)&task->keys[i] - sizeof (struct rte_mbuf));
216 task->worker_hash_table = setup_gre_to_wt_lookup(targ, task->nb_worker_threads, socket_id);
// Init for the "indexed_table_*" sub-modes: build the flat GRE-id -> worker
// lookup array instead of a hash table.
219 static void init_task_lb_net_indexed_table(struct task_base *tbase, struct task_args *targ)
221 struct task_lb_net_lut *task = (struct task_lb_net_lut *)tbase;
222 const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
224 task->runtime_flags = targ->runtime_flags;
225 task->nb_worker_threads = targ->nb_worker_threads;
227 task->worker_lut = setup_wt_indexed_table(targ, task->nb_worker_threads, socket_id);
// Bulk handler for plain "lbnetwork": classify each packet with
// handle_lb_net() (software-prefetching upcoming mbufs) and transmit the
// burst with the resulting out[] routing decisions.
230 static int handle_lb_net_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
232 struct task_lb_net *task = (struct task_lb_net *)tbase;
233 uint8_t out[MAX_PKT_BURST];
236 prefetch_first(mbufs, n_pkts);
// Main loop: prefetch mbuf j+PREFETCH_OFFSET while handling packet j.
238 for (j = 0; j + PREFETCH_OFFSET < n_pkts; ++j) {
239 #ifdef PROX_PREFETCH_OFFSET
240 PREFETCH0(mbufs[j + PREFETCH_OFFSET]);
241 PREFETCH0(rte_pktmbuf_mtod(mbufs[j + PREFETCH_OFFSET - 1], void *));
243 out[j] = handle_lb_net(task, mbufs[j]);
245 #ifdef PROX_PREFETCH_OFFSET
246 PREFETCH0(rte_pktmbuf_mtod(mbufs[n_pkts - 1], void *));
// Tail loop: remaining packets, nothing left to prefetch.
248 for (; j < n_pkts; ++j) {
249 out[j] = handle_lb_net(task, mbufs[j]);
252 return task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
// Bulk handler for "lut_qinq_*": extract one GRE key per packet into the
// fake-mbuf metadata, do a single bulk hash-table lookup, then route each
// packet to the worker the lookup returned (dropping packets with no key or
// no hit). NOTE(review): sampled chunk — 'j' declaration and several braces
// are on lines not visible here.
255 static int handle_lb_net_lut_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
257 struct task_lb_net_lut *task = (struct task_lb_net_lut *)tbase;
258 uint16_t not_dropped = 0;
259 uint8_t out[MAX_PKT_BURST];
260 // process packet, i.e. decide if the packet has to be dropped or not and where the packet has to go
262 prefetch_first(mbufs, n_pkts);
// Bitmask of packets still eligible for lookup (bit j = packet j).
264 uint64_t pkts_mask = RTE_LEN2MASK(n_pkts, uint64_t);
265 uint8_t *wt[MAX_PKT_BURST];
266 uint64_t lookup_hit_mask = 0;
267 for (j = 0; j + PREFETCH_OFFSET < n_pkts; ++j) {
268 #ifdef PROX_PREFETCH_OFFSET
269 PREFETCH0(mbufs[j + PREFETCH_OFFSET]);
270 PREFETCH0(rte_pktmbuf_mtod(mbufs[j + PREFETCH_OFFSET - 1], void *));
272 if (extract_gre_key(task, &task->keys[j], mbufs[j])) {
273 // Packet will be dropped after lookup
// NOTE(review): "1 << j" is an int shift; for j >= 31 this is undefined
// behavior against the 64-bit mask — should be "UINT64_C(1) << j". The same
// pattern recurs below and in handle_lb_net_indexed_table_bulk().
274 pkts_mask &= ~(1 << j);
275 out[j] = OUT_DISCARD;
278 #ifdef PROX_PREFETCH_OFFSET
279 PREFETCH0(rte_pktmbuf_mtod(mbufs[n_pkts - 1], void *));
280 for (; j < n_pkts; ++j) {
281 if (extract_gre_key(task, &task->keys[j], mbufs[j])) {
282 pkts_mask &= ~(1 << j);
283 out[j] = OUT_DISCARD;
284 rte_prefetch0(RTE_MBUF_METADATA_UINT8_PTR(mbufs[j], 0));
288 // keys have been extracted for all packets, now do the lookup
289 prox_rte_table_lookup(task->worker_hash_table, task->fake_packets, pkts_mask, &lookup_hit_mask, (void**)wt);
290 /* mbufs now contains the packets that have not been dropped */
// Fast path: every packet in the burst hit in the table.
291 if (likely(lookup_hit_mask == RTE_LEN2MASK(n_pkts, uint64_t))) {
292 for (j = 0; j < n_pkts; ++j) {
// Slow path: discard packets whose key was not found, keep the rest.
297 for (j = 0; j < n_pkts; ++j) {
298 if (unlikely(!((lookup_hit_mask >> j) & 0x1))) {
299 plog_warn("Packet %d keys %x can not be sent to worker thread => dropped\n", j, task->keys[j]);
300 out[j] = OUT_DISCARD;
307 return task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
// Bulk handler for "indexed_table_*": extract each packet's GRE key and index
// the flat worker_lut directly (byte-swapped: the key is stored in network
// order). NOTE(review): sampled chunk — 'j'/'gre_id' declarations and braces
// are on lines not visible here.
310 static int handle_lb_net_indexed_table_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
312 struct task_lb_net_lut *task = (struct task_lb_net_lut *)tbase;
313 uint8_t out[MAX_PKT_BURST];
314 // process packet, i.e. decide if the packet has to be dropped or not and where the packet has to go
317 prefetch_first(mbufs, n_pkts);
319 uint64_t pkts_mask = RTE_LEN2MASK(n_pkts, uint64_t);
320 for (j = 0; j + PREFETCH_OFFSET < n_pkts; ++j) {
321 #ifdef PROX_PREFETCH_OFFSET
322 PREFETCH0(mbufs[j + PREFETCH_OFFSET]);
323 PREFETCH0(rte_pktmbuf_mtod(mbufs[j + PREFETCH_OFFSET - 1], void *));
325 if (extract_gre_key(task, &gre_id, mbufs[j])) {
326 // Packet will be dropped after lookup
// NOTE(review): int shift "1 << j" is UB for j >= 31 against the uint64_t
// mask — should be a 64-bit constant (same pattern below).
327 pkts_mask &= ~(1 << j);
328 out[j] = OUT_DISCARD;
// NOTE(review): no bounds check against the LUT size (1 + max_gre_id from
// setup_wt_indexed_table) before indexing — verify gre_id range is guaranteed
// by the flow configuration.
330 out[j] = task->worker_lut[rte_bswap32(gre_id)];
333 #ifdef PROX_PREFETCH_OFFSET
334 PREFETCH0(rte_pktmbuf_mtod(mbufs[n_pkts - 1], void *));
335 for (; j < n_pkts; ++j) {
336 if (extract_gre_key(task, &gre_id, mbufs[j])) {
337 pkts_mask &= ~(1 << j);
338 out[j] = OUT_DISCARD;
340 out[j] = task->worker_lut[rte_bswap32(gre_id)];
344 return task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
// Map a 32-bit selector value to a worker index: bitwise AND when the worker
// count is a power of two (bit_mask != 0xff, see init_task_lb_net), otherwise
// modulo the worker count.
347 static inline uint8_t worker_from_mask(struct task_lb_net *task, uint32_t val)
349 if (task->bit_mask != 0xff) {
350 return val & task->bit_mask;
353 return val % task->nb_worker_threads;
// Parse one packet: optionally strip an MPLS label stack (TASK_MPLS_TAGGING),
// verify the inner IPv4/GRE headers, and copy the 32-bit GRE key (still in
// network byte order) into *key. Non-GRE / keyless packets hit the warning
// paths; the explicit return statements are on lines not visible in this
// sampled chunk (callers treat non-zero as "drop").
357 static inline int extract_gre_key(struct task_lb_net_lut *task, uint32_t *key, struct rte_mbuf *mbuf)
359 // For all packets, one by one, remove MPLS tag if any and fill in the keys used by the "fake" packets
360 struct ether_hdr *peth = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
361 // Check for MPLS TAG
363 if (peth->ether_type == ETYPE_MPLSU) {
364 struct mpls_hdr *mpls = (struct mpls_hdr *)(peth + 1);
365 uint32_t mpls_len = 0;
// Walk the label stack until the bottom-of-stack bit is set.
366 while (!(mpls->bytes & 0x00010000)) {
368 mpls_len += sizeof(struct mpls_hdr);
370 mpls_len += sizeof(struct mpls_hdr);
371 ip = (struct ipv4_hdr *)(mpls + 1);
372 switch (ip->version_ihl >> 4) {
374 // Remove MPLS Tag if requested
375 if (task->runtime_flags & TASK_MPLS_TAGGING) {
// Strip the MPLS stack in place and restore the IPv4 ethertype.
376 peth = (struct ether_hdr *)rte_pktmbuf_adj(mbuf, mpls_len);
377 peth->ether_type = ETYPE_IPv4;
381 plog_warn("IPv6 not supported in this mode\n");
384 plog_warn("Unexpected IP version %d\n", ip->version_ihl >> 4);
// Non-MPLS frame: the IP header follows the Ethernet header directly.
389 ip = (struct ipv4_hdr *)(peth + 1);
391 // Entry point for the packet => check for packet validity
392 // => do not use extract_key_core(mbufs[j], &task->keys[j]);
394 if (likely(ip->next_proto_id == IPPROTO_GRE)) {
395 struct gre_hdr *pgre = (struct gre_hdr *)(ip + 1);
396 if (likely(pgre->bits & GRE_KEY_PRESENT)) {
// The Key field sits 8 bytes into the GRE header when optional
// checksum/routing fields precede it, otherwise 4 bytes in.
398 if (pgre->bits & (GRE_CRC_PRESENT | GRE_ROUTING_PRESENT)) {
399 // gre_id = *((uint32_t *)((uint8_t *)pgre + 8));
400 *key = *(uint32_t *)((uint8_t *)pgre + 8);
403 // gre_id = *((uint32_t *)((uint8_t *)pgre + 4));
404 *key = *(uint32_t *)((uint8_t *)pgre + 4);
408 plog_warn("Key not present\n");
413 plog_warn("Invalid protocol: GRE was expected, got 0x%x\n", ip->next_proto_id);
// Classify an IPv4 packet: GRE packets pick the worker from the GRE key,
// UDP packets from the byte-swapped destination address. The return value
// encodes worker index offset by nb_worker_threads * IPV4 (per-family table).
// NOTE(review): sampled chunk — the 'gre_id' declaration, braces and the
// fall-through return are on lines not visible here.
419 static inline uint8_t lb_ip4(struct task_lb_net *task, struct ipv4_hdr *ip)
421 if (unlikely(ip->version_ihl >> 4 != 4)) {
422 plog_warn("Expected to receive IPv4 packet but IP version was %d\n",
423 ip->version_ihl >> 4);
427 if (ip->next_proto_id == IPPROTO_GRE) {
428 struct gre_hdr *pgre = (struct gre_hdr *)(ip + 1);
430 if (pgre->bits & GRE_KEY_PRESENT) {
// Key offset depends on the optional checksum/routing fields (see
// extract_gre_key for the same layout).
432 if (pgre->bits & (GRE_CRC_PRESENT | GRE_ROUTING_PRESENT)) {
433 gre_id = *((uint32_t *)((uint8_t *)pgre + 8));
436 gre_id = *((uint32_t *)((uint8_t *)pgre + 4));
// Convert to host order and keep the low 28 bits before worker selection.
439 gre_id = rte_be_to_cpu_32(gre_id) & 0xFFFFFFF;
440 uint8_t worker = worker_from_mask(task, gre_id);
441 plogx_dbg("gre_id = %u worker = %u\n", gre_id, worker);
442 return worker + task->nb_worker_threads * IPV4;
445 plog_warn("Key not present\n");
449 else if (ip->next_proto_id == IPPROTO_UDP) {
450 uint8_t worker = worker_from_mask(task, rte_bswap32(ip->dst_addr));
451 return worker + task->nb_worker_threads * IPV4;
// Classify an IPv6 packet: select the worker from a single byte of the header
// at worker_byte_offset_ipv6 (offset 23 per init_task_lb_net — presumably
// within the destination address; TODO confirm against the header layout).
456 static inline uint8_t lb_ip6(struct task_lb_net *task, struct ipv6_hdr *ip)
458 if (unlikely((*(uint8_t*)ip) >> 4 != 6)) {
459 plog_warn("Expected to receive IPv6 packet but IP version was %d\n",
464 uint8_t worker = worker_from_mask(task, *((uint8_t *)ip + task->worker_byte_offset_ipv6));
465 return worker + task->nb_worker_threads * IPV6;
// Classify an MPLS-encapsulated packet: walk to the bottom of the label
// stack, optionally strip the labels in place (TASK_MPLS_TAGGING), then
// dispatch to lb_ip4()/lb_ip6() based on the inner IP version.
468 static inline uint8_t lb_mpls(struct task_lb_net *task, struct ether_hdr *peth, struct rte_mbuf *mbuf)
470 struct mpls_hdr *mpls = (struct mpls_hdr *)(peth + 1);
471 uint32_t mpls_len = 0;
// Advance until the bottom-of-stack bit is set.
472 while (!(mpls->bytes & 0x00010000)) {
474 mpls_len += sizeof(struct mpls_hdr);
476 mpls_len += sizeof(struct mpls_hdr);
477 struct ipv4_hdr *ip = (struct ipv4_hdr *)(mpls + 1);
479 switch (ip->version_ihl >> 4) {
481 if (task->runtime_flags & TASK_MPLS_TAGGING) {
// Strip the MPLS stack and restore the IPv4 ethertype.
482 peth = (struct ether_hdr *)rte_pktmbuf_adj(mbuf, mpls_len);
483 peth->ether_type = ETYPE_IPv4;
485 return lb_ip4(task, ip);
487 if (task->runtime_flags & TASK_MPLS_TAGGING) {
488 peth = (struct ether_hdr *)rte_pktmbuf_adj(mbuf, mpls_len);
489 peth->ether_type = ETYPE_IPv6;
491 return lb_ip6(task, (struct ipv6_hdr *)ip);
493 plogd_warn(mbuf, "Failed Decoding MPLS Packet - neither IPv4 neither IPv6: version %u for packet : \n", ip->version_ihl);
// Classify a QinQ frame: select the worker from the customer VLAN tag.
498 static inline uint8_t lb_qinq(struct task_lb_net *task, struct qinq_hdr *qinq)
500 if (qinq->cvlan.eth_proto != ETYPE_VLAN) {
501 plog_warn("Unexpected proto in QinQ = %#04x\n", qinq->cvlan.eth_proto);
// vlan_tci is big-endian in the frame: mask out the PCP/DEI bits (0xFF0F on
// the raw bytes) then byte-swap to get the VLAN id in host order.
504 uint32_t qinq_tags = rte_bswap16(qinq->cvlan.vlan_tci & 0xFF0F);
505 return worker_from_mask(task, qinq_tags);
// Per-packet entry point for plain "lbnetwork": dispatch on the outer
// ethertype (MPLS / QinQ / IPv4 / IPv6), with the runtime-configured
// qinq_tag handled in the default case.
508 static inline uint8_t handle_lb_net(struct task_lb_net *task, struct rte_mbuf *mbuf)
510 struct ether_hdr *peth = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
// Frame-length sanity check; the guarding condition is on a line not
// visible in this sampled chunk.
511 const uint16_t len = rte_pktmbuf_pkt_len(mbuf);
513 plogd_warn(mbuf, "Unexpected frame len = %d for packet : \n", len);
517 switch (peth->ether_type) {
519 return lb_mpls(task, peth, mbuf);
521 return lb_qinq(task, (struct qinq_hdr *)peth);
523 return lb_ip4(task, (struct ipv4_hdr *)(peth + 1));
525 return lb_ip6(task, (struct ipv6_hdr *)(peth + 1));
// Configurable QinQ outer tag (set in init_task_lb_net).
529 if (peth->ether_type == task->qinq_tag)
530 return lb_qinq(task, (struct qinq_hdr *)peth);
531 plogd_warn(mbuf, "Unexpected frame Ether type = %#06x for packet : \n", peth->ether_type);
// Registration descriptor: mode "lbnetwork", default sub-mode (per-packet
// header parsing, GRE-id aware).
538 static struct task_init task_init_lb_net = {
539 .mode_str = "lbnetwork",
540 .init = init_task_lb_net,
541 .handle = handle_lb_net_bulk,
542 .size = sizeof(struct task_lb_net),
543 .flag_features = TASK_FEATURE_GRE_ID
// Registration descriptor: hash-table LUT sub-mode, RSS feature flag.
546 static struct task_init task_init_lb_net_lut_qinq_rss = {
547 .mode_str = "lbnetwork",
548 .sub_mode_str = "lut_qinq_rss",
549 .init = init_task_lb_net_lut,
550 .handle = handle_lb_net_lut_bulk,
551 .size = sizeof(struct task_lb_net_lut),
552 .flag_features = TASK_FEATURE_LUT_QINQ_RSS
// Registration descriptor: hash-table LUT sub-mode, software-hash feature flag.
555 static struct task_init task_init_lb_net_lut_qinq_hash = {
556 .mode_str = "lbnetwork",
557 .sub_mode_str = "lut_qinq_hash",
558 .init = init_task_lb_net_lut,
559 .handle = handle_lb_net_lut_bulk,
560 .size = sizeof(struct task_lb_net_lut),
561 .flag_features = TASK_FEATURE_LUT_QINQ_HASH
// Registration descriptor: flat indexed-table sub-mode, RSS feature flag.
564 static struct task_init task_init_lb_net_indexed_table_rss = {
565 .mode_str = "lbnetwork",
566 .sub_mode_str = "indexed_table_rss",
567 .init = init_task_lb_net_indexed_table,
568 .handle = handle_lb_net_indexed_table_bulk,
569 .size = sizeof(struct task_lb_net_lut),
570 .flag_features = TASK_FEATURE_LUT_QINQ_RSS
// Registration descriptor: flat indexed-table sub-mode, software-hash feature flag.
573 static struct task_init task_init_lb_net_indexed_table_hash = {
574 .mode_str = "lbnetwork",
575 .sub_mode_str = "indexed_table_hash",
576 .init = init_task_lb_net_indexed_table,
577 .handle = handle_lb_net_indexed_table_bulk,
578 .size = sizeof(struct task_lb_net_lut),
579 .flag_features = TASK_FEATURE_LUT_QINQ_HASH
// Register all lbnetwork task variants before main() runs (GCC constructor).
582 __attribute__((constructor)) static void reg_task_lb_net(void)
584 reg_task(&task_init_lb_net);
585 reg_task(&task_init_lb_net_lut_qinq_rss);
586 reg_task(&task_init_lb_net_lut_qinq_hash);
587 reg_task(&task_init_lb_net_indexed_table_rss);
588 reg_task(&task_init_lb_net_indexed_table_hash);