Fix ctrlplane handling vdev
[samplevnf.git] / VNFs / DPPD-PROX / handle_master.c
1 /*
2 // Copyright (c) 2010-2020 Intel Corporation
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 */
16
17 #include <fcntl.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <linux/netlink.h>
21 #include <linux/rtnetlink.h>
22 #include <poll.h>
23
24 #include <rte_hash.h>
25 #include <rte_hash_crc.h>
26 #include <rte_ether.h>
27
28 #include "prox_cfg.h"
29 #include "prox_globals.h"
30 #include "rx_pkt.h"
31 #include "arp.h"
32 #include "handle_master.h"
33 #include "log.h"
34 #include "mbuf_utils.h"
35 #include "etypes.h"
36 #include "defaults.h"
37 #include "prox_malloc.h"
38 #include "quit.h"
39 #include "task_init.h"
40 #include "prox_port_cfg.h"
41 #include "main.h"
42 #include "lconf.h"
43 #include "input.h"
44 #include "tx_pkt.h"
45 #include "defines.h"
46
47 #define PROX_MAX_ARP_REQUESTS   32      // Maximum number of tasks requesting the same MAC address
48 #define NETLINK_BUF_SIZE        16384
49
50 static char netlink_buf[NETLINK_BUF_SIZE];
51
52 const char *actions_string[] = {
53         "UPDATE_FROM_CTRL",             // Controlplane sending a MAC update to dataplane
54         "SEND_ARP_REQUEST_FROM_CTRL",   // Controlplane requesting dataplane to send ARP request
55         "SEND_ARP_REPLY_FROM_CTRL",     // Controlplane requesting dataplane to send ARP reply
56         "SEND_ICMP_FROM_CTRL",          // Controlplane requesting dataplane to send ICMP message
57         "ARP_TO_CTRL",                  // ARP sent by datplane to Controlpane for handling
58         "ICMP_TO_CTRL",                 // ICMP sent by datplane to Controlpane for handling
59         "REQ_MAC_TO_CTRL",              // Dataplane requesting MAC resolution to Controlplane
60         "PKT_FROM_TAP"                  // Packet received by Controlplane from kernel and forwarded to dataplane for sending
61 };
62
63 static struct my_arp_t arp_reply = {
64         .htype = 0x100,
65         .ptype = 8,
66         .hlen = 6,
67         .plen = 4,
68         .oper = 0x200
69 };
70 static struct my_arp_t arp_request = {
71         .htype = 0x100,
72         .ptype = 8,
73         .hlen = 6,
74         .plen = 4,
75         .oper = 0x100
76 };
77
78 struct ip_table {
79         prox_rte_ether_addr     mac;
80         struct rte_ring         *ring;
81 };
82
83 struct external_ip_table {
84         prox_rte_ether_addr     mac;
85         struct rte_ring         *rings[PROX_MAX_ARP_REQUESTS];
86         uint16_t                nb_requests;
87 };
88
89 struct port_table {
90         prox_rte_ether_addr     mac;
91         struct rte_ring         *ring;
92         uint32_t                ip;
93         uint8_t                 port;
94         uint8_t                 flags;
95         uint64_t last_echo_req_rcvd_tsc;
96         uint64_t last_echo_rep_rcvd_tsc;
97         uint32_t n_echo_req;
98         uint32_t n_echo_rep;
99 };
100
101 struct task_master {
102         struct task_base base;
103         struct rte_ring *ctrl_rx_ring;
104         struct rte_ring **ctrl_tx_rings;
105         struct ip_table *internal_ip_table;
106         struct external_ip_table *external_ip_table;
107         struct rte_hash  *external_ip_hash;
108         struct rte_hash  *internal_ip_hash;
109         struct port_table internal_port_table[PROX_MAX_PORTS];
110         struct vdev all_vdev[PROX_MAX_PORTS];
111         int max_vdev_id;
112         struct pollfd arp_fds;
113 };
114
115 struct ip_port {
116         uint32_t ip;
117         uint8_t port;
118 } __attribute__((packed));
119
120 static inline uint8_t get_command(struct rte_mbuf *mbuf)
121 {
122         return mbuf->udata64 & 0xFF;
123 }
124 static inline uint8_t get_task(struct rte_mbuf *mbuf)
125 {
126         return (mbuf->udata64 >> 8) & 0xFF;
127 }
128 static inline uint8_t get_core(struct rte_mbuf *mbuf)
129 {
130         return (mbuf->udata64 >> 16) & 0xFF;
131 }
132 static inline uint8_t get_port(struct rte_mbuf *mbuf)
133 {
134         return mbuf->port;
135 }
136 static inline uint32_t get_ip(struct rte_mbuf *mbuf)
137 {
138         return (mbuf->udata64 >> 32) & 0xFFFFFFFF;
139 }
140
141 void master_init_vdev(struct task_base *tbase, uint8_t port_id, uint8_t core_id, uint8_t task_id)
142 {
143         struct task_master *task = (struct task_master *)tbase;
144         uint8_t vdev_port = prox_port_cfg[port_id].dpdk_mapping;
145         int rc, i;
146         if (vdev_port != NO_VDEV_PORT) {
147                 for (i = 0; i < task->max_vdev_id; i++) {
148                         if (task->all_vdev[i].port_id == vdev_port)
149                                 break;
150                 }
151                 if (i <  task->max_vdev_id) {
152                         // Already initialized (e.g. by another core handling the same port).
153                         return;
154                 }
155                 task->all_vdev[task->max_vdev_id].port_id = vdev_port;
156                 task->all_vdev[task->max_vdev_id].ring = task->ctrl_tx_rings[core_id * MAX_TASKS_PER_CORE + task_id];
157
158                 struct sockaddr_in dst, src;
159                 src.sin_family = AF_INET;
160                 src.sin_addr.s_addr = prox_port_cfg[vdev_port].ip;
161                 src.sin_port = rte_cpu_to_be_16(5000);
162
163                 int fd = socket(AF_INET,  SOCK_DGRAM, 0);
164                 PROX_PANIC(fd < 0, "Failed to open socket(AF_INET,  SOCK_DGRAM, 0)\n");
165                 prox_port_cfg[vdev_port].fd = fd;
166                 rc = bind(fd,(struct sockaddr *)&src, sizeof(struct sockaddr_in));
167                 PROX_PANIC(rc, "Failed to bind("IPv4_BYTES_FMT":%d): errno = %d\n", IPv4_BYTES(((uint8_t*)&src.sin_addr.s_addr)), src.sin_port, errno);
168                 plog_info("DPDK port %d bound("IPv4_BYTES_FMT":%d) to fd %d\n", port_id, IPv4_BYTES(((uint8_t*)&src.sin_addr.s_addr)), src.sin_port, fd);
169                 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
170                 task->max_vdev_id++;
171         }
172 }
173
174 void register_ip_to_ctrl_plane(struct task_base *tbase, uint32_t ip, uint8_t port_id, uint8_t core_id, uint8_t task_id)
175 {
176         struct task_master *task = (struct task_master *)tbase;
177         struct ip_port key;
178         plogx_info("\tregistering IP "IPv4_BYTES_FMT" with port %d core %d and task %d\n", IP4(ip), port_id, core_id, task_id);
179
180         if (port_id >= PROX_MAX_PORTS) {
181                 plog_err("Unable to register ip "IPv4_BYTES_FMT", port %d\n", IP4(ip), port_id);
182                 return;
183         }
184
185         /* TODO - stoe multiple rings if multiple cores able to handle IP
186            Remove them when such cores are stopped and de-register IP
187         */
188         task->internal_port_table[port_id].ring = task->ctrl_tx_rings[core_id * MAX_TASKS_PER_CORE + task_id];
189         memcpy(&task->internal_port_table[port_id].mac, &prox_port_cfg[port_id].eth_addr, sizeof(prox_rte_ether_addr));
190         task->internal_port_table[port_id].ip = ip;
191
192         if (ip == RANDOM_IP) {
193                 task->internal_port_table[port_id].flags |= HANDLE_RANDOM_IP_FLAG;
194                 return;
195         }
196
197         key.ip = ip;
198         key.port = port_id;
199         int ret = rte_hash_add_key(task->internal_ip_hash, (const void *)&key);
200         if (unlikely(ret < 0)) {
201                 plog_err("Unable to register ip "IPv4_BYTES_FMT"\n", IP4(ip));
202                 return;
203         }
204         memcpy(&task->internal_ip_table[ret].mac, &prox_port_cfg[port_id].eth_addr, sizeof(prox_rte_ether_addr));
205         task->internal_ip_table[ret].ring = task->ctrl_tx_rings[core_id * MAX_TASKS_PER_CORE + task_id];
206
207 }
208
209 static inline void handle_arp_reply(struct task_base *tbase, struct rte_mbuf *mbuf)
210 {
211         struct task_master *task = (struct task_master *)tbase;
212         struct ether_hdr_arp *hdr_arp = rte_pktmbuf_mtod(mbuf, struct ether_hdr_arp *);
213         int i, ret;
214         uint32_t key = hdr_arp->arp.data.spa;
215         plogx_dbg("\tMaster handling ARP reply for ip "IPv4_BYTES_FMT"\n", IP4(key));
216
217         ret = rte_hash_lookup(task->external_ip_hash, (const void *)&key);
218         if (unlikely(ret < 0)) {
219                 // entry not found for this IP: we did not ask a request, delete the reply
220                 tx_drop(mbuf);
221         } else {
222                 // entry found for this IP
223                 uint16_t nb_requests = task->external_ip_table[ret].nb_requests;
224                 // If we receive a request from multiple task for the same IP, then we update all tasks
225                 if (task->external_ip_table[ret].nb_requests) {
226                         rte_mbuf_refcnt_set(mbuf, nb_requests);
227                         for (int i = 0; i < nb_requests; i++) {
228                                 struct rte_ring *ring = task->external_ip_table[ret].rings[i];
229                                 tx_ring_ip(tbase, ring, UPDATE_FROM_CTRL, mbuf, key);
230                         }
231                         task->external_ip_table[ret].nb_requests = 0;
232                 } else {
233                         tx_drop(mbuf);
234                 }
235         }
236 }
237
238 static inline void handle_arp_request(struct task_base *tbase, struct rte_mbuf *mbuf)
239 {
240         struct task_master *task = (struct task_master *)tbase;
241         struct ether_hdr_arp *hdr_arp = rte_pktmbuf_mtod(mbuf, struct ether_hdr_arp *);
242         int i, ret;
243         uint8_t port = get_port(mbuf);
244
245         struct ip_port key;
246         key.ip = hdr_arp->arp.data.tpa;
247         key.port = port;
248         if (task->internal_port_table[port].flags & HANDLE_RANDOM_IP_FLAG) {
249                 prox_rte_ether_addr mac;
250                 plogx_dbg("\tMaster handling ARP request for ip "IPv4_BYTES_FMT" on port %d which supports random ip\n", IP4(key.ip), key.port);
251                 struct rte_ring *ring = task->internal_port_table[port].ring;
252                 create_mac(hdr_arp, &mac);
253                 mbuf->ol_flags &= ~(PKT_TX_IP_CKSUM|PKT_TX_UDP_CKSUM);
254                 build_arp_reply(hdr_arp, &mac);
255                 tx_ring(tbase, ring, ARP_REPLY_FROM_CTRL, mbuf);
256                 return;
257         }
258
259         plogx_dbg("\tMaster handling ARP request for ip "IPv4_BYTES_FMT"\n", IP4(key.ip));
260
261         ret = rte_hash_lookup(task->internal_ip_hash, (const void *)&key);
262         if (unlikely(ret < 0)) {
263                 // entry not found for this IP.
264                 plogx_dbg("Master ignoring ARP REQUEST received on un-registered IP "IPv4_BYTES_FMT" on port %d\n", IP4(hdr_arp->arp.data.tpa), port);
265                 tx_drop(mbuf);
266         } else {
267                 struct rte_ring *ring = task->internal_ip_table[ret].ring;
268                 mbuf->ol_flags &= ~(PKT_TX_IP_CKSUM|PKT_TX_UDP_CKSUM);
269                 build_arp_reply(hdr_arp, &task->internal_ip_table[ret].mac);
270                 tx_ring(tbase, ring, ARP_REPLY_FROM_CTRL, mbuf);
271         }
272 }
273
274 static inline int record_request(struct task_base *tbase, uint32_t ip_dst, uint8_t port, struct rte_ring *ring)
275 {
276         struct task_master *task = (struct task_master *)tbase;
277         int ret = rte_hash_add_key(task->external_ip_hash, (const void *)&ip_dst);
278         int i;
279
280         if (unlikely(ret < 0)) {
281                 // entry not found for this IP: delete the reply
282                 plogx_dbg("Unable to add IP "IPv4_BYTES_FMT" in external_ip_hash\n", IP4(ip_dst));
283                 return -1;
284         }
285
286         // If multiple tasks requesting the same info, we will need to send a reply to all of them
287         // However if one task sends multiple requests to the same IP (e.g. because it is not answering)
288         // then we should not send multiple replies to the same task
289         if (task->external_ip_table[ret].nb_requests >= PROX_MAX_ARP_REQUESTS) {
290                 // This can only happen if really many tasks requests the same IP
291                 plogx_dbg("Unable to add request for IP "IPv4_BYTES_FMT" in external_ip_table\n", IP4(ip_dst));
292                 return -1;
293         }
294         for (i = 0; i < task->external_ip_table[ret].nb_requests; i++) {
295                 if (task->external_ip_table[ret].rings[i] == ring)
296                         break;
297         }
298         if (i >= task->external_ip_table[ret].nb_requests) {
299                 // If this is a new request i.e. a new task requesting a new IP
300                 task->external_ip_table[ret].rings[task->external_ip_table[ret].nb_requests] = ring;
301                 task->external_ip_table[ret].nb_requests++;
302         }
303         return 0;
304 }
305
306 static inline void handle_unknown_ip(struct task_base *tbase, struct rte_mbuf *mbuf)
307 {
308         struct task_master *task = (struct task_master *)tbase;
309         struct ether_hdr_arp *hdr_arp = rte_pktmbuf_mtod(mbuf, struct ether_hdr_arp *);
310         uint8_t port = get_port(mbuf);
311         uint32_t ip_dst = get_ip(mbuf);
312
313         plogx_dbg("\tMaster handling unknown ip "IPv4_BYTES_FMT" for port %d\n", IP4(ip_dst), port);
314         if (unlikely(port >= PROX_MAX_PORTS)) {
315                 plogx_dbg("Port %d not found", port);
316                 tx_drop(mbuf);
317                 return;
318         }
319         uint32_t ip_src = task->internal_port_table[port].ip;
320         struct rte_ring *ring = task->ctrl_tx_rings[get_core(mbuf) * MAX_TASKS_PER_CORE + get_task(mbuf)];
321
322         if (ring == NULL) {
323                 plogx_dbg("Port %d not registered", port);
324                 tx_drop(mbuf);
325                 return;
326         }
327
328         if (record_request(tbase, ip_dst, port, ring) < 0) {
329                 tx_drop(mbuf);
330                 return;
331         }
332         // We send an ARP request even if one was just sent (and not yet answered) by another task
333         mbuf->ol_flags &= ~(PKT_TX_IP_CKSUM|PKT_TX_UDP_CKSUM);
334         build_arp_request(mbuf, &task->internal_port_table[port].mac, ip_dst, ip_src);
335         tx_ring(tbase, ring, ARP_REQ_FROM_CTRL, mbuf);
336 }
337
338 static inline void build_icmp_reply_message(struct task_base *tbase, struct rte_mbuf *mbuf)
339 {
340         struct task_master *task = (struct task_master *)tbase;
341         struct ip_port key;
342         key.port = mbuf->port;
343         prox_rte_ether_hdr *hdr = rte_pktmbuf_mtod(mbuf, prox_rte_ether_hdr *);
344         prox_rte_ether_addr dst_mac;
345         prox_rte_ether_addr_copy(&hdr->s_addr, &dst_mac);
346         prox_rte_ether_addr_copy(&hdr->d_addr, &hdr->s_addr);
347         prox_rte_ether_addr_copy(&dst_mac, &hdr->d_addr);
348         prox_rte_ipv4_hdr *ip_hdr = (prox_rte_ipv4_hdr *)(hdr + 1);
349         key.ip = ip_hdr->dst_addr;
350         ip_hdr->dst_addr = ip_hdr->src_addr;
351         ip_hdr->src_addr = key.ip;
352         prox_rte_icmp_hdr *picmp = (prox_rte_icmp_hdr *)(ip_hdr + 1);
353         picmp->icmp_type = PROX_RTE_IP_ICMP_ECHO_REPLY;
354
355         int ret = rte_hash_lookup(task->internal_ip_hash, (const void *)&key);
356         if (unlikely(ret < 0)) {
357                 // entry not found for this IP.
358                 plogx_dbg("Master ignoring ICMP received on un-registered IP "IPv4_BYTES_FMT" on port %d\n", IP4(key.ip), mbuf->port);
359                 tx_drop(mbuf);
360         } else {
361                 struct rte_ring *ring = task->internal_ip_table[ret].ring;
362                 mbuf->ol_flags &= ~(PKT_TX_IP_CKSUM|PKT_TX_UDP_CKSUM);
363                 tx_ring(tbase, ring, ICMP_FROM_CTRL, mbuf);
364         }
365 }
366
367 static inline void handle_icmp(struct task_base *tbase, struct rte_mbuf *mbuf)
368 {
369         struct task_master *task = (struct task_master *)tbase;
370         uint8_t port_id = mbuf->port;
371         struct port_table *port = &task->internal_port_table[port_id];
372         prox_rte_ether_hdr *hdr = rte_pktmbuf_mtod(mbuf, prox_rte_ether_hdr *);
373         if (hdr->ether_type != ETYPE_IPv4) {
374                 tx_drop(mbuf);
375                 return;
376         }
377         prox_rte_ipv4_hdr *ip_hdr = (prox_rte_ipv4_hdr *)(hdr + 1);
378         if (ip_hdr->next_proto_id != IPPROTO_ICMP) {
379                 tx_drop(mbuf);
380                 return;
381         }
382         if (ip_hdr->dst_addr != port->ip) {
383                 tx_drop(mbuf);
384                 return;
385         }
386
387         prox_rte_icmp_hdr *picmp = (prox_rte_icmp_hdr *)(ip_hdr + 1);
388         uint8_t type = picmp->icmp_type;
389         if (type == PROX_RTE_IP_ICMP_ECHO_REQUEST) {
390                 port->n_echo_req++;
391                 if (rte_rdtsc() - port->last_echo_req_rcvd_tsc > rte_get_tsc_hz()) {
392                         plog_dbg("Received %u Echo Request on IP "IPv4_BYTES_FMT" (last received from IP "IPv4_BYTES_FMT")\n", port->n_echo_req, IPv4_BYTES(((uint8_t*)&ip_hdr->dst_addr)), IPv4_BYTES(((uint8_t*)&ip_hdr->src_addr)));
393                         port->n_echo_req = 0;
394                         port->last_echo_req_rcvd_tsc = rte_rdtsc();
395                 }
396                 build_icmp_reply_message(tbase, mbuf);
397         } else if (type == PROX_RTE_IP_ICMP_ECHO_REPLY) {
398                 port->n_echo_rep++;
399                 if (rte_rdtsc() - port->last_echo_rep_rcvd_tsc > rte_get_tsc_hz()) {
400                         plog_info("Received %u Echo Reply on IP "IPv4_BYTES_FMT" (last received from IP "IPv4_BYTES_FMT")\n", port->n_echo_rep, IPv4_BYTES(((uint8_t*)&ip_hdr->dst_addr)), IPv4_BYTES(((uint8_t*)&ip_hdr->src_addr)));
401                         port->n_echo_rep = 0;
402                         port->last_echo_rep_rcvd_tsc = rte_rdtsc();
403                 }
404         }
405         tx_drop(mbuf);
406         return;
407 }
408
409 static inline void handle_message(struct task_base *tbase, struct rte_mbuf *mbuf, int ring_id)
410 {
411         struct task_master *task = (struct task_master *)tbase;
412         struct ether_hdr_arp *hdr_arp = rte_pktmbuf_mtod(mbuf, struct ether_hdr_arp *);
413         int command = get_command(mbuf);
414         uint8_t port = get_port(mbuf);
415         uint32_t ip;
416         uint8_t vdev_port = prox_port_cfg[port].dpdk_mapping;
417         plogx_dbg("\tMaster received %s (%x) from mbuf %p\n", actions_string[command], command, mbuf);
418
419         switch(command) {
420         case ICMP_TO_CTRL:
421                 if (vdev_port != NO_VDEV_PORT) {
422                         // If a virtual (net_tap) device is attached, send the (PING) packet to this device
423                         // The kernel will receive and handle it.
424                         plogx_dbg("\tMaster forwarding packet to TAP\n");
425                         int n = rte_eth_tx_burst(prox_port_cfg[port].dpdk_mapping, 0, &mbuf, 1);
426                         return;
427                 }
428                 handle_icmp(tbase, mbuf);
429                 break;
430         case ARP_TO_CTRL:
431                 if (vdev_port != NO_VDEV_PORT) {
432                         // If a virtual (net_tap) device is attached, send the (ARP) packet to this device
433                         // The kernel will receive and handle it.
434                         plogx_dbg("\tMaster forwarding packet to TAP\n");
435                         int n = rte_eth_tx_burst(prox_port_cfg[port].dpdk_mapping, 0, &mbuf, 1);
436                         return;
437                 }
438                 if (hdr_arp->ether_hdr.ether_type != ETYPE_ARP) {
439                         plog_err("\tUnexpected message received: ARP_TO_CTRL with ether_type %x\n", hdr_arp->ether_hdr.ether_type);
440                         tx_drop(mbuf);
441                         return;
442                 } else if (arp_is_gratuitous(hdr_arp)) {
443                         plog_info("\tReceived gratuitous packet \n");
444                         tx_drop(mbuf);
445                         return;
446                 } else if (memcmp(&hdr_arp->arp, &arp_reply, 8) == 0) {
447                         uint32_t ip = hdr_arp->arp.data.spa;
448                         handle_arp_reply(tbase, mbuf);
449                 } else if (memcmp(&hdr_arp->arp, &arp_request, 8) == 0) {
450                         handle_arp_request(tbase, mbuf);
451                 } else {
452                         plog_info("\tReceived unexpected ARP operation %d\n", hdr_arp->arp.oper);
453                         tx_drop(mbuf);
454                         return;
455                 }
456                 break;
457         case REQ_MAC_TO_CTRL:
458                 if (vdev_port != NO_VDEV_PORT) {
459                         // We send a packet to the kernel with the proper destnation IP address and our src IP address
460                         // This means that if a generator sends packets from many sources all ARP will still
461                         // be sent from the same IP src. This might be a limitation.
462                         // This prevent to have to open as many sockets as there are sources MAC addresses
463                         // We also always use the same UDP ports - as the packet will finally not leave the system anyhow
464
465                         struct ether_hdr_arp *hdr_arp = rte_pktmbuf_mtod(mbuf, struct ether_hdr_arp *);
466                         uint32_t ip = get_ip(mbuf);
467                         struct rte_ring *ring = task->ctrl_tx_rings[get_core(mbuf) * MAX_TASKS_PER_CORE + get_task(mbuf)];
468
469                         // First check whether MAC address is not already in kernel MAC table.
470                         // If present in our hash with a non-null MAC, then present in kernel. A null MAC
471                         // might just mean that we sent a request.
472                         // If MAC present in kernel, do not send a packet towards the kernel to try to generate
473                         // an ARP request, as the kernel would not generate it.
474                         int ret = rte_hash_lookup(task->external_ip_hash, (const void *)&ip);
475                         if ((ret >= 0) && (!prox_rte_is_zero_ether_addr(&task->external_ip_table[ret].mac))) {
476                                 memcpy(&hdr_arp->arp.data.sha, &task->external_ip_table[ret].mac, sizeof(prox_rte_ether_addr));
477                                 plogx_dbg("\tMaster ready to send UPDATE_FROM_CTRL ip "IPv4_BYTES_FMT" with mac "MAC_BYTES_FMT"\n",
478                                         IP4(ip), MAC_BYTES(hdr_arp->arp.data.sha.addr_bytes));
479                                 tx_ring_ip(tbase, ring, UPDATE_FROM_CTRL, mbuf, ip);
480                                 return;
481                         }
482
483                         struct sockaddr_in dst;
484                         dst.sin_family = AF_INET;
485                         dst.sin_addr.s_addr = ip;
486                         dst.sin_port = rte_cpu_to_be_16(5000);
487                         int n = sendto(prox_port_cfg[vdev_port].fd, (char*)(&ip), 0, 0,  (struct sockaddr *)&dst, sizeof(struct sockaddr_in));
488                         plogx_dbg("\tSent %d bytes to TAP IP "IPv4_BYTES_FMT" using fd %d\n", n, IPv4_BYTES(((uint8_t*)&ip)), prox_port_cfg[vdev_port].fd);
489
490                         record_request(tbase, ip, port, ring);
491                         tx_drop(mbuf);
492                         break;
493                 }
494                 handle_unknown_ip(tbase, mbuf);
495                 break;
496         default:
497                 plogx_dbg("\tMaster received unexpected message\n");
498                 tx_drop(mbuf);
499                 break;
500         }
501 }
502
503 void init_ctrl_plane(struct task_base *tbase)
504 {
505         prox_cfg.flags |= DSF_CTRL_PLANE_ENABLED;
506         struct task_master *task = (struct task_master *)tbase;
507         int socket_id = rte_lcore_to_socket_id(prox_cfg.master);
508         uint32_t n_entries = MAX_ARP_ENTRIES * 4;
509         static char hash_name[30];
510
511         sprintf(hash_name, "A%03d_hash_arp_table", prox_cfg.master);
512         struct rte_hash_parameters hash_params = {
513                 .name = hash_name,
514                 .entries = n_entries,
515                 .key_len = sizeof(uint32_t),
516                 .hash_func = rte_hash_crc,
517                 .hash_func_init_val = 0,
518         };
519         task->external_ip_hash = rte_hash_create(&hash_params);
520         PROX_PANIC(task->external_ip_hash == NULL, "Failed to set up external ip hash\n");
521         plog_info("\texternal ip hash table allocated, with %d entries of size %d\n", hash_params.entries, hash_params.key_len);
522         task->external_ip_table = (struct external_ip_table *)prox_zmalloc(n_entries * sizeof(struct external_ip_table), socket_id);
523         PROX_PANIC(task->external_ip_table == NULL, "Failed to allocate memory for %u entries in external ip table\n", n_entries);
524         plog_info("\texternal ip table, with %d entries of size %ld\n", n_entries, sizeof(struct external_ip_table));
525
526         hash_name[0]++;
527         hash_params.key_len = sizeof(struct ip_port);
528         task->internal_ip_hash = rte_hash_create(&hash_params);
529         PROX_PANIC(task->internal_ip_hash == NULL, "Failed to set up internal ip hash\n");
530         plog_info("\tinternal ip hash table allocated, with %d entries of size %d\n", hash_params.entries, hash_params.key_len);
531         task->internal_ip_table = (struct ip_table *)prox_zmalloc(n_entries * sizeof(struct ip_table), socket_id);
532         PROX_PANIC(task->internal_ip_table == NULL, "Failed to allocate memory for %u entries in internal ip table\n", n_entries);
533         plog_info("\tinternal ip table, with %d entries of size %ld\n", n_entries, sizeof(struct ip_table));
534
535         int fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);
536         PROX_PANIC(fd < 0, "Failed to open netlink socket: %d\n", errno);
537         fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
538
539         struct sockaddr_nl sockaddr;
540         memset(&sockaddr, 0, sizeof(struct sockaddr_nl));
541         sockaddr.nl_family = AF_NETLINK;
542         sockaddr.nl_groups = RTMGRP_NEIGH | RTMGRP_NOTIFY;
543         int rc = bind(fd, (struct sockaddr *)&sockaddr, sizeof(struct sockaddr_nl));
544         PROX_PANIC(rc < 0, "Failed to bind to RTMGRP_NEIGH netlink group\n");
545         task->arp_fds.fd = fd;
546         task->arp_fds.events = POLL_IN;
547         plog_info("\tRTMGRP_NEIGH netlink group bound; fd = %d\n", fd);
548         static char name[] = "master_arp_pool";
549         const int NB_ARP_MBUF = 1024;
550         const int ARP_MBUF_SIZE = 2048;
551         const int NB_CACHE_ARP_MBUF = 256;
552         struct rte_mempool *ret = rte_mempool_create(name, NB_ARP_MBUF, ARP_MBUF_SIZE, NB_CACHE_ARP_MBUF,
553                 sizeof(struct rte_pktmbuf_pool_private), rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, 0,
554                 rte_socket_id(), 0);
555         PROX_PANIC(ret == NULL, "Failed to allocate ARP memory pool on socket %u with %u elements\n",
556                 rte_socket_id(), NB_ARP_MBUF);
557         plog_info("\t\tMempool %p (%s) size = %u * %u cache %u, socket %d\n", ret, name, NB_ARP_MBUF,
558                 ARP_MBUF_SIZE, NB_CACHE_ARP_MBUF, rte_socket_id());
559         tbase->l3.arp_pool = ret;
560 }
561
562 static int handle_ctrl_plane_f(struct task_base *tbase, __attribute__((unused)) struct rte_mbuf **mbuf, uint16_t n_pkts)
563 {
564         int ring_id = 0, j, ret = 0, n = 0;
565         struct rte_mbuf *mbufs[MAX_RING_BURST];
566         struct task_master *task = (struct task_master *)tbase;
567
568         /*      Handle_master works differently than other handle functions
569                 It is not handled by a DPDK dataplane core
570                 It is no thread_generic based, hence do not receive packets the same way
571         */
572
573         ret = ring_deq(task->ctrl_rx_ring, mbufs);
574         for (j = 0; j < ret; j++) {
575                 handle_message(tbase, mbufs[j], ring_id);
576         }
577         for (int vdev_id = 0; vdev_id < task->max_vdev_id; vdev_id++) {
578                 struct vdev *vdev = &task->all_vdev[vdev_id];
579                 n = rte_eth_rx_burst(vdev->port_id, 0, mbufs, MAX_PKT_BURST);
580                 for (j = 0; j < n; j++) {
581                         tx_ring(tbase, vdev->ring, PKT_FROM_TAP, mbufs[j]);
582                 }
583                 ret +=n;
584         }
585         if ((task->max_vdev_id) && (poll(&task->arp_fds, 1, prox_cfg.poll_timeout) == POLL_IN)) {
586                 struct nlmsghdr * nl_hdr;
587                 int fd = task->arp_fds.fd;
588                 int len;
589                 uint32_t ip = 0;
590                 prox_rte_ether_addr mac;
591                 memset(&mac, 0, sizeof(mac));
592                 len = recv(fd, netlink_buf, sizeof(netlink_buf), 0);
593                 if (len < 0) {
594                         plog_err("Failed to recv from netlink: %d\n", errno);
595                         return ret;
596                 }
597                 nl_hdr = (struct nlmsghdr *)netlink_buf;
598                 if (nl_hdr->nlmsg_flags & NLM_F_MULTI) {
599                         plog_err("Unexpected multipart netlink message\n");
600                         return ret;
601                 }
602                 if ((nl_hdr->nlmsg_type != RTM_NEWNEIGH) && (nl_hdr->nlmsg_type != RTM_DELNEIGH))
603                         return ret;
604
605                 struct ndmsg *ndmsg = (struct ndmsg *)NLMSG_DATA(nl_hdr);
606                 int ndm_family = ndmsg->ndm_family;
607                 struct rtattr *rta = (struct rtattr *)RTM_RTA(ndmsg);
608                 int rtl = RTM_PAYLOAD(nl_hdr);
609                 for (; RTA_OK(rta, rtl); rta = RTA_NEXT(rta, rtl)) {
610                         switch (rta->rta_type) {
611                         case NDA_DST:
612                                 ip = *((uint32_t *)RTA_DATA(rta));
613                                 break;
614                         case NDA_LLADDR:
615                                 mac = *((prox_rte_ether_addr *)(uint64_t *)RTA_DATA(rta));
616                                 break;
617                         default:
618                                 break;
619                         }
620                 }
621                 int idx = rte_hash_lookup(task->external_ip_hash, (const void *)&ip);
622                 if (unlikely(idx < 0)) {
623                         // entry not found for this IP: we did not ask a request.
624                         // This can happen if the kernel updated the ARP table when receiving an ARP_REQUEST
625                         // We must record this, as the ARP entry is now in the kernel table
626                         if (prox_rte_is_zero_ether_addr(&mac)) {
627                                 // Timeout or MAC deleted from kernel MAC table
628                                 idx = rte_hash_del_key(task->external_ip_hash, (const void *)&ip);
629                                 plogx_dbg("ip "IPv4_BYTES_FMT" removed from external_ip_hash\n", IP4(ip));
630                                 return ret;
631                         }
632                         idx = rte_hash_add_key(task->external_ip_hash, (const void *)&ip);
633                         if (unlikely(idx < 0)) {
634                                 // entry not found for this IP: Ignore the reply. This can happen for instance for
635                                 // an IP used by management plane.
636                                 plogx_dbg("IP "IPv4_BYTES_FMT" not found in external_ip_hash and unable to add it\n", IP4(ip));
637                                 return ret;
638                         }
639                         memcpy(&task->external_ip_table[idx].mac, &mac, sizeof(prox_rte_ether_addr));
640                         plogx_dbg("ip "IPv4_BYTES_FMT" added in external_ip_hash with mac "MAC_BYTES_FMT"\n", IP4(ip), MAC_BYTES(mac.addr_bytes));
641                         return ret;
642                 }
643
644                 // entry found for this IP
645                 uint16_t nb_requests = task->external_ip_table[idx].nb_requests;
646                 if (nb_requests == 0) {
647                         return ret;
648                 }
649
650                 memcpy(&task->external_ip_table[idx].mac, &mac, sizeof(prox_rte_ether_addr));
651
652                 // If we receive a request from multiple task for the same IP, then we update all tasks
653                 if (unlikely(rte_mempool_get(tbase->l3.arp_pool, (void **)mbufs) != 0)) {
654                         plog_err("Unable to allocate a mbuf for master to core communication\n");
655                         return ret;
656                 }
657                 rte_mbuf_refcnt_set(mbufs[0], nb_requests);
658                 for (int i = 0; i < nb_requests; i++) {
659                         struct rte_ring *ring = task->external_ip_table[idx].rings[i];
660                         struct ether_hdr_arp *hdr = rte_pktmbuf_mtod(mbufs[0], struct ether_hdr_arp *);
661                         memcpy(&hdr->arp.data.sha, &mac, sizeof(prox_rte_ether_addr));
662                         tx_ring_ip(tbase, ring, UPDATE_FROM_CTRL, mbufs[0], ip);
663                         plog_dbg("UPDATE_FROM_CTRL ip "IPv4_BYTES_FMT" with mac "MAC_BYTES_FMT"\n", IP4(ip), MAC_BYTES(mac.addr_bytes));
664                 }
665                 task->external_ip_table[idx].nb_requests = 0;
666         }
667         return ret;
668 }
669
670 static void init_task_master(struct task_base *tbase, struct task_args *targs)
671 {
672         if (prox_cfg.flags & DSF_CTRL_PLANE_ENABLED) {
673                 struct task_master *task = (struct task_master *)tbase;
674
675                 task->ctrl_rx_ring = targs->lconf->ctrl_rings_p[0];
676                 task->ctrl_tx_rings = ctrl_rings;
677                 init_ctrl_plane(tbase);
678                 handle_ctrl_plane = handle_ctrl_plane_f;
679         }
680 }
681
682 static struct task_init task_init_master = {
683         .mode_str = "master",
684         .init = init_task_master,
685         .handle = NULL,
686         .flag_features = TASK_FEATURE_NEVER_DISCARDS,
687         .size = sizeof(struct task_master)
688 };
689
690 __attribute__((constructor)) static void reg_task_gen(void)
691 {
692         reg_task(&task_init_master);
693 }