Added initial support for BGP
[samplevnf.git] / VNFs / DPPD-PROX / handle_master.c
1 /*
2 // Copyright (c) 2010-2020 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <fcntl.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <linux/netlink.h>
21 #include <linux/rtnetlink.h>
22 #include <poll.h>
23 #include <net/if.h>
24
25 #include <rte_hash.h>
26 #include <rte_hash_crc.h>
27 #include <rte_ether.h>
28
29 #include "prox_cfg.h"
30 #include "prox_globals.h"
31 #include "rx_pkt.h"
32 #include "arp.h"
33 #include "handle_master.h"
34 #include "log.h"
35 #include "mbuf_utils.h"
36 #include "etypes.h"
37 #include "defaults.h"
38 #include "prox_malloc.h"
39 #include "quit.h"
40 #include "task_init.h"
41 #include "prox_port_cfg.h"
42 #include "main.h"
43 #include "lconf.h"
44 #include "input.h"
45 #include "tx_pkt.h"
46 #include "defines.h"
47
48 #define PROX_MAX_ARP_REQUESTS   32      // Maximum number of tasks requesting the same MAC address
49 #define NETLINK_BUF_SIZE        16384
50
51 static char netlink_buf[NETLINK_BUF_SIZE];
52
53 const char *actions_string[] = {
54         "UPDATE_FROM_CTRL",             // Controlplane sending a MAC update to dataplane
55         "ROUTE_ADD_FROM_CTRL",          // Controlplane sending a new route to dataplane
56         "ROUTE_DEL_FROM_CTRL",          // Controlplane deleting a new route from dataplane
57         "SEND_ARP_REQUEST_FROM_CTRL",   // Controlplane requesting dataplane to send ARP request
58         "SEND_ARP_REPLY_FROM_CTRL",     // Controlplane requesting dataplane to send ARP reply
59         "SEND_ICMP_FROM_CTRL",          // Controlplane requesting dataplane to send ICMP message
60         "SEND_BGP_FROM_CTRL",           // Controlplane requesting dataplane to send BGP message
61         "ARP_TO_CTRL",                  // ARP sent by datplane to Controlpane for handling
62         "ICMP_TO_CTRL",                 // ICMP sent by datplane to Controlpane for handling
63         "BGP_TO_CTRL",                  // BGP sent by datplane to Controlpane for handling
64         "REQ_MAC_TO_CTRL",              // Dataplane requesting MAC resolution to Controlplane
65         "PKT_FROM_TAP"                  // Packet received by Controlplane from kernel and forwarded to dataplane for sending
66 };
67
68 static struct my_arp_t arp_reply = {
69         .htype = 0x100,
70         .ptype = 8,
71         .hlen = 6,
72         .plen = 4,
73         .oper = 0x200
74 };
75 static struct my_arp_t arp_request = {
76         .htype = 0x100,
77         .ptype = 8,
78         .hlen = 6,
79         .plen = 4,
80         .oper = 0x100
81 };
82
83 struct ip_table {
84         prox_rte_ether_addr     mac;
85         struct rte_ring         *ring;
86 };
87
88 struct external_ip_table {
89         prox_rte_ether_addr     mac;
90         struct rte_ring         *rings[PROX_MAX_ARP_REQUESTS];
91         uint16_t                nb_requests;
92 };
93
94 struct port_table {
95         prox_rte_ether_addr     mac;
96         struct rte_ring         *ring;
97         uint32_t                ip;
98         uint8_t                 port;
99         uint8_t                 flags;
100         uint64_t last_echo_req_rcvd_tsc;
101         uint64_t last_echo_rep_rcvd_tsc;
102         uint32_t n_echo_req;
103         uint32_t n_echo_rep;
104 };
105
106 struct task_master {
107         struct task_base base;
108         struct rte_ring *ctrl_rx_ring;
109         struct rte_ring **ctrl_tx_rings;
110         struct ip_table *internal_ip_table;
111         struct external_ip_table *external_ip_table;
112         struct rte_hash  *external_ip_hash;
113         struct rte_hash  *internal_ip_hash;
114         struct port_table internal_port_table[PROX_MAX_PORTS];
115         struct vdev all_vdev[PROX_MAX_PORTS];
116         int max_vdev_id;
117         struct pollfd arp_fds;
118         struct pollfd route_fds;
119 };
120
121 struct ip_port {
122         uint32_t ip;
123         uint8_t port;
124 } __attribute__((packed));
125
126 static inline uint8_t get_command(struct rte_mbuf *mbuf)
127 {
128         return mbuf->udata64 & 0xFF;
129 }
130 static inline uint8_t get_task(struct rte_mbuf *mbuf)
131 {
132         return (mbuf->udata64 >> 8) & 0xFF;
133 }
134 static inline uint8_t get_core(struct rte_mbuf *mbuf)
135 {
136         return (mbuf->udata64 >> 16) & 0xFF;
137 }
138 static inline uint8_t get_port(struct rte_mbuf *mbuf)
139 {
140         return mbuf->port;
141 }
142 static inline uint32_t get_ip(struct rte_mbuf *mbuf)
143 {
144         return (mbuf->udata64 >> 32) & 0xFFFFFFFF;
145 }
146
147 void master_init_vdev(struct task_base *tbase, uint8_t port_id, uint8_t core_id, uint8_t task_id)
148 {
149         struct task_master *task = (struct task_master *)tbase;
150         uint8_t vdev_port = prox_port_cfg[port_id].dpdk_mapping;
151         int rc, i;
152         if (vdev_port != NO_VDEV_PORT) {
153                 for (i = 0; i < task->max_vdev_id; i++) {
154                         if (task->all_vdev[i].port_id == vdev_port)
155                                 break;
156                 }
157                 if (i <  task->max_vdev_id) {
158                         // Already initialized (e.g. by another core handling the same port).
159                         return;
160                 }
161                 task->all_vdev[task->max_vdev_id].port_id = vdev_port;
162                 task->all_vdev[task->max_vdev_id].ring = task->ctrl_tx_rings[core_id * MAX_TASKS_PER_CORE + task_id];
163
164                 struct sockaddr_in dst, src;
165                 src.sin_family = AF_INET;
166                 src.sin_addr.s_addr = prox_port_cfg[vdev_port].ip;
167                 src.sin_port = rte_cpu_to_be_16(5000);
168
169                 int fd = socket(AF_INET,  SOCK_DGRAM, 0);
170                 PROX_PANIC(fd < 0, "Failed to open socket(AF_INET,  SOCK_DGRAM, 0)\n");
171                 prox_port_cfg[vdev_port].fd = fd;
172                 rc = bind(fd,(struct sockaddr *)&src, sizeof(struct sockaddr_in));
173                 PROX_PANIC(rc, "Failed to bind("IPv4_BYTES_FMT":%d): errno = %d\n", IPv4_BYTES(((uint8_t*)&src.sin_addr.s_addr)), src.sin_port, errno);
174                 plog_info("DPDK port %d bound("IPv4_BYTES_FMT":%d) to fd %d\n", port_id, IPv4_BYTES(((uint8_t*)&src.sin_addr.s_addr)), src.sin_port, fd);
175                 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
176                 task->max_vdev_id++;
177         }
178 }
179
180 void register_ip_to_ctrl_plane(struct task_base *tbase, uint32_t ip, uint8_t port_id, uint8_t core_id, uint8_t task_id)
181 {
182         struct task_master *task = (struct task_master *)tbase;
183         struct ip_port key;
184         plogx_info("\tregistering IP "IPv4_BYTES_FMT" with port %d core %d and task %d\n", IP4(ip), port_id, core_id, task_id);
185
186         if (port_id >= PROX_MAX_PORTS) {
187                 plog_err("Unable to register ip "IPv4_BYTES_FMT", port %d\n", IP4(ip), port_id);
188                 return;
189         }
190
191         /* TODO - stoe multiple rings if multiple cores able to handle IP
192            Remove them when such cores are stopped and de-register IP
193         */
194         task->internal_port_table[port_id].ring = task->ctrl_tx_rings[core_id * MAX_TASKS_PER_CORE + task_id];
195         memcpy(&task->internal_port_table[port_id].mac, &prox_port_cfg[port_id].eth_addr, sizeof(prox_rte_ether_addr));
196         task->internal_port_table[port_id].ip = ip;
197
198         if (ip == RANDOM_IP) {
199                 task->internal_port_table[port_id].flags |= HANDLE_RANDOM_IP_FLAG;
200                 return;
201         }
202
203         key.ip = ip;
204         key.port = port_id;
205         int ret = rte_hash_add_key(task->internal_ip_hash, (const void *)&key);
206         if (unlikely(ret < 0)) {
207                 plog_err("Unable to register ip "IPv4_BYTES_FMT"\n", IP4(ip));
208                 return;
209         }
210         memcpy(&task->internal_ip_table[ret].mac, &prox_port_cfg[port_id].eth_addr, sizeof(prox_rte_ether_addr));
211         task->internal_ip_table[ret].ring = task->ctrl_tx_rings[core_id * MAX_TASKS_PER_CORE + task_id];
212
213 }
214
215 static inline void handle_arp_reply(struct task_base *tbase, struct rte_mbuf *mbuf)
216 {
217         struct task_master *task = (struct task_master *)tbase;
218         struct ether_hdr_arp *hdr_arp = rte_pktmbuf_mtod(mbuf, struct ether_hdr_arp *);
219         int i, ret;
220         uint32_t key = hdr_arp->arp.data.spa;
221         plogx_dbg("\tMaster handling ARP reply for ip "IPv4_BYTES_FMT"\n", IP4(key));
222
223         ret = rte_hash_lookup(task->external_ip_hash, (const void *)&key);
224         if (unlikely(ret < 0)) {
225                 // entry not found for this IP: we did not ask a request, delete the reply
226                 tx_drop(mbuf);
227         } else {
228                 // entry found for this IP
229                 uint16_t nb_requests = task->external_ip_table[ret].nb_requests;
230                 // If we receive a request from multiple task for the same IP, then we update all tasks
231                 if (task->external_ip_table[ret].nb_requests) {
232                         rte_mbuf_refcnt_set(mbuf, nb_requests);
233                         for (int i = 0; i < nb_requests; i++) {
234                                 struct rte_ring *ring = task->external_ip_table[ret].rings[i];
235                                 tx_ring_ip(tbase, ring, UPDATE_FROM_CTRL, mbuf, key);
236                         }
237                         task->external_ip_table[ret].nb_requests = 0;
238                 } else {
239                         tx_drop(mbuf);
240                 }
241         }
242 }
243
244 static inline void handle_arp_request(struct task_base *tbase, struct rte_mbuf *mbuf)
245 {
246         struct task_master *task = (struct task_master *)tbase;
247         struct ether_hdr_arp *hdr_arp = rte_pktmbuf_mtod(mbuf, struct ether_hdr_arp *);
248         int i, ret;
249         uint8_t port = get_port(mbuf);
250
251         struct ip_port key;
252         key.ip = hdr_arp->arp.data.tpa;
253         key.port = port;
254         if (task->internal_port_table[port].flags & HANDLE_RANDOM_IP_FLAG) {
255                 prox_rte_ether_addr mac;
256                 plogx_dbg("\tMaster handling ARP request for ip "IPv4_BYTES_FMT" on port %d which supports random ip\n", IP4(key.ip), key.port);
257                 struct rte_ring *ring = task->internal_port_table[port].ring;
258                 create_mac(hdr_arp, &mac);
259                 mbuf->ol_flags &= ~(PKT_TX_IP_CKSUM|PKT_TX_UDP_CKSUM);
260                 build_arp_reply(hdr_arp, &mac);
261                 tx_ring(tbase, ring, ARP_REPLY_FROM_CTRL, mbuf);
262                 return;
263         }
264
265         plogx_dbg("\tMaster handling ARP request for ip "IPv4_BYTES_FMT"\n", IP4(key.ip));
266
267         ret = rte_hash_lookup(task->internal_ip_hash, (const void *)&key);
268         if (unlikely(ret < 0)) {
269                 // entry not found for this IP.
270                 plogx_dbg("Master ignoring ARP REQUEST received on un-registered IP "IPv4_BYTES_FMT" on port %d\n", IP4(hdr_arp->arp.data.tpa), port);
271                 tx_drop(mbuf);
272         } else {
273                 struct rte_ring *ring = task->internal_ip_table[ret].ring;
274                 mbuf->ol_flags &= ~(PKT_TX_IP_CKSUM|PKT_TX_UDP_CKSUM);
275                 build_arp_reply(hdr_arp, &task->internal_ip_table[ret].mac);
276                 tx_ring(tbase, ring, ARP_REPLY_FROM_CTRL, mbuf);
277         }
278 }
279
280 static inline int record_request(struct task_base *tbase, uint32_t ip_dst, uint8_t port, struct rte_ring *ring)
281 {
282         struct task_master *task = (struct task_master *)tbase;
283         int ret = rte_hash_add_key(task->external_ip_hash, (const void *)&ip_dst);
284         int i;
285
286         if (unlikely(ret < 0)) {
287                 plogx_dbg("Unable to add IP "IPv4_BYTES_FMT" in external_ip_hash\n", IP4(ip_dst));
288                 return -1;
289         }
290
291         // If multiple tasks requesting the same info, we will need to send a reply to all of them
292         // However if one task sends multiple requests to the same IP (e.g. because it is not answering)
293         // then we should not send multiple replies to the same task
294         if (task->external_ip_table[ret].nb_requests >= PROX_MAX_ARP_REQUESTS) {
295                 // This can only happen if really many tasks requests the same IP
296                 plogx_dbg("Unable to add request for IP "IPv4_BYTES_FMT" in external_ip_table\n", IP4(ip_dst));
297                 return -1;
298         }
299         for (i = 0; i < task->external_ip_table[ret].nb_requests; i++) {
300                 if (task->external_ip_table[ret].rings[i] == ring)
301                         break;
302         }
303         if (i >= task->external_ip_table[ret].nb_requests) {
304                 // If this is a new request i.e. a new task requesting a new IP
305                 task->external_ip_table[ret].rings[task->external_ip_table[ret].nb_requests] = ring;
306                 task->external_ip_table[ret].nb_requests++;
307         }
308         return 0;
309 }
310
311 static inline void handle_unknown_ip(struct task_base *tbase, struct rte_mbuf *mbuf)
312 {
313         struct task_master *task = (struct task_master *)tbase;
314         struct ether_hdr_arp *hdr_arp = rte_pktmbuf_mtod(mbuf, struct ether_hdr_arp *);
315         uint8_t port = get_port(mbuf);
316         uint32_t ip_dst = get_ip(mbuf);
317
318         plogx_dbg("\tMaster handling unknown ip "IPv4_BYTES_FMT" for port %d\n", IP4(ip_dst), port);
319         if (unlikely(port >= PROX_MAX_PORTS)) {
320                 plogx_dbg("Port %d not found", port);
321                 tx_drop(mbuf);
322                 return;
323         }
324         uint32_t ip_src = task->internal_port_table[port].ip;
325         struct rte_ring *ring = task->ctrl_tx_rings[get_core(mbuf) * MAX_TASKS_PER_CORE + get_task(mbuf)];
326
327         if (ring == NULL) {
328                 plogx_dbg("Port %d not registered", port);
329                 tx_drop(mbuf);
330                 return;
331         }
332
333         if (record_request(tbase, ip_dst, port, ring) < 0) {
334                 tx_drop(mbuf);
335                 return;
336         }
337         // We send an ARP request even if one was just sent (and not yet answered) by another task
338         mbuf->ol_flags &= ~(PKT_TX_IP_CKSUM|PKT_TX_UDP_CKSUM);
339         build_arp_request(mbuf, &task->internal_port_table[port].mac, ip_dst, ip_src);
340         tx_ring(tbase, ring, ARP_REQ_FROM_CTRL, mbuf);
341 }
342
343 static inline void build_icmp_reply_message(struct task_base *tbase, struct rte_mbuf *mbuf)
344 {
345         struct task_master *task = (struct task_master *)tbase;
346         struct ip_port key;
347         key.port = mbuf->port;
348         prox_rte_ether_hdr *hdr = rte_pktmbuf_mtod(mbuf, prox_rte_ether_hdr *);
349         prox_rte_ether_addr dst_mac;
350         prox_rte_ether_addr_copy(&hdr->s_addr, &dst_mac);
351         prox_rte_ether_addr_copy(&hdr->d_addr, &hdr->s_addr);
352         prox_rte_ether_addr_copy(&dst_mac, &hdr->d_addr);
353         prox_rte_ipv4_hdr *ip_hdr = (prox_rte_ipv4_hdr *)(hdr + 1);
354         key.ip = ip_hdr->dst_addr;
355         ip_hdr->dst_addr = ip_hdr->src_addr;
356         ip_hdr->src_addr = key.ip;
357         prox_rte_icmp_hdr *picmp = (prox_rte_icmp_hdr *)(ip_hdr + 1);
358         picmp->icmp_type = PROX_RTE_IP_ICMP_ECHO_REPLY;
359
360         int ret = rte_hash_lookup(task->internal_ip_hash, (const void *)&key);
361         if (unlikely(ret < 0)) {
362                 // entry not found for this IP.
363                 plogx_dbg("Master ignoring ICMP received on un-registered IP "IPv4_BYTES_FMT" on port %d\n", IP4(key.ip), mbuf->port);
364                 tx_drop(mbuf);
365         } else {
366                 struct rte_ring *ring = task->internal_ip_table[ret].ring;
367                 mbuf->ol_flags &= ~(PKT_TX_IP_CKSUM|PKT_TX_UDP_CKSUM);
368                 tx_ring(tbase, ring, ICMP_FROM_CTRL, mbuf);
369         }
370 }
371
372 static inline void handle_icmp(struct task_base *tbase, struct rte_mbuf *mbuf)
373 {
374         struct task_master *task = (struct task_master *)tbase;
375         uint8_t port_id = mbuf->port;
376         struct port_table *port = &task->internal_port_table[port_id];
377         prox_rte_ether_hdr *hdr = rte_pktmbuf_mtod(mbuf, prox_rte_ether_hdr *);
378         if (hdr->ether_type != ETYPE_IPv4) {
379                 tx_drop(mbuf);
380                 return;
381         }
382         prox_rte_ipv4_hdr *ip_hdr = (prox_rte_ipv4_hdr *)(hdr + 1);
383         if (ip_hdr->next_proto_id != IPPROTO_ICMP) {
384                 tx_drop(mbuf);
385                 return;
386         }
387         if (ip_hdr->dst_addr != port->ip) {
388                 tx_drop(mbuf);
389                 return;
390         }
391
392         prox_rte_icmp_hdr *picmp = (prox_rte_icmp_hdr *)(ip_hdr + 1);
393         uint8_t type = picmp->icmp_type;
394         if (type == PROX_RTE_IP_ICMP_ECHO_REQUEST) {
395                 port->n_echo_req++;
396                 if (rte_rdtsc() - port->last_echo_req_rcvd_tsc > rte_get_tsc_hz()) {
397                         plog_dbg("Received %u Echo Request on IP "IPv4_BYTES_FMT" (last received from IP "IPv4_BYTES_FMT")\n", port->n_echo_req, IPv4_BYTES(((uint8_t*)&ip_hdr->dst_addr)), IPv4_BYTES(((uint8_t*)&ip_hdr->src_addr)));
398                         port->n_echo_req = 0;
399                         port->last_echo_req_rcvd_tsc = rte_rdtsc();
400                 }
401                 build_icmp_reply_message(tbase, mbuf);
402         } else if (type == PROX_RTE_IP_ICMP_ECHO_REPLY) {
403                 port->n_echo_rep++;
404                 if (rte_rdtsc() - port->last_echo_rep_rcvd_tsc > rte_get_tsc_hz()) {
405                         plog_info("Received %u Echo Reply on IP "IPv4_BYTES_FMT" (last received from IP "IPv4_BYTES_FMT")\n", port->n_echo_rep, IPv4_BYTES(((uint8_t*)&ip_hdr->dst_addr)), IPv4_BYTES(((uint8_t*)&ip_hdr->src_addr)));
406                         port->n_echo_rep = 0;
407                         port->last_echo_rep_rcvd_tsc = rte_rdtsc();
408                 }
409         }
410         tx_drop(mbuf);
411         return;
412 }
413
414 static inline void handle_message(struct task_base *tbase, struct rte_mbuf *mbuf, int ring_id)
415 {
416         struct task_master *task = (struct task_master *)tbase;
417         struct ether_hdr_arp *hdr_arp = rte_pktmbuf_mtod(mbuf, struct ether_hdr_arp *);
418         int command = get_command(mbuf);
419         uint8_t port = get_port(mbuf);
420         uint32_t ip;
421         uint8_t vdev_port = prox_port_cfg[port].dpdk_mapping;
422         plogx_dbg("\tMaster received %s (%x) from mbuf %p\n", actions_string[command], command, mbuf);
423
424         switch(command) {
425         case BGP_TO_CTRL:
426                 if (vdev_port != NO_VDEV_PORT) {
427                         // If a virtual (net_tap) device is attached, send the (BGP) packet to this device
428                         // The kernel will receive and handle it.
429                         plogx_dbg("\tMaster forwarding BGP packet to TAP\n");
430                         int n = rte_eth_tx_burst(prox_port_cfg[port].dpdk_mapping, 0, &mbuf, 1);
431                         return;
432                 }
433                 tx_drop(mbuf);
434                 break;
435         case ICMP_TO_CTRL:
436                 if (vdev_port != NO_VDEV_PORT) {
437                         // If a virtual (net_tap) device is attached, send the (PING) packet to this device
438                         // The kernel will receive and handle it.
439                         plogx_dbg("\tMaster forwarding packet to TAP\n");
440                         int n = rte_eth_tx_burst(prox_port_cfg[port].dpdk_mapping, 0, &mbuf, 1);
441                         return;
442                 }
443                 handle_icmp(tbase, mbuf);
444                 break;
445         case ARP_TO_CTRL:
446                 if (vdev_port != NO_VDEV_PORT) {
447                         // If a virtual (net_tap) device is attached, send the (ARP) packet to this device
448                         // The kernel will receive and handle it.
449                         plogx_dbg("\tMaster forwarding packet to TAP\n");
450                         int n = rte_eth_tx_burst(prox_port_cfg[port].dpdk_mapping, 0, &mbuf, 1);
451                         return;
452                 }
453                 if (hdr_arp->ether_hdr.ether_type != ETYPE_ARP) {
454                         plog_err("\tUnexpected message received: ARP_TO_CTRL with ether_type %x\n", hdr_arp->ether_hdr.ether_type);
455                         tx_drop(mbuf);
456                         return;
457                 } else if (arp_is_gratuitous(hdr_arp)) {
458                         plog_info("\tReceived gratuitous packet \n");
459                         tx_drop(mbuf);
460                         return;
461                 } else if (memcmp(&hdr_arp->arp, &arp_reply, 8) == 0) {
462                         uint32_t ip = hdr_arp->arp.data.spa;
463                         handle_arp_reply(tbase, mbuf);
464                 } else if (memcmp(&hdr_arp->arp, &arp_request, 8) == 0) {
465                         handle_arp_request(tbase, mbuf);
466                 } else {
467                         plog_info("\tReceived unexpected ARP operation %d\n", hdr_arp->arp.oper);
468                         tx_drop(mbuf);
469                         return;
470                 }
471                 break;
472         case REQ_MAC_TO_CTRL:
473                 if (vdev_port != NO_VDEV_PORT) {
474                         // We send a packet to the kernel with the proper destnation IP address and our src IP address
475                         // This means that if a generator sends packets from many sources all ARP will still
476                         // be sent from the same IP src. This might be a limitation.
477                         // This prevent to have to open as many sockets as there are sources MAC addresses
478                         // We also always use the same UDP ports - as the packet will finally not leave the system anyhow
479
480                         struct ether_hdr_arp *hdr_arp = rte_pktmbuf_mtod(mbuf, struct ether_hdr_arp *);
481                         uint32_t ip = get_ip(mbuf);
482                         struct rte_ring *ring = task->ctrl_tx_rings[get_core(mbuf) * MAX_TASKS_PER_CORE + get_task(mbuf)];
483
484                         // First check whether MAC address is not already in kernel MAC table.
485                         // If present in our hash with a non-null MAC, then present in kernel. A null MAC
486                         // might just mean that we sent a request.
487                         // If MAC present in kernel, do not send a packet towards the kernel to try to generate
488                         // an ARP request, as the kernel would not generate it.
489                         int ret = rte_hash_lookup(task->external_ip_hash, (const void *)&ip);
490                         if ((ret >= 0) && (!prox_rte_is_zero_ether_addr(&task->external_ip_table[ret].mac))) {
491                                 memcpy(&hdr_arp->arp.data.sha, &task->external_ip_table[ret].mac, sizeof(prox_rte_ether_addr));
492                                 plogx_dbg("\tMaster ready to send UPDATE_FROM_CTRL ip "IPv4_BYTES_FMT" with mac "MAC_BYTES_FMT"\n",
493                                         IP4(ip), MAC_BYTES(hdr_arp->arp.data.sha.addr_bytes));
494                                 tx_ring_ip(tbase, ring, UPDATE_FROM_CTRL, mbuf, ip);
495                                 return;
496                         }
497
498                         struct sockaddr_in dst;
499                         dst.sin_family = AF_INET;
500                         dst.sin_addr.s_addr = ip;
501                         dst.sin_port = rte_cpu_to_be_16(5000);
502                         int n = sendto(prox_port_cfg[vdev_port].fd, (char*)(&ip), 0, 0,  (struct sockaddr *)&dst, sizeof(struct sockaddr_in));
503                         plogx_dbg("\tSent %d bytes to TAP IP "IPv4_BYTES_FMT" using fd %d\n", n, IPv4_BYTES(((uint8_t*)&ip)), prox_port_cfg[vdev_port].fd);
504
505                         record_request(tbase, ip, port, ring);
506                         tx_drop(mbuf);
507                         break;
508                 }
509                 handle_unknown_ip(tbase, mbuf);
510                 break;
511         default:
512                 plogx_dbg("\tMaster received unexpected message\n");
513                 tx_drop(mbuf);
514                 break;
515         }
516 }
517
518 void init_ctrl_plane(struct task_base *tbase)
519 {
520         prox_cfg.flags |= DSF_CTRL_PLANE_ENABLED;
521         struct task_master *task = (struct task_master *)tbase;
522         int socket_id = rte_lcore_to_socket_id(prox_cfg.master);
523         uint32_t n_entries = MAX_ARP_ENTRIES * 4;
524         static char hash_name[30];
525
526         sprintf(hash_name, "A%03d_hash_arp_table", prox_cfg.master);
527         struct rte_hash_parameters hash_params = {
528                 .name = hash_name,
529                 .entries = n_entries,
530                 .key_len = sizeof(uint32_t),
531                 .hash_func = rte_hash_crc,
532                 .hash_func_init_val = 0,
533         };
534         task->external_ip_hash = rte_hash_create(&hash_params);
535         PROX_PANIC(task->external_ip_hash == NULL, "Failed to set up external ip hash\n");
536         plog_info("\texternal ip hash table allocated, with %d entries of size %d\n", hash_params.entries, hash_params.key_len);
537         task->external_ip_table = (struct external_ip_table *)prox_zmalloc(n_entries * sizeof(struct external_ip_table), socket_id);
538         PROX_PANIC(task->external_ip_table == NULL, "Failed to allocate memory for %u entries in external ip table\n", n_entries);
539         plog_info("\texternal ip table, with %d entries of size %ld\n", n_entries, sizeof(struct external_ip_table));
540
541         hash_name[0]++;
542         hash_params.key_len = sizeof(struct ip_port);
543         task->internal_ip_hash = rte_hash_create(&hash_params);
544         PROX_PANIC(task->internal_ip_hash == NULL, "Failed to set up internal ip hash\n");
545         plog_info("\tinternal ip hash table allocated, with %d entries of size %d\n", hash_params.entries, hash_params.key_len);
546         task->internal_ip_table = (struct ip_table *)prox_zmalloc(n_entries * sizeof(struct ip_table), socket_id);
547         PROX_PANIC(task->internal_ip_table == NULL, "Failed to allocate memory for %u entries in internal ip table\n", n_entries);
548         plog_info("\tinternal ip table, with %d entries of size %ld\n", n_entries, sizeof(struct ip_table));
549
550         int fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);
551         PROX_PANIC(fd < 0, "Failed to open netlink socket: %d\n", errno);
552         fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
553
554         struct sockaddr_nl sockaddr;
555         memset(&sockaddr, 0, sizeof(struct sockaddr_nl));
556         sockaddr.nl_family = AF_NETLINK;
557         sockaddr.nl_groups = RTMGRP_NEIGH | RTMGRP_NOTIFY;
558         int rc = bind(fd, (struct sockaddr *)&sockaddr, sizeof(struct sockaddr_nl));
559         PROX_PANIC(rc < 0, "Failed to bind to RTMGRP_NEIGH netlink group\n");
560         task->arp_fds.fd = fd;
561         task->arp_fds.events = POLL_IN;
562         plog_info("\tRTMGRP_NEIGH netlink group bound; fd = %d\n", fd);
563
564         fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);
565         PROX_PANIC(fd < 0, "Failed to open netlink socket: %d\n", errno);
566         fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
567         struct sockaddr_nl sockaddr2;
568         memset(&sockaddr2, 0, sizeof(struct sockaddr_nl));
569         sockaddr2.nl_family = AF_NETLINK;
570         sockaddr2.nl_groups = RTMGRP_IPV6_ROUTE | RTMGRP_IPV4_ROUTE | RTMGRP_NOTIFY;
571         rc = bind(fd, (struct sockaddr *)&sockaddr2, sizeof(struct sockaddr_nl));
572         PROX_PANIC(rc < 0, "Failed to bind to RTMGRP_NEIGH netlink group\n");
573         task->route_fds.fd = fd;
574         task->route_fds.events = POLL_IN;
575         plog_info("\tRTMGRP_IPV4_ROUTE netlink group bound; fd = %d\n", fd);
576
577         static char name[] = "master_arp_pool";
578         const int NB_ARP_MBUF = 1024;
579         const int ARP_MBUF_SIZE = 2048;
580         const int NB_CACHE_ARP_MBUF = 256;
581         struct rte_mempool *ret = rte_mempool_create(name, NB_ARP_MBUF, ARP_MBUF_SIZE, NB_CACHE_ARP_MBUF,
582                 sizeof(struct rte_pktmbuf_pool_private), rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, 0,
583                 rte_socket_id(), 0);
584         PROX_PANIC(ret == NULL, "Failed to allocate ARP memory pool on socket %u with %u elements\n",
585                 rte_socket_id(), NB_ARP_MBUF);
586         plog_info("\t\tMempool %p (%s) size = %u * %u cache %u, socket %d\n", ret, name, NB_ARP_MBUF,
587                 ARP_MBUF_SIZE, NB_CACHE_ARP_MBUF, rte_socket_id());
588         tbase->l3.arp_pool = ret;
589 }
590
591 static void handle_route_event(struct task_base *tbase)
592 {
593         struct task_master *task = (struct task_master *)tbase;
594         struct rte_mbuf *mbufs[MAX_RING_BURST];
595         int fd = task->route_fds.fd, interface_index, mask = -1;
596         char interface_name[IF_NAMESIZE] = {0};
597         int len = recv(fd, netlink_buf, sizeof(netlink_buf), 0);
598         uint32_t ip = 0, gw_ip = 0;
599         if (len < 0) {
600                 plog_err("Failed to recv from netlink: %d\n", errno);
601                 return;
602         }
603         struct nlmsghdr * nl_hdr = (struct nlmsghdr *)netlink_buf;
604         if (nl_hdr->nlmsg_flags & NLM_F_MULTI) {
605                 plog_err("Unexpected multipart netlink message\n");
606                 return;
607         }
608         if ((nl_hdr->nlmsg_type != RTM_NEWROUTE) && (nl_hdr->nlmsg_type != RTM_DELROUTE))
609                 return;
610
611         struct rtmsg *rtmsg = (struct rtmsg *)NLMSG_DATA(nl_hdr);
612         int rtm_family = rtmsg->rtm_family;
613         if ((rtm_family == AF_INET) && (rtmsg->rtm_table != RT_TABLE_MAIN) &&(rtmsg->rtm_table != RT_TABLE_LOCAL))
614                 return;
615         int dst_len = rtmsg->rtm_dst_len;
616
617         struct rtattr *rta = (struct rtattr *)RTM_RTA(rtmsg);
618         int rtl = RTM_PAYLOAD(nl_hdr);
619         for (; RTA_OK(rta, rtl); rta = RTA_NEXT(rta, rtl)) {
620                 switch (rta->rta_type) {
621                 case RTA_DST:
622                         ip = *((uint32_t *)RTA_DATA(rta));
623                         break;
624                 case RTA_OIF:
625                         interface_index = *((int *)RTA_DATA(rta));
626                         if (if_indextoname(interface_index, interface_name) == NULL) {
627                                 plog_info("Unknown Interface Index %d\n", interface_index);
628                         }
629                         break;
630                 case RTA_METRICS:
631                         mask = *((int *)RTA_DATA(rta));
632                         break;
633                 case RTA_GATEWAY:
634                         gw_ip = *((uint32_t *)RTA_DATA(rta));
635                         break;
636                 default:
637                         break;
638                 }
639         }
640         int dpdk_vdev_port = -1;
641         for (int i = 0; i< rte_eth_dev_count(); i++) {
642                 if (strcmp(prox_port_cfg[i].name, interface_name) == 0)
643                         dpdk_vdev_port = i;
644         }
645         if (dpdk_vdev_port != -1) {
646                 plogx_info("Received netlink message on tap interface %s for IP "IPv4_BYTES_FMT"/%d, Gateway  "IPv4_BYTES_FMT"\n", interface_name, IP4(ip), dst_len, IP4(gw_ip));
647                 int ret1 = rte_mempool_get(tbase->l3.arp_pool, (void **)mbufs);
648                 if (unlikely(ret1 != 0)) {
649                         plog_err("Unable to allocate a mbuf for master to core communication\n");
650                         return;
651                 }
652                 int dpdk_port = prox_port_cfg[dpdk_vdev_port].dpdk_mapping;
653                 tx_ring_route(tbase, task->internal_port_table[dpdk_port].ring, (nl_hdr->nlmsg_type == RTM_NEWROUTE), mbufs[0], ip, gw_ip, dst_len);
654         } else
655                 plog_info("Received netlink message on unknown interface %s for IP "IPv4_BYTES_FMT"/%d, Gateway  "IPv4_BYTES_FMT"\n", interface_name[0] ? interface_name:"", IP4(ip), dst_len, IP4(gw_ip));
656         return;
657 }
658
659 static void handle_arp_event(struct task_base *tbase)
660 {
661         struct task_master *task = (struct task_master *)tbase;
662         struct rte_mbuf *mbufs[MAX_RING_BURST];
663         struct nlmsghdr * nl_hdr;
664         int fd = task->arp_fds.fd;
665         int len, ret;
666         uint32_t ip = 0;
667         prox_rte_ether_addr mac;
668         memset(&mac, 0, sizeof(mac));
669         len = recv(fd, netlink_buf, sizeof(netlink_buf), 0);
670         if (len < 0) {
671                 plog_err("Failed to recv from netlink: %d\n", errno);
672                 return;
673         }
674         nl_hdr = (struct nlmsghdr *)netlink_buf;
675         if (nl_hdr->nlmsg_flags & NLM_F_MULTI) {
676                 plog_err("Unexpected multipart netlink message\n");
677                 return;
678         }
679         if ((nl_hdr->nlmsg_type != RTM_NEWNEIGH) && (nl_hdr->nlmsg_type != RTM_DELNEIGH))
680                 return;
681
682         struct ndmsg *ndmsg = (struct ndmsg *)NLMSG_DATA(nl_hdr);
683         int ndm_family = ndmsg->ndm_family;
684         struct rtattr *rta = (struct rtattr *)RTM_RTA(ndmsg);
685         int rtl = RTM_PAYLOAD(nl_hdr);
686         for (; RTA_OK(rta, rtl); rta = RTA_NEXT(rta, rtl)) {
687                 switch (rta->rta_type) {
688                 case NDA_DST:
689                         ip = *((uint32_t *)RTA_DATA(rta));
690                         break;
691                 case NDA_LLADDR:
692                         mac = *((prox_rte_ether_addr *)(uint64_t *)RTA_DATA(rta));
693                         break;
694                 default:
695                         break;
696                 }
697         }
698         plogx_info("Received netlink ip "IPv4_BYTES_FMT" with mac "MAC_BYTES_FMT"\n", IP4(ip), MAC_BYTES(mac.addr_bytes));
699         ret = rte_hash_lookup(task->external_ip_hash, (const void *)&ip);
700         if (unlikely(ret < 0)) {
701                 // entry not found for this IP: we did not ask a request.
702                 // This can happen if the kernel updated the ARP table when receiving an ARP_REQUEST
703                 // We must record this, as the ARP entry is now in the kernel table
704                 if (prox_rte_is_zero_ether_addr(&mac)) {
705                         // Timeout or MAC deleted from kernel MAC table
706                         int ret = rte_hash_del_key(task->external_ip_hash, (const void *)&ip);
707                         plogx_dbg("ip "IPv4_BYTES_FMT" removed from external_ip_hash\n", IP4(ip));
708                         return;
709                 }
710                 int ret = rte_hash_add_key(task->external_ip_hash, (const void *)&ip);
711                 if (unlikely(ret < 0)) {
712                         plogx_dbg("IP "IPv4_BYTES_FMT" not found in external_ip_hash and unable to add it\n", IP4(ip));
713                         return;
714                 }
715                 memcpy(&task->external_ip_table[ret].mac, &mac, sizeof(prox_rte_ether_addr));
716                 plogx_dbg("ip "IPv4_BYTES_FMT" added in external_ip_hash with mac "MAC_BYTES_FMT"\n", IP4(ip), MAC_BYTES(mac.addr_bytes));
717                 return;
718         }
719
720         // entry found for this IP
721         uint16_t nb_requests = task->external_ip_table[ret].nb_requests;
722         if (nb_requests == 0) {
723                 return;
724         }
725
726         memcpy(&task->external_ip_table[ret].mac, &mac, sizeof(prox_rte_ether_addr));
727
728         // If we receive a request from multiple task for the same IP, then we update all tasks
729         int ret1 = rte_mempool_get(tbase->l3.arp_pool, (void **)mbufs);
730         if (unlikely(ret1 != 0)) {
731                 plog_err("Unable to allocate a mbuf for master to core communication\n");
732                 return;
733         }
734         rte_mbuf_refcnt_set(mbufs[0], nb_requests);
735         for (int i = 0; i < nb_requests; i++) {
736                 struct rte_ring *ring = task->external_ip_table[ret].rings[i];
737                 struct ether_hdr_arp *hdr = rte_pktmbuf_mtod(mbufs[0], struct ether_hdr_arp *);
738                 memcpy(&hdr->arp.data.sha, &mac, sizeof(prox_rte_ether_addr));
739                 tx_ring_ip(tbase, ring, UPDATE_FROM_CTRL, mbufs[0], ip);
740                 plog_dbg("UPDATE_FROM_CTRL ip "IPv4_BYTES_FMT" with mac "MAC_BYTES_FMT"\n", IP4(ip), MAC_BYTES(mac.addr_bytes));
741         }
742         task->external_ip_table[ret].nb_requests = 0;
743         return;
744 }
745
746 static int handle_ctrl_plane_f(struct task_base *tbase, __attribute__((unused)) struct rte_mbuf **mbuf, uint16_t n_pkts)
747 {
748         int ring_id = 0, j, ret = 0, n = 0;
749         struct rte_mbuf *mbufs[MAX_RING_BURST];
750         struct task_master *task = (struct task_master *)tbase;
751
752         /*      Handle_master works differently than other handle functions
753                 It is not handled by a DPDK dataplane core
754                 It is no thread_generic based, hence do not receive packets the same way
755         */
756
757         ret = ring_deq(task->ctrl_rx_ring, mbufs);
758         for (j = 0; j < ret; j++) {
759                 handle_message(tbase, mbufs[j], ring_id);
760         }
761         for (int vdev_id = 0; vdev_id < task->max_vdev_id; vdev_id++) {
762                 struct vdev *vdev = &task->all_vdev[vdev_id];
763                 n = rte_eth_rx_burst(vdev->port_id, 0, mbufs, MAX_PKT_BURST);
764                 for (j = 0; j < n; j++) {
765                         tx_ring(tbase, vdev->ring, PKT_FROM_TAP, mbufs[j]);
766                 }
767                 ret +=n;
768         }
769         if ((task->max_vdev_id) && (poll(&task->arp_fds, 1, prox_cfg.poll_timeout) == POLL_IN)) {
770                 handle_arp_event(tbase);
771         }
772         if (poll(&task->route_fds, 1, prox_cfg.poll_timeout) == POLL_IN) {
773                 handle_route_event(tbase);
774         }
775         return ret;
776 }
777
778 static void init_task_master(struct task_base *tbase, struct task_args *targs)
779 {
780         if (prox_cfg.flags & DSF_CTRL_PLANE_ENABLED) {
781                 struct task_master *task = (struct task_master *)tbase;
782
783                 task->ctrl_rx_ring = targs->lconf->ctrl_rings_p[0];
784                 task->ctrl_tx_rings = ctrl_rings;
785                 init_ctrl_plane(tbase);
786                 handle_ctrl_plane = handle_ctrl_plane_f;
787         }
788 }
789
790 static struct task_init task_init_master = {
791         .mode_str = "master",
792         .init = init_task_master,
793         .handle = NULL,
794         .flag_features = TASK_FEATURE_NEVER_DISCARDS,
795         .size = sizeof(struct task_master)
796 };
797
798 __attribute__((constructor)) static void reg_task_gen(void)
799 {
800         reg_task(&task_init_master);
801 }