fixing centos container images to point cni-proxy fix
[ovn4nfv-k8s-plugin.git] / cmd / nfn-agent / nfn-agent.go
1 /*
2  * Copyright 2020 Intel Corporation, Inc
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package main
18
19 import (
20         "context"
21         "fmt"
22         "io"
23         "os"
24         "os/signal"
25         cs "ovn4nfv-k8s-plugin/internal/pkg/cniserver"
26         pb "ovn4nfv-k8s-plugin/internal/pkg/nfnNotify/proto"
27         "ovn4nfv-k8s-plugin/internal/pkg/ovn"
28         chaining "ovn4nfv-k8s-plugin/internal/pkg/utils"
29         "strings"
30         "syscall"
31         "time"
32
33         "google.golang.org/grpc"
34         "k8s.io/client-go/kubernetes"
35         "k8s.io/client-go/rest"
36         kexec "k8s.io/utils/exec"
37         logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
38
39         //"google.golang.org/grpc/keepalive"
40
41         "ovn4nfv-k8s-plugin/cmd/ovn4nfvk8s-cni/app"
42
43         "google.golang.org/grpc/status"
44         "sigs.k8s.io/controller-runtime/pkg/log/zap"
45 )
46
47 var log = logf.Log.WithName("nfn-agent")
48 var errorChannel chan string
49 var inSync bool
50 var pnCreateStore []*pb.Notification_ProviderNwCreate
51
52 // subscribe Notifications
53 func subscribeNotif(client pb.NfnNotifyClient) error {
54         log.Info("Subscribe Notification from server")
55         ctx := context.Background()
56         var n pb.SubscribeContext
57         n.NodeName = os.Getenv("NFN_NODE_NAME")
58         for {
59                 stream, err := client.Subscribe(ctx, &n, grpc.WaitForReady(true))
60                 if err != nil {
61                         log.Error(err, "Subscribe", "client", client, "status", status.Code(err))
62                         continue
63                 }
64                 log.Info("Subscribe Notification success")
65
66                 for {
67                         in, err := stream.Recv()
68                         if err == io.EOF {
69                                 // read done.
70                                 shutDownAgent("Stream closed")
71                                 return err
72                         }
73                         if err != nil {
74                                 log.Error(err, "Stream closed from server")
75                                 shutDownAgent("Stream closed from server")
76                                 return err
77                         }
78                         log.Info("Got message", "msg", in)
79                         handleNotif(in)
80                 }
81         }
82 }
83
84 func createVlanProvidernetwork(payload *pb.Notification_ProviderNwCreate) error {
85         var err error
86         vlanID := payload.ProviderNwCreate.GetVlan().GetVlanId()
87         ln := payload.ProviderNwCreate.GetVlan().GetLogicalIntf()
88         pn := payload.ProviderNwCreate.GetVlan().GetProviderIntf()
89         name := payload.ProviderNwCreate.GetProviderNwName()
90         if ln == "" {
91                 ln = name + "." + vlanID
92         }
93         err = ovn.CreateVlan(vlanID, pn, ln)
94         if err != nil {
95                 log.Error(err, "Unable to create VLAN", "vlan", ln)
96                 return err
97         }
98         err = ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
99         if err != nil {
100                 log.Error(err, "Unable to create vlan direct bridge", "vlan", pn)
101                 return err
102         }
103         return nil
104 }
105
106 func createDirectProvidernetwork(payload *pb.Notification_ProviderNwCreate) error {
107         var err error
108         pn := payload.ProviderNwCreate.GetDirect().GetProviderIntf()
109         name := payload.ProviderNwCreate.GetProviderNwName()
110         err = ovn.CreatePnBridge("nw_"+name, "br-"+name, pn)
111         if err != nil {
112                 log.Error(err, "Unable to create direct bridge", "direct", pn)
113                 return err
114         }
115         return nil
116 }
117
118 func deleteVlanProvidernetwork(payload *pb.Notification_ProviderNwRemove) {
119         ln := payload.ProviderNwRemove.GetVlanLogicalIntf()
120         name := payload.ProviderNwRemove.GetProviderNwName()
121         ovn.DeleteVlan(ln)
122         ovn.DeletePnBridge("nw_"+name, "br-"+name)
123 }
124
125 func deleteDirectProvidernetwork(payload *pb.Notification_ProviderNwRemove) {
126         ln := payload.ProviderNwRemove.GetVlanLogicalIntf()
127         name := payload.ProviderNwRemove.GetProviderNwName()
128         ovn.DeleteVlan(ln)
129         ovn.DeletePnBridge("nw_"+name, "br-"+name)
130 }
131
132 func inSyncVlanProvidernetwork() {
133         var err error
134         // Read config from node
135         vlanList := ovn.GetVlan()
136         pnBridgeList := ovn.GetPnBridge("nfn")
137         diffVlan := make(map[string]bool)
138         diffPnBridge := make(map[string]bool)
139 VLAN:
140         for _, pn := range pnCreateStore {
141                 if pn.ProviderNwCreate.GetVlan() != nil {
142                         continue
143                 }
144                 id := pn.ProviderNwCreate.GetVlan().GetVlanId()
145                 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
146                 pn := pn.ProviderNwCreate.GetVlan().GetProviderIntf()
147                 if ln == "" {
148                         ln = pn + "." + id
149                 }
150                 for _, vlan := range vlanList {
151                         if vlan == ln {
152                                 // VLAN already present
153                                 diffVlan[vlan] = true
154                                 continue VLAN
155                         }
156                 }
157                 // Vlan not found
158                 err = ovn.CreateVlan(id, pn, ln)
159                 if err != nil {
160                         log.Error(err, "Unable to create VLAN", "vlan", ln)
161                         return
162                 }
163         }
164 PRNETWORK:
165         for _, pn := range pnCreateStore {
166                 if pn.ProviderNwCreate.GetVlan() != nil {
167                         continue
168                 }
169                 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
170                 name := pn.ProviderNwCreate.GetProviderNwName()
171                 for _, br := range pnBridgeList {
172                         pnName := strings.Replace(br, "br-", "", -1)
173                         if name == pnName {
174                                 diffPnBridge[br] = true
175                                 continue PRNETWORK
176                         }
177                 }
178                 // Provider Network not found
179                 ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
180         }
181         // Delete VLAN not in the list
182         for _, vlan := range vlanList {
183                 if diffVlan[vlan] == false {
184                         ovn.DeleteVlan(vlan)
185                 }
186         }
187         // Delete Provider Bridge not in the list
188         for _, br := range pnBridgeList {
189                 if diffPnBridge[br] == false {
190                         name := strings.Replace(br, "br-", "", -1)
191                         ovn.DeletePnBridge("nw_"+name, "br-"+name)
192                 }
193         }
194 }
195
196 func inSyncDirectProvidernetwork() {
197         // Read config from node
198         pnBridgeList := ovn.GetPnBridge("nfn")
199         diffPnBridge := make(map[string]bool)
200 DIRECTPRNETWORK:
201         for _, pn := range pnCreateStore {
202                 if pn.ProviderNwCreate.GetDirect() != nil {
203                         continue
204                 }
205                 pr := pn.ProviderNwCreate.GetDirect().GetProviderIntf()
206                 name := pn.ProviderNwCreate.GetProviderNwName()
207                 for _, br := range pnBridgeList {
208                         pnName := strings.Replace(br, "br-", "", -1)
209                         if name == pnName {
210                                 diffPnBridge[br] = true
211                                 continue DIRECTPRNETWORK
212                         }
213                 }
214                 // Provider Network not found
215                 ovn.CreatePnBridge("nw_"+name, "br-"+name, pr)
216         }
217         // Delete Provider Bridge not in the list
218         for _, br := range pnBridgeList {
219                 if diffPnBridge[br] == false {
220                         name := strings.Replace(br, "br-", "", -1)
221                         ovn.DeletePnBridge("nw_"+name, "br-"+name)
222                 }
223         }
224 }
225
226 func createNodeOVSInternalPort(payload *pb.Notification_InSync) error {
227         nodeIntfIPAddr := strings.Trim(strings.TrimSpace(payload.InSync.GetNodeIntfIpAddress()), "\"")
228         nodeIntfMacAddr := strings.Trim(strings.TrimSpace(payload.InSync.GetNodeIntfMacAddress()), "\"")
229         nodeName := os.Getenv("NFN_NODE_NAME")
230
231         err := app.CreateNodeOVSInternalPort(nodeIntfIPAddr, nodeIntfMacAddr, nodeName)
232         if err != nil {
233                 return err
234         }
235
236         return nil
237 }
238
239 func handleNotif(msg *pb.Notification) {
240         switch msg.GetCniType() {
241         case "ovn4nfv":
242                 switch payload := msg.Payload.(type) {
243                 case *pb.Notification_ProviderNwCreate:
244                         if !inSync {
245                                 // Store Msgs
246                                 pnCreateStore = append(pnCreateStore, payload)
247                                 return
248                         }
249                         if payload.ProviderNwCreate.GetVlan() != nil {
250                                 err := createVlanProvidernetwork(payload)
251                                 if err != nil {
252                                         return
253                                 }
254                         }
255
256                         if payload.ProviderNwCreate.GetDirect() != nil {
257                                 err := createDirectProvidernetwork(payload)
258                                 if err != nil {
259                                         return
260                                 }
261                         }
262                 case *pb.Notification_ProviderNwRemove:
263                         if !inSync {
264                                 // Unexpected Remove message
265                                 return
266                         }
267
268                         if payload.ProviderNwRemove.GetVlanLogicalIntf() != "" {
269                                 deleteVlanProvidernetwork(payload)
270                         }
271
272                         if payload.ProviderNwRemove.GetDirectProviderIntf() != "" {
273                                 deleteDirectProvidernetwork(payload)
274                         }
275
276                 case *pb.Notification_ContainterRtInsert:
277                         id := payload.ContainterRtInsert.GetContainerId()
278                         pid, err := chaining.GetPidForContainer(id)
279                         if err != nil {
280                                 log.Error(err, "Failed to get pid", "containerID", id)
281                                 return
282                         }
283                         err = chaining.ContainerAddRoute(pid, payload.ContainterRtInsert.GetRoute())
284                         if err != nil {
285                                 return
286                         }
287
288                 case *pb.Notification_InSync:
289                         inSyncVlanProvidernetwork()
290                         inSyncDirectProvidernetwork()
291                         pnCreateStore = nil
292                         inSync = true
293                         if payload.InSync.GetNodeIntfIpAddress() != "" && payload.InSync.GetNodeIntfMacAddress() != "" {
294                                 err := createNodeOVSInternalPort(payload)
295                                 if err != nil {
296                                         return
297                                 }
298                         }
299
300                 }
301         // Add other Types here
302         default:
303                 log.Info("Not supported cni type", "cni type", msg.GetCniType())
304         }
305 }
306
307 func shutdownHandler(errorChannel <-chan string) {
308         // Register to receive term/int signal.
309         signalChan := make(chan os.Signal, 1)
310         signal.Notify(signalChan, syscall.SIGTERM)
311         signal.Notify(signalChan, syscall.SIGINT)
312         signal.Notify(signalChan, syscall.SIGHUP)
313
314         var reason string
315         select {
316         case sig := <-signalChan:
317                 if sig == syscall.SIGHUP {
318                         log.Info("Received a SIGHUP")
319                 }
320                 reason = fmt.Sprintf("Received OS signal %v", sig)
321         case reason = <-errorChannel:
322                 log.Info("Error", "reason", reason)
323         }
324         log.Info("nfn-agent is shutting down", "reason", reason)
325 }
326
327 func shutDownAgent(reason string) {
328         // Send a failure message and give few seconds complete shutdown.
329         log.Info("shutDownAgent recieved")
330         errorChannel <- reason
331         time.Sleep(10 * time.Second)
332         // The graceful shutdown failed, terminate the process.
333         panic("Shutdown failed. Panicking.")
334 }
335
336 func main() {
337         logf.SetLogger(zap.Logger(true))
338         log.Info("nfn-agent Started")
339
340         serverAddr := os.Getenv("NFN_OPERATOR_SERVICE_HOST") + ":" + os.Getenv("NFN_OPERATOR_SERVICE_PORT")
341         // Setup ovn utilities
342         exec := kexec.New()
343         err := ovn.SetExec(exec)
344         if err != nil {
345                 log.Error(err, "Unable to setup OVN Utils")
346                 return
347         }
348         conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
349         if err != nil {
350                 log.Error(err, "fail to dial")
351                 return
352         }
353         defer conn.Close()
354         client := pb.NewNfnNotifyClient(conn)
355         errorChannel = make(chan string)
356
357         // creates the in-cluster config
358         config, err := rest.InClusterConfig()
359         if err != nil {
360                 log.Error(err, "Unable to create in-cluster config")
361                 return
362         }
363
364         // creates the clientset
365         clientset, err := kubernetes.NewForConfig(config)
366         if err != nil {
367                 log.Error(err, "Unable to create clientset for in-cluster config")
368                 return
369         }
370
371         cniserver := cs.NewCNIServer("", clientset)
372         err = cniserver.Start(cs.HandleCNIcommandRequest)
373         if err != nil {
374                 log.Error(err, "Unable to start cni server")
375                 return
376         }
377         // Run client in background
378         go subscribeNotif(client)
379         shutdownHandler(errorChannel)
380
381 }