Adding node interface, SNAT and OVN Node switch port
[ovn4nfv-k8s-plugin.git] / cmd / nfn-agent / nfn-agent.go
1 package main
2
3 import (
4         "context"
5         "fmt"
6         "io"
7         "os"
8         "os/signal"
9         cs "ovn4nfv-k8s-plugin/internal/pkg/cniserver"
10         pb "ovn4nfv-k8s-plugin/internal/pkg/nfnNotify/proto"
11         "ovn4nfv-k8s-plugin/internal/pkg/ovn"
12         "strings"
13         "syscall"
14         "time"
15
16         "google.golang.org/grpc"
17         "k8s.io/client-go/kubernetes"
18         "k8s.io/client-go/rest"
19         kexec "k8s.io/utils/exec"
20         logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
21
22         //"google.golang.org/grpc/keepalive"
23
24         "ovn4nfv-k8s-plugin/cmd/ovn4nfvk8s-cni/app"
25
26         "google.golang.org/grpc/status"
27         "sigs.k8s.io/controller-runtime/pkg/log/zap"
28 )
29
30 var log = logf.Log.WithName("nfn-agent")
31 var errorChannel chan string
32 var inSync bool
33 var pnCreateStore []*pb.Notification_ProviderNwCreate
34
35 // subscribe Notifications
36 func subscribeNotif(client pb.NfnNotifyClient) error {
37         log.Info("Subscribe Notification from server")
38         ctx := context.Background()
39         var n pb.SubscribeContext
40         n.NodeName = os.Getenv("NFN_NODE_NAME")
41         for {
42                 stream, err := client.Subscribe(ctx, &n, grpc.WaitForReady(true))
43                 if err != nil {
44                         log.Error(err, "Subscribe", "client", client, "status", status.Code(err))
45                         continue
46                 }
47                 log.Info("Subscribe Notification success")
48
49                 for {
50                         in, err := stream.Recv()
51                         if err == io.EOF {
52                                 // read done.
53                                 shutDownAgent("Stream closed")
54                                 return err
55                         }
56                         if err != nil {
57                                 log.Error(err, "Stream closed from server")
58                                 shutDownAgent("Stream closed from server")
59                                 return err
60                         }
61                         log.Info("Got message", "msg", in)
62                         handleNotif(in)
63                 }
64         }
65 }
66
67 func createVlanProvidernetwork(payload *pb.Notification_ProviderNwCreate) error {
68         var err error
69         vlanID := payload.ProviderNwCreate.GetVlan().GetVlanId()
70         ln := payload.ProviderNwCreate.GetVlan().GetLogicalIntf()
71         pn := payload.ProviderNwCreate.GetVlan().GetProviderIntf()
72         name := payload.ProviderNwCreate.GetProviderNwName()
73         if ln == "" {
74                 ln = name + "." + vlanID
75         }
76         err = ovn.CreateVlan(vlanID, pn, ln)
77         if err != nil {
78                 log.Error(err, "Unable to create VLAN", "vlan", ln)
79                 return err
80         }
81         err = ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
82         if err != nil {
83                 log.Error(err, "Unable to create vlan direct bridge", "vlan", pn)
84                 return err
85         }
86         return nil
87 }
88
89 func createDirectProvidernetwork(payload *pb.Notification_ProviderNwCreate) error {
90         var err error
91         pn := payload.ProviderNwCreate.GetDirect().GetProviderIntf()
92         name := payload.ProviderNwCreate.GetProviderNwName()
93         err = ovn.CreatePnBridge("nw_"+name, "br-"+name, pn)
94         if err != nil {
95                 log.Error(err, "Unable to create direct bridge", "direct", pn)
96                 return err
97         }
98         return nil
99 }
100
101 func deleteVlanProvidernetwork(payload *pb.Notification_ProviderNwRemove) {
102         ln := payload.ProviderNwRemove.GetVlanLogicalIntf()
103         name := payload.ProviderNwRemove.GetProviderNwName()
104         ovn.DeleteVlan(ln)
105         ovn.DeletePnBridge("nw_"+name, "br-"+name)
106 }
107
108 func deleteDirectProvidernetwork(payload *pb.Notification_ProviderNwRemove) {
109         ln := payload.ProviderNwRemove.GetVlanLogicalIntf()
110         name := payload.ProviderNwRemove.GetProviderNwName()
111         ovn.DeleteVlan(ln)
112         ovn.DeletePnBridge("nw_"+name, "br-"+name)
113 }
114
115 func inSyncVlanProvidernetwork() {
116         var err error
117         // Read config from node
118         vlanList := ovn.GetVlan()
119         pnBridgeList := ovn.GetPnBridge("nfn")
120         diffVlan := make(map[string]bool)
121         diffPnBridge := make(map[string]bool)
122 VLAN:
123         for _, pn := range pnCreateStore {
124                 if pn.ProviderNwCreate.GetVlan() != nil {
125                         continue
126                 }
127                 id := pn.ProviderNwCreate.GetVlan().GetVlanId()
128                 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
129                 pn := pn.ProviderNwCreate.GetVlan().GetProviderIntf()
130                 if ln == "" {
131                         ln = pn + "." + id
132                 }
133                 for _, vlan := range vlanList {
134                         if vlan == ln {
135                                 // VLAN already present
136                                 diffVlan[vlan] = true
137                                 continue VLAN
138                         }
139                 }
140                 // Vlan not found
141                 err = ovn.CreateVlan(id, pn, ln)
142                 if err != nil {
143                         log.Error(err, "Unable to create VLAN", "vlan", ln)
144                         return
145                 }
146         }
147 PRNETWORK:
148         for _, pn := range pnCreateStore {
149                 if pn.ProviderNwCreate.GetVlan() != nil {
150                         continue
151                 }
152                 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
153                 name := pn.ProviderNwCreate.GetProviderNwName()
154                 for _, br := range pnBridgeList {
155                         pnName := strings.Replace(br, "br-", "", -1)
156                         if name == pnName {
157                                 diffPnBridge[br] = true
158                                 continue PRNETWORK
159                         }
160                 }
161                 // Provider Network not found
162                 ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
163         }
164         // Delete VLAN not in the list
165         for _, vlan := range vlanList {
166                 if diffVlan[vlan] == false {
167                         ovn.DeleteVlan(vlan)
168                 }
169         }
170         // Delete Provider Bridge not in the list
171         for _, br := range pnBridgeList {
172                 if diffPnBridge[br] == false {
173                         name := strings.Replace(br, "br-", "", -1)
174                         ovn.DeletePnBridge("nw_"+name, "br-"+name)
175                 }
176         }
177 }
178
179 func inSyncDirectProvidernetwork() {
180         // Read config from node
181         pnBridgeList := ovn.GetPnBridge("nfn")
182         diffPnBridge := make(map[string]bool)
183 DIRECTPRNETWORK:
184         for _, pn := range pnCreateStore {
185                 if pn.ProviderNwCreate.GetDirect() != nil {
186                         continue
187                 }
188                 pr := pn.ProviderNwCreate.GetDirect().GetProviderIntf()
189                 name := pn.ProviderNwCreate.GetProviderNwName()
190                 for _, br := range pnBridgeList {
191                         pnName := strings.Replace(br, "br-", "", -1)
192                         if name == pnName {
193                                 diffPnBridge[br] = true
194                                 continue DIRECTPRNETWORK
195                         }
196                 }
197                 // Provider Network not found
198                 ovn.CreatePnBridge("nw_"+name, "br-"+name, pr)
199         }
200         // Delete Provider Bridge not in the list
201         for _, br := range pnBridgeList {
202                 if diffPnBridge[br] == false {
203                         name := strings.Replace(br, "br-", "", -1)
204                         ovn.DeletePnBridge("nw_"+name, "br-"+name)
205                 }
206         }
207 }
208
209 func createNodeOVSInternalPort(payload *pb.Notification_InSync) error {
210         nodeIntfIPAddr := strings.Trim(strings.TrimSpace(payload.InSync.GetNodeIntfIpAddress()), "\"")
211         nodeIntfMacAddr := strings.Trim(strings.TrimSpace(payload.InSync.GetNodeIntfMacAddress()), "\"")
212         nodeName := os.Getenv("NFN_NODE_NAME")
213
214         err := app.CreateNodeOVSInternalPort(nodeIntfIPAddr, nodeIntfMacAddr, nodeName)
215         if err != nil {
216                 return err
217         }
218
219         return nil
220 }
221
222 func handleNotif(msg *pb.Notification) {
223         switch msg.GetCniType() {
224         case "ovn4nfv":
225                 switch payload := msg.Payload.(type) {
226                 case *pb.Notification_ProviderNwCreate:
227                         if !inSync {
228                                 // Store Msgs
229                                 pnCreateStore = append(pnCreateStore, payload)
230                                 return
231                         }
232                         if payload.ProviderNwCreate.GetVlan() != nil {
233                                 err := createVlanProvidernetwork(payload)
234                                 if err != nil {
235                                         return
236                                 }
237                         }
238
239                         if payload.ProviderNwCreate.GetDirect() != nil {
240                                 err := createDirectProvidernetwork(payload)
241                                 if err != nil {
242                                         return
243                                 }
244                         }
245                 case *pb.Notification_ProviderNwRemove:
246                         if !inSync {
247                                 // Unexpected Remove message
248                                 return
249                         }
250
251                         if payload.ProviderNwRemove.GetVlanLogicalIntf() != "" {
252                                 deleteVlanProvidernetwork(payload)
253                         }
254
255                         if payload.ProviderNwRemove.GetDirectProviderIntf() != "" {
256                                 deleteDirectProvidernetwork(payload)
257                         }
258
259                 case *pb.Notification_InSync:
260                         if payload.InSync.GetNodeIntfIpAddress() != "" && payload.InSync.GetNodeIntfMacAddress() != "" {
261                                 err := createNodeOVSInternalPort(payload)
262                                 if err != nil {
263                                         return
264                                 }
265                         }
266                         inSyncVlanProvidernetwork()
267                         inSyncDirectProvidernetwork()
268                         pnCreateStore = nil
269                         inSync = true
270
271                 }
272         // Add other Types here
273         default:
274                 log.Info("Not supported cni type", "cni type", msg.GetCniType())
275         }
276 }
277
278 func shutdownHandler(errorChannel <-chan string) {
279         // Register to receive term/int signal.
280         signalChan := make(chan os.Signal, 1)
281         signal.Notify(signalChan, syscall.SIGTERM)
282         signal.Notify(signalChan, syscall.SIGINT)
283         signal.Notify(signalChan, syscall.SIGHUP)
284
285         var reason string
286         select {
287         case sig := <-signalChan:
288                 if sig == syscall.SIGHUP {
289                         log.Info("Received a SIGHUP")
290                 }
291                 reason = fmt.Sprintf("Received OS signal %v", sig)
292         case reason = <-errorChannel:
293                 log.Info("Error", "reason", reason)
294         }
295         log.Info("nfn-agent is shutting down", "reason", reason)
296 }
297
298 func shutDownAgent(reason string) {
299         // Send a failure message and give few seconds complete shutdown.
300         log.Info("shutDownAgent recieved")
301         errorChannel <- reason
302         time.Sleep(10 * time.Second)
303         // The graceful shutdown failed, terminate the process.
304         panic("Shutdown failed. Panicking.")
305 }
306
307 func main() {
308         logf.SetLogger(zap.Logger(true))
309         log.Info("nfn-agent Started")
310
311         serverAddr := os.Getenv("NFN_OPERATOR_SERVICE_HOST") + ":" + os.Getenv("NFN_OPERATOR_SERVICE_PORT")
312         // Setup ovn utilities
313         exec := kexec.New()
314         err := ovn.SetExec(exec)
315         if err != nil {
316                 log.Error(err, "Unable to setup OVN Utils")
317                 return
318         }
319         conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
320         if err != nil {
321                 log.Error(err, "fail to dial")
322                 return
323         }
324         defer conn.Close()
325         client := pb.NewNfnNotifyClient(conn)
326         errorChannel = make(chan string)
327
328         // creates the in-cluster config
329         config, err := rest.InClusterConfig()
330         if err != nil {
331                 log.Error(err, "Unable to create in-cluster config")
332                 return
333         }
334
335         // creates the clientset
336         clientset, err := kubernetes.NewForConfig(config)
337         if err != nil {
338                 log.Error(err, "Unable to create clientset for in-cluster config")
339                 return
340         }
341
342         cniserver := cs.NewCNIServer("", clientset)
343         err = cniserver.Start(cs.HandleCNIcommandRequest)
344         if err != nil {
345                 log.Error(err, "Unable to start cni server")
346                 return
347         }
348         // Run client in background
349         go subscribeNotif(client)
350         shutdownHandler(errorChannel)
351
352 }