[samplevnf.git] / VNFs / DPPD-PROX / handle_qinq_encap4.c
/*
// Copyright (c) 2010-2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

#include <rte_table_hash.h>
#include <rte_hash_crc.h>
#include <rte_cycles.h>

#include "mbuf_utils.h"
#include "prox_malloc.h"
#include "prox_lua.h"
#include "prox_lua_types.h"
#include "handle_qinq_encap4.h"
#include "handle_qinq_decap4.h"
#include "prox_args.h"
#include "defines.h"
#include "tx_pkt.h"
#include "prefetch.h"
#include "pkt_prototypes.h"
#include "hash_entry_types.h"
#include "task_init.h"
#include "bng_pkts.h"
#include "prox_cksum.h"
#include "hash_utils.h"
#include "quit.h"
#include "prox_port_cfg.h"
#include "handle_lb_net.h"
#include "prox_cfg.h"
#include "cfgfile.h"
#include "toeplitz.h"
#include "prox_shared.h"
#include "prox_compat.h"

static struct cpe_table_data *read_cpe_table_config(const char *name, uint8_t socket)
{
	struct lua_State *L = prox_lua();
	struct cpe_table_data *ret = NULL;

	lua_getglobal(L, name);
	PROX_PANIC(lua_isnil(L, -1), "Couldn't find cpe_table data\n");

	return ret;
}

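/* Return the QinQ <-> GRE mapping for this socket. The map is read once from
 * the Lua user table and cached through prox_sh, so all tasks on the same
 * socket share a single copy. */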
struct qinq_gre_map *get_qinq_gre_map(struct task_args *targ)
{
	const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
	struct qinq_gre_map *ret = prox_sh_find_socket(socket_id, "qinq_gre_map");

	if (!ret) {
		PROX_PANIC(!strcmp(targ->user_table, ""), "No user table defined\n");
		int rv = lua_to_qinq_gre_map(prox_lua(), GLOBAL, targ->user_table, socket_id, &ret);
		PROX_PANIC(rv, "Error reading mapping between qinq and gre from qinq_gre_map: \n%s\n",
		           get_lua_to_errors());
		prox_sh_add_socket(socket_id, "qinq_gre_map", ret);
	}
	return ret;
}

/* Encapsulate IPv4 packets in QinQ. QinQ tags are derived from gre_id. */
int handle_qinq_encap4_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts);
static void arp_msg(struct task_base *tbase, void **data, uint16_t n_msgs);

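/* Fill the CPE hash table from the Lua cpe_table configuration. Only entries
 * whose IP address maps onto this worker (ip % n_slaves == worker_id) are
 * inserted, so each worker thread holds its own slice of the table. */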
static void fill_table(struct task_args *targ, struct rte_table_hash *table)
{
	struct cpe_table_data *cpe_table_data;
	const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
	int ret = lua_to_cpe_table_data(prox_lua(), GLOBAL, targ->cpe_table_name, socket_id, &cpe_table_data);
	const uint8_t n_slaves = targ->nb_slave_threads;
	const uint8_t worker_id = targ->worker_thread_id;

	for (uint32_t i = 0; i < cpe_table_data->n_entries; ++i) {
		if (rte_bswap32(cpe_table_data->entries[i].ip) % n_slaves != worker_id) {
			continue;
		}
		struct cpe_table_entry *entry = &cpe_table_data->entries[i];

		uint32_t port_idx = prox_cfg.cpe_table_ports[entry->port_idx];
		PROX_PANIC(targ->mapping[port_idx] == 255, "Error reading cpe table: Mapping for port %u is missing\n", port_idx);

		struct cpe_key key = {
			.ip = entry->ip,
			.gre_id = entry->gre_id,
		};

		struct cpe_data data = {
			.qinq_svlan = entry->svlan,
			.qinq_cvlan = entry->cvlan,
			.user = entry->user,
			.mac_port = {
				.mac = entry->eth_addr,
				.out_idx = targ->mapping[port_idx],
			},
			.tsc = UINT64_MAX,
		};

		int key_found;
		void* entry_in_hash;
		prox_rte_table_key8_add(table, &key, &data, &key_found, &entry_in_hash);
	}
}

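/* Per-task init: configure the QinQ tag, CPE table and timeout, load the DSCP
 * table when classification is enabled, set up the fake packets used for bulk
 * hash lookups, and record the source MAC per output port. */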
static void init_task_qinq_encap4(struct task_base *tbase, struct task_args *targ)
{
	struct task_qinq_encap4 *task = (struct task_qinq_encap4 *)(tbase);
	int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

	task->qinq_tag = targ->qinq_tag;
	task->cpe_table = targ->cpe_table;
	task->cpe_timeout = msec_to_tsc(targ->cpe_table_timeout_ms);

	if (!strcmp(targ->task_init->sub_mode_str, "pe")) {
		PROX_PANIC(!strcmp(targ->cpe_table_name, ""), "CPE table not configured\n");
		fill_table(targ, task->cpe_table);
	}

#ifdef ENABLE_EXTRA_USER_STATISTICS
	task->n_users = targ->n_users;
	task->stats_per_user = prox_zmalloc(targ->n_users * sizeof(uint32_t), socket_id);
#endif
	if (targ->runtime_flags & TASK_CLASSIFY) {
		PROX_PANIC(!strcmp(targ->dscp, ""), "DSCP table not specified\n");
		task->dscp = prox_sh_find_socket(socket_id, targ->dscp);
		if (!task->dscp) {
			int ret = lua_to_dscp(prox_lua(), GLOBAL, targ->dscp, socket_id, &task->dscp);
			PROX_PANIC(ret, "Failed to create dscp table from config:\n%s\n",
			           get_lua_to_errors());
			prox_sh_add_socket(socket_id, targ->dscp, task->dscp);
		}
	}

	task->runtime_flags = targ->runtime_flags;

	for (uint32_t i = 0; i < 64; ++i) {
		task->fake_packets[i] = (struct rte_mbuf*)((uint8_t*)&task->keys[i] - sizeof(struct rte_mbuf));
	}

	targ->lconf->ctrl_timeout = freq_to_tsc(targ->ctrl_freq);
	targ->lconf->ctrl_func_m[targ->task] = arp_msg;

	struct prox_port_cfg *port = find_reachable_port(targ);
	if (port) {
		task->offload_crc = port->requested_tx_offload & (DEV_TX_OFFLOAD_IPV4_CKSUM | DEV_TX_OFFLOAD_UDP_CKSUM);
	}

	/* TODO: check whether this reverse mapping should be limited to the
	   elements that actually changed in the mapping. */

	for (uint32_t i = 0; i < sizeof(targ->mapping)/sizeof(targ->mapping[0]); ++i) {
		task->src_mac[targ->mapping[i]] = *(uint64_t*)&prox_port_cfg[i].eth_addr;
	}

	/* task->src_mac[entry->port_idx] = *(uint64_t*)&prox_port_cfg[entry->port_idx].eth_addr; */
	if (targ->runtime_flags & TASK_CLASSIFY) {
		int rc = init_port_sched(&task->sched_port, targ);
		PROX_PANIC(rc, "Did not find any QoS task to transmit to => undefined sched_port parameters\n");
	}
}

static void arp_msg(struct task_base *tbase, void **data, uint16_t n_msgs)
{
	struct task_qinq_encap4 *task = (struct task_qinq_encap4 *)tbase;
	struct arp_msg **msgs = (struct arp_msg **)data;

	arp_update_from_msg(task->cpe_table, msgs, n_msgs, task->cpe_timeout);
}

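/* Add one QinQ <-> GRE mapping to the per-worker lookup table. With USE_QINQ
 * the 8-byte key is the svlan/cvlan tag pair; without it, the key is built
 * from the svlan/cvlan values encoded in the IPv4 address. */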
static inline void add_key(struct task_args *targ, struct qinq_gre_map *qinq_gre_map, struct rte_table_hash* qinq_gre_table, uint32_t i, uint32_t *count)
{
	struct qinq_gre_data entry = {
		.gre_id = qinq_gre_map->entries[i].gre_id,
		.user = qinq_gre_map->entries[i].user,
	};

#ifdef USE_QINQ
	struct vlans qinq2 = {
		.svlan = {.eth_proto = targ->qinq_tag, .vlan_tci = qinq_gre_map->entries[i].svlan},
		.cvlan = {.eth_proto = ETYPE_VLAN,     .vlan_tci = qinq_gre_map->entries[i].cvlan}
	};

	int key_found = 0;
	void* entry_in_hash = NULL;
	prox_rte_table_key8_add(qinq_gre_table, &qinq2, &entry, &key_found, &entry_in_hash);

	plog_dbg("Core %u adding user %u (tag %x svlan %x cvlan %x), rss=%x\n",
	         targ->lconf->id, qinq_gre_map->entries[i].user, qinq2.svlan.eth_proto,
	         rte_bswap16(qinq_gre_map->entries[i].svlan),
	         rte_bswap16(qinq_gre_map->entries[i].cvlan),
	         qinq_gre_map->entries[i].rss);
#else
	/* lower 3 bytes of IPv4 address contain svlan/cvlan. */
	uint64_t ip = ((uint32_t)rte_bswap16(qinq_gre_map->entries[i].svlan) << 12) |
		rte_bswap16(qinq_gre_map->entries[i].cvlan);
	int key_found = 0;
	void* entry_in_hash = NULL;
	prox_rte_table_key8_add(qinq_gre_table, &ip, &entry, &key_found, &entry_in_hash);

	plog_dbg("Core %u hash table add: key = %016"PRIx64"\n",
	         targ->lconf->id, ip);
#endif
	(*count)++;
}

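/* Build the per-worker QinQ <-> GRE hash table. The table is sized as a share
 * of MAX_GRE, the distribution scheme (RSS, QinQ hash or GRE id) is taken from
 * the load-balancer friend task, and the resulting table is handed to the
 * QINQ_DECAP4 tasks on this core. */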
void init_qinq_gre_table(struct task_args *targ, struct qinq_gre_map *qinq_gre_map)
{
	struct rte_table_hash* qinq_gre_table;
	uint8_t table_part = targ->nb_slave_threads;
	if (!rte_is_power_of_2(table_part)) {
		table_part = rte_align32pow2(table_part) >> 1;
	}

	if (table_part == 0)
		table_part = 1;

	uint32_t n_entries = MAX_GRE / table_part;
	static char hash_name[30];
	sprintf(hash_name, "qinq_gre_hash_table_%03d", targ->lconf->id);

	struct prox_rte_table_params table_hash_params = {
		.name = hash_name,
		.key_size = 8,
		.n_keys = n_entries,
		.n_buckets = n_entries,
		.f_hash = (rte_table_hash_op_hash)hash_crc32,
		.seed = 0,
		.key_offset = HASH_METADATA_OFFSET(0),
		.key_mask = NULL
	};

	qinq_gre_table = prox_rte_table_create(&table_hash_params, rte_lcore_to_socket_id(targ->lconf->id), sizeof(struct qinq_gre_data));

	// The load balancer configuration is known from the network load balancer,
	// i.e. the ENCAP friend task on this core.
	for (uint8_t task_id = 0; task_id < targ->lconf->n_tasks_all; ++task_id) {
		enum task_mode smode = targ->lconf->targs[task_id].mode;
		if (QINQ_ENCAP4 == smode) {
			targ->lb_friend_core = targ->lconf->targs[task_id].lb_friend_core;
			targ->lb_friend_task = targ->lconf->targs[task_id].lb_friend_task;
		}
	}
	// Packets come from the load balancer, which may balance on the gre_id LSB,
	// a QinQ hash or QinQ RSS.
	uint32_t flag_features = 0;
	if (targ->lb_friend_core != 0xFF) {
		struct task_args *lb_targ = &lcore_cfg[targ->lb_friend_core].targs[targ->lb_friend_task];
		flag_features = lb_targ->task_init->flag_features;
		plog_info("\t\tWT %d Updated features to %x from friend %d\n", targ->lconf->id, flag_features, targ->lb_friend_core);
	} else {
		plog_info("\t\tWT %d has no friend\n", targ->lconf->id);
	}
	if (targ->nb_slave_threads == 0) {
		// No slave threads, i.e. using RSS
		plog_info("feature was %x is now %x\n", flag_features, TASK_FEATURE_LUT_QINQ_RSS);
		flag_features = TASK_FEATURE_LUT_QINQ_RSS;
	}
	if ((flag_features & (TASK_FEATURE_GRE_ID|TASK_FEATURE_LUT_QINQ_RSS|TASK_FEATURE_LUT_QINQ_HASH)) == 0) {
		plog_info("\t\tCould not find flag feature from load balancer => assuming TASK_FEATURE_GRE_ID\n");
		flag_features = TASK_FEATURE_GRE_ID;
	}

	/* Only store QinQ <-> GRE mappings for packets that are handled by this worker thread */
	uint32_t count = 0;
	if (flag_features & TASK_FEATURE_LUT_QINQ_RSS) {
		// With a load balancer, the number of worker threads is given by targ->nb_slave_threads and n_rxq = 0.
		// Without a load balancer, the number of worker threads is given by n_rxq and nb_slave_threads = 0.
		uint8_t nb_worker_threads, worker_thread_id;
		if (targ->nb_slave_threads) {
			nb_worker_threads = targ->nb_slave_threads;
			worker_thread_id = targ->worker_thread_id;
		} else if (prox_port_cfg[targ->rx_port_queue[0].port].n_rxq) {
			nb_worker_threads = prox_port_cfg[targ->rx_port_queue[0].port].n_rxq;
			worker_thread_id = targ->rx_port_queue[0].queue;
		} else {
			PROX_PANIC(1, "Unexpected: unknown number of worker threads\n");
		}
		plog_info("\t\tUsing %d worker_threads id %d\n", nb_worker_threads, worker_thread_id);
		for (uint32_t i = 0; i < qinq_gre_map->count; ++i) {
			if (targ->nb_slave_threads == 0 || rss_to_queue(qinq_gre_map->entries[i].rss, nb_worker_threads) == worker_thread_id) {
				add_key(targ, qinq_gre_map, qinq_gre_table, i, &count);
				//plog_info("Queue %d adding key %16lx, svlan %x cvlan %x, rss=%x\n", targ->rx_queue, *(uint64_t *)q, qinq_to_gre_lookup[i].svlan, qinq_to_gre_lookup[i].cvlan, qinq_to_gre_lookup[i].rss);
			}
		}
		plog_info("\t\tAdded %d entries to worker thread %d\n", count, worker_thread_id);
	} else if (flag_features & TASK_FEATURE_LUT_QINQ_HASH) {
		for (uint32_t i = 0; i < qinq_gre_map->count; ++i) {
			uint64_t cvlan = rte_bswap16(qinq_gre_map->entries[i].cvlan & 0xFF0F);
			uint64_t svlan = rte_bswap16(qinq_gre_map->entries[i].svlan & 0xFF0F);
			uint64_t qinq = rte_bswap64((svlan << 32) | cvlan);
			uint8_t queue = rte_hash_crc(&qinq, 8, 0) % targ->nb_slave_threads;
			if (queue == targ->worker_thread_id) {
				add_key(targ, qinq_gre_map, qinq_gre_table, i, &count);
			}
		}
		plog_info("\t\tAdded %d entries to WT %d\n", count, targ->worker_thread_id);
	} else if (flag_features & TASK_FEATURE_GRE_ID) {
		for (uint32_t i = 0; i < qinq_gre_map->count; ++i) {
			if (qinq_gre_map->entries[i].gre_id % targ->nb_slave_threads == targ->worker_thread_id) {
				add_key(targ, qinq_gre_map, qinq_gre_table, i, &count);
			}
		}
	}

	for (uint8_t task_id = 0; task_id < targ->lconf->n_tasks_all; ++task_id) {
		enum task_mode smode = targ->lconf->targs[task_id].mode;
		if (QINQ_DECAP4 == smode) {
			targ->lconf->targs[task_id].qinq_gre_table = qinq_gre_table;
		}
	}
}

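/* Create the per-core CPEv4 hash table, sized as a share of MAX_GRE, and make
 * it available to the QinQ encap and decap tasks running on this core. */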
void init_cpe4_table(struct task_args *targ)
{
	char name[64];
	sprintf(name, "core_%u_CPEv4Table", targ->lconf->id);

	uint8_t table_part = targ->nb_slave_threads;
	if (!rte_is_power_of_2(table_part)) {
		table_part = rte_align32pow2(table_part) >> 1;
	}

	if (table_part == 0)
		table_part = 1;

	uint32_t n_entries = MAX_GRE / table_part;

	static char hash_name[30];
	sprintf(hash_name, "cpe4_table_%03d", targ->lconf->id);

	struct prox_rte_table_params table_hash_params = {
		.name = hash_name,
		.key_size = 8,
		.n_keys = n_entries,
		.n_buckets = n_entries >> 1,
		.f_hash = (rte_table_hash_op_hash)hash_crc32,
		.seed = 0,
		.key_offset = HASH_METADATA_OFFSET(0),
		.key_mask = NULL
	};
	size_t entry_size = sizeof(struct cpe_data);
	if (!rte_is_power_of_2(entry_size)) {
		entry_size = rte_align32pow2(entry_size);
	}

	struct rte_table_hash* phash = prox_rte_table_create(&table_hash_params, rte_lcore_to_socket_id(targ->lconf->id), entry_size);
	PROX_PANIC(NULL == phash, "Unable to allocate memory for IPv4 hash table on core %u\n", targ->lconf->id);

	/* For locality, copy the table pointer into the task structures where it is needed at packet handling time */
	for (uint8_t task_id = 0; task_id < targ->lconf->n_tasks_all; ++task_id) {
		enum task_mode smode = targ->lconf->targs[task_id].mode;
		if (QINQ_ENCAP4 == smode || QINQ_DECAP4 == smode) {
			targ->lconf->targs[task_id].cpe_table = phash;
		}
	}
}

static void early_init_table(struct task_args* targ)
{
	if (!targ->cpe_table) {
		init_cpe4_table(targ);
	}
}

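/* Rewrite the packet headers towards the CPE from the cached table entry:
 * destination and source MACs plus, with USE_QINQ, the svlan/cvlan tags, or
 * otherwise a plain Ethernet header and an IPv4 destination address encoding
 * the svlan/cvlan pair. */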
static inline void restore_cpe(struct cpe_pkt *packet, struct cpe_data *table, __attribute__((unused)) uint16_t qinq_tag, uint64_t *src_mac)
{
#ifdef USE_QINQ
	struct qinq_hdr *pqinq = &packet->qinq_hdr;
	rte_memcpy(pqinq, &qinq_proto, sizeof(struct qinq_hdr));
	(*(uint64_t *)(&pqinq->d_addr)) = table->mac_port_8bytes;
	/* set the source MAC as well */
	*((uint64_t *)(&pqinq->s_addr)) = *((uint64_t *)&src_mac[table->mac_port.out_idx]);
	pqinq->svlan.vlan_tci = table->qinq_svlan;
	pqinq->cvlan.vlan_tci = table->qinq_cvlan;
	pqinq->svlan.eth_proto = qinq_tag;
	pqinq->cvlan.eth_proto = ETYPE_VLAN;
	pqinq->ether_type = ETYPE_IPv4;
#else
	(*(uint64_t *)(&packet->ether_hdr.d_addr)) = table->mac_port_8bytes;
	/* set the source MAC as well */
	*((uint64_t *)(&packet->ether_hdr.s_addr)) = *((uint64_t *)&src_mac[table->mac_port.out_idx]);
	packet->ether_hdr.ether_type = ETYPE_IPv4;

	packet->ipv4_hdr.dst_addr = rte_bswap32(10 << 24 | rte_bswap16(table->qinq_svlan) << 12 | rte_bswap16(table->qinq_cvlan));
#endif
}

static inline uint8_t handle_qinq_encap4(struct task_qinq_encap4 *task, struct cpe_pkt *cpe_pkt, struct rte_mbuf *mbuf, struct cpe_data *entry);

/* Same functionality as handle_qinq_encap4_bulk, but untags MPLS as well. */
static int handle_qinq_encap4_untag_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_qinq_encap4 *task = (struct task_qinq_encap4 *)tbase;
	uint8_t out[MAX_PKT_BURST];
	prefetch_pkts(mbufs, n_pkts);

	for (uint16_t j = 0; j < n_pkts; ++j) {
		if (likely(mpls_untag(mbufs[j]))) {
			struct cpe_pkt* cpe_pkt = (struct cpe_pkt*) rte_pktmbuf_adj(mbufs[j], UPSTREAM_DELTA);
			out[j] = handle_qinq_encap4(task, cpe_pkt, mbufs[j], NULL);
		}
		else {
			out[j] = OUT_DISCARD;
		}
	}

	return task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
}

static inline void extract_key_bulk(struct task_qinq_encap4 *task, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	for (uint16_t j = 0; j < n_pkts; ++j) {
		extract_key_core(mbufs[j], &task->keys[j]);
	}
}

__attribute__((cold)) static void handle_error(struct rte_mbuf *mbuf)
{
	struct core_net_pkt* core_pkt = rte_pktmbuf_mtod(mbuf, struct core_net_pkt *);
	uint32_t dst_ip = core_pkt->ip_hdr.dst_addr;
	uint32_t le_gre_id = rte_be_to_cpu_32(core_pkt->gre_hdr.gre_id);

	plogx_dbg("Unknown IP %x/gre_id %x\n", dst_ip, le_gre_id);
}

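/* PE sub-mode handler: look up the CPE entry by inner IPv4 destination address
 * only, prepend the extra QinQ bytes in front of the existing Ethernet header,
 * trim any padding and encapsulate; packets without a matching entry are
 * discarded. */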
static int handle_qinq_encap4_bulk_pe(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_qinq_encap4 *task = (struct task_qinq_encap4 *)tbase;
	uint64_t pkts_mask = RTE_LEN2MASK(n_pkts, uint64_t);
	struct cpe_data* entries[64];
	uint8_t out[MAX_PKT_BURST];
	uint64_t lookup_hit_mask;

	prefetch_pkts(mbufs, n_pkts);

	for (uint16_t j = 0; j < n_pkts; ++j) {
		prox_rte_ipv4_hdr* ip = (prox_rte_ipv4_hdr *)(rte_pktmbuf_mtod(mbufs[j], prox_rte_ether_hdr *) + 1);
		task->keys[j] = (uint64_t)ip->dst_addr;
	}
	prox_rte_table_key8_lookup(task->cpe_table, task->fake_packets, pkts_mask, &lookup_hit_mask, (void**)entries);

	if (likely(lookup_hit_mask == pkts_mask)) {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			struct cpe_pkt* cpe_pkt = (struct cpe_pkt*) rte_pktmbuf_prepend(mbufs[j], sizeof(struct qinq_hdr) - sizeof(prox_rte_ether_hdr));
			uint16_t padlen = mbuf_calc_padlen(mbufs[j], cpe_pkt, &cpe_pkt->ipv4_hdr);

			if (padlen) {
				rte_pktmbuf_trim(mbufs[j], padlen);
			}
			out[j] = handle_qinq_encap4(task, cpe_pkt, mbufs[j], entries[j]);
		}
	}
	else {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (unlikely(!((lookup_hit_mask >> j) & 0x1))) {
				handle_error(mbufs[j]);
				out[j] = OUT_DISCARD;
				continue;
			}
			struct cpe_pkt* cpe_pkt = (struct cpe_pkt*) rte_pktmbuf_prepend(mbufs[j], sizeof(struct qinq_hdr) - sizeof(prox_rte_ether_hdr));
			uint16_t padlen = mbuf_calc_padlen(mbufs[j], cpe_pkt, &cpe_pkt->ipv4_hdr);

			if (padlen) {
				rte_pktmbuf_trim(mbufs[j], padlen);
			}
			out[j] = handle_qinq_encap4(task, cpe_pkt, mbufs[j], entries[j]);
		}
	}

	return task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
}

int handle_qinq_encap4_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_qinq_encap4 *task = (struct task_qinq_encap4 *)tbase;
	uint64_t pkts_mask = RTE_LEN2MASK(n_pkts, uint64_t);
	struct cpe_data* entries[64];
	uint8_t out[MAX_PKT_BURST];
	uint64_t lookup_hit_mask;

	prefetch_pkts(mbufs, n_pkts);

	// From GRE ID and IP address, retrieve QinQ and MAC addresses
	extract_key_bulk(task, mbufs, n_pkts);
	prox_rte_table_key8_lookup(task->cpe_table, task->fake_packets, pkts_mask, &lookup_hit_mask, (void**)entries);

	if (likely(lookup_hit_mask == pkts_mask)) {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			struct cpe_pkt* cpe_pkt = (struct cpe_pkt*) rte_pktmbuf_adj(mbufs[j], UPSTREAM_DELTA);
			// We are receiving GRE tunnelled packets (and removing UPSTREAM_DELTA bytes), whose length is > 64 bytes.
			// So there should be no padding, but in case there is one, remove it.
			uint16_t padlen = mbuf_calc_padlen(mbufs[j], cpe_pkt, &cpe_pkt->ipv4_hdr);

			if (padlen) {
				rte_pktmbuf_trim(mbufs[j], padlen);
			}
			out[j] = handle_qinq_encap4(task, cpe_pkt, mbufs[j], entries[j]);
		}
	}
	else {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (unlikely(!((lookup_hit_mask >> j) & 0x1))) {
				handle_error(mbufs[j]);
				out[j] = OUT_DISCARD;
				continue;
			}
			struct cpe_pkt* cpe_pkt = (struct cpe_pkt*) rte_pktmbuf_adj(mbufs[j], UPSTREAM_DELTA);
			uint16_t padlen = mbuf_calc_padlen(mbufs[j], cpe_pkt, &cpe_pkt->ipv4_hdr);

			if (padlen) {
				rte_pktmbuf_trim(mbufs[j], padlen);
			}
			out[j] = handle_qinq_encap4(task, cpe_pkt, mbufs[j], entries[j]);
		}
	}

	return task->base.tx_pkt(&task->base, mbufs, n_pkts, out);
}

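/* Per-packet encapsulation: decrement the TTL (dropping when it reaches 0),
 * rebuild the CPE header from the table entry, optionally classify for QoS,
 * and recompute the IPv4 checksum when TX CRC handling is enabled. Returns
 * the output port index, or OUT_DISCARD. */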
static inline uint8_t handle_qinq_encap4(struct task_qinq_encap4 *task, struct cpe_pkt *cpe_pkt, struct rte_mbuf *mbuf, struct cpe_data *entry)
{
	PROX_ASSERT(cpe_pkt);

	if (cpe_pkt->ipv4_hdr.time_to_live) {
		cpe_pkt->ipv4_hdr.time_to_live--;
	}
	else {
		plog_info("TTL = 0 => Dropping\n");
		return OUT_DISCARD;
	}
	cpe_pkt->ipv4_hdr.hdr_checksum = 0;

	restore_cpe(cpe_pkt, entry, task->qinq_tag, task->src_mac);

	if (task->runtime_flags & TASK_CLASSIFY) {
		uint8_t queue = task->dscp[cpe_pkt->ipv4_hdr.type_of_service >> 2] & 0x3;
		uint8_t tc = task->dscp[cpe_pkt->ipv4_hdr.type_of_service >> 2] >> 2;

		prox_rte_sched_port_pkt_write(task->sched_port, mbuf, 0, entry->user, tc, queue, 0);
	}
#ifdef ENABLE_EXTRA_USER_STATISTICS
	task->stats_per_user[entry->user]++;
#endif
	if (task->runtime_flags & TASK_TX_CRC) {
		prox_ip_cksum(mbuf, &cpe_pkt->ipv4_hdr, sizeof(struct qinq_hdr), sizeof(prox_rte_ipv4_hdr), task->offload_crc);
	}
	return entry->mac_port.out_idx;
}

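/* Flow iterator used at configuration time: advance idx to the next
 * qinq_gre_map entry owned by this worker, applying the same RSS / QinQ hash /
 * GRE id distribution rules as init_qinq_gre_table. */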
static void flow_iter_next(struct flow_iter *iter, struct task_args *targ)
{
	do {
		iter->idx++;
		uint8_t flag_features = iter->data;

		if (flag_features & TASK_FEATURE_LUT_QINQ_RSS) {
			// With a load balancer, the number of worker threads is given by targ->nb_slave_threads and n_rxq = 0.
			// Without a load balancer, the number of worker threads is given by n_rxq and nb_slave_threads = 0.
			uint8_t nb_worker_threads, worker_thread_id;
			nb_worker_threads = 1;
			worker_thread_id = 1;
			if (targ->nb_slave_threads) {
				nb_worker_threads = targ->nb_slave_threads;
				worker_thread_id = targ->worker_thread_id;
			} else if (prox_port_cfg[targ->rx_port_queue[0].port].n_rxq) {
				nb_worker_threads = prox_port_cfg[targ->rx_port_queue[0].port].n_rxq;
				worker_thread_id = targ->rx_port_queue[0].queue;
			} else {
				plog_err("Unexpected: unknown number of worker threads\n");
			}

			if (targ->nb_slave_threads == 0 || rss_to_queue(get_qinq_gre_map(targ)->entries[iter->idx].rss, nb_worker_threads) == worker_thread_id)
				break;
		} else if (flag_features & TASK_FEATURE_LUT_QINQ_HASH) {
			uint64_t cvlan = rte_bswap16(get_qinq_gre_map(targ)->entries[iter->idx].cvlan & 0xFF0F);
			uint64_t svlan = rte_bswap16(get_qinq_gre_map(targ)->entries[iter->idx].svlan & 0xFF0F);
			uint64_t qinq = rte_bswap64((svlan << 32) | cvlan);
			uint8_t queue = rte_hash_crc(&qinq, 8, 0) % targ->nb_slave_threads;
			if (queue == targ->worker_thread_id)
				break;
		} else if (flag_features & TASK_FEATURE_GRE_ID) {
			if (get_qinq_gre_map(targ)->entries[iter->idx].gre_id % targ->nb_slave_threads == targ->worker_thread_id)
				break;
		}
	} while (iter->idx != (int)get_qinq_gre_map(targ)->count);
}

static void flow_iter_beg(struct flow_iter *iter, struct task_args *targ)
{
	uint32_t flag_features = 0;
	if (targ->lb_friend_core != 0xFF) {
		struct task_args *lb_targ = &lcore_cfg[targ->lb_friend_core].targs[targ->lb_friend_task];
		flag_features = lb_targ->task_init->flag_features;
		plog_info("\t\tWT %d Updated features to %x from friend %d\n", targ->lconf->id, flag_features, targ->lb_friend_core);
	} else {
		plog_info("\t\tWT %d has no friend\n", targ->lconf->id);
	}
	if (targ->nb_slave_threads == 0) {
		// No slave threads, i.e. using RSS
		plog_info("feature was %x is now %x\n", flag_features, TASK_FEATURE_LUT_QINQ_RSS);
		flag_features = TASK_FEATURE_LUT_QINQ_RSS;
	}
	if ((flag_features & (TASK_FEATURE_GRE_ID|TASK_FEATURE_LUT_QINQ_RSS|TASK_FEATURE_LUT_QINQ_HASH)) == 0) {
		plog_info("\t\tCould not find flag feature from load balancer => assuming TASK_FEATURE_GRE_ID\n");
		flag_features = TASK_FEATURE_GRE_ID;
	}

	iter->idx = -1;
	flow_iter_next(iter, targ);
}

static int flow_iter_is_end(struct flow_iter *iter, struct task_args *targ)
{
	return iter->idx == (int)get_qinq_gre_map(targ)->count;
}

static uint32_t flow_iter_get_gre_id(struct flow_iter *iter, struct task_args *targ)
{
	return get_qinq_gre_map(targ)->entries[iter->idx].gre_id;
}

static struct task_init task_init_qinq_encap4_table = {
	.mode = QINQ_ENCAP4,
	.mode_str = "qinqencapv4",
	.early_init = early_init_table,
	.init = init_task_qinq_encap4,
	.handle = handle_qinq_encap4_bulk,
	/* In this case the user in the qinq_lookup table is the QoS user
	   (from user_table), i.e. usually from 0 to 32K. Otherwise it
	   would have been a user from 0 to n_interface x 32K. */
	.flow_iter = {
		.beg        = flow_iter_beg,
		.is_end     = flow_iter_is_end,
		.next       = flow_iter_next,
		.get_gre_id = flow_iter_get_gre_id,
	},
	.flag_features = TASK_FEATURE_CLASSIFY,
	.size = sizeof(struct task_qinq_encap4)
};

static struct task_init task_init_qinq_encap4_table_pe = {
	.mode = QINQ_ENCAP4,
	.mode_str = "qinqencapv4",
	.sub_mode_str = "pe",
	.early_init = early_init_table,
	.init = init_task_qinq_encap4,
	.handle = handle_qinq_encap4_bulk_pe,
	.flag_features = TASK_FEATURE_CLASSIFY,
	.size = sizeof(struct task_qinq_encap4)
};

static struct task_init task_init_qinq_encap4_untag = {
	.mode = QINQ_ENCAP4,
	.sub_mode_str = "unmpls",
	.mode_str = "qinqencapv4",
	.init = init_task_qinq_encap4,
	.handle = handle_qinq_encap4_untag_bulk,
	.flag_features = TASK_FEATURE_CLASSIFY,
	.size = sizeof(struct task_qinq_encap4)
};

__attribute__((constructor)) static void reg_task_qinq_encap4(void)
{
	reg_task(&task_init_qinq_encap4_table);
	reg_task(&task_init_qinq_encap4_table_pe);
	reg_task(&task_init_qinq_encap4_untag);
}