Adding cnishim and cniserver
[ovn4nfv-k8s-plugin.git] / cmd / nfn-agent / nfn-agent.go
1 package main
2
3 import (
4         "context"
5         "fmt"
6         "google.golang.org/grpc"
7         "io"
8         kexec "k8s.io/utils/exec"
9         "os"
10         "os/signal"
11         pb "ovn4nfv-k8s-plugin/internal/pkg/nfnNotify/proto"
12         cs "ovn4nfv-k8s-plugin/internal/pkg/cniserver"
13         "ovn4nfv-k8s-plugin/internal/pkg/ovn"
14         logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
15         "k8s.io/client-go/kubernetes"
16         "k8s.io/client-go/rest"
17         "strings"
18         "syscall"
19         "time"
20         //"google.golang.org/grpc/keepalive"
21         "google.golang.org/grpc/status"
22         "sigs.k8s.io/controller-runtime/pkg/log/zap"
23 )
24
25 var log = logf.Log.WithName("nfn-agent")
26 var errorChannel chan string
27 var inSync bool
28 var pnCreateStore []*pb.Notification_ProviderNwCreate
29
30 // subscribe Notifications
31 func subscribeNotif(client pb.NfnNotifyClient) error {
32         log.Info("Subscribe Notification from server")
33         ctx := context.Background()
34         var n pb.SubscribeContext
35         n.NodeName = os.Getenv("NFN_NODE_NAME")
36         for {
37                 stream, err := client.Subscribe(ctx, &n, grpc.WaitForReady(true))
38                 if err != nil {
39                         log.Error(err, "Subscribe", "client", client, "status", status.Code(err))
40                         continue
41                 }
42                 log.Info("Subscribe Notification success")
43
44                 for {
45                         in, err := stream.Recv()
46                         if err == io.EOF {
47                                 // read done.
48                                 shutDownAgent("Stream closed")
49                                 return err
50                         }
51                         if err != nil {
52                                 log.Error(err, "Stream closed from server")
53                                 shutDownAgent("Stream closed from server")
54                                 return err
55                         }
56                         log.Info("Got message", "msg", in)
57                         handleNotif(in)
58                 }
59         }
60 }
61
62 func createVlanProvidernetwork(payload *pb.Notification_ProviderNwCreate) error {
63         var err error
64         vlanID := payload.ProviderNwCreate.GetVlan().GetVlanId()
65         ln := payload.ProviderNwCreate.GetVlan().GetLogicalIntf()
66         pn := payload.ProviderNwCreate.GetVlan().GetProviderIntf()
67         name := payload.ProviderNwCreate.GetProviderNwName()
68         if ln == "" {
69                 ln = name + "." + vlanID
70         }
71         err = ovn.CreateVlan(vlanID, pn, ln)
72         if err != nil {
73                 log.Error(err, "Unable to create VLAN", "vlan", ln)
74                 return err
75         }
76         err = ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
77         if err != nil {
78                 log.Error(err, "Unable to create vlan direct bridge", "vlan", pn)
79                 return err
80         }
81         return nil
82 }
83
84 func createDirectProvidernetwork(payload *pb.Notification_ProviderNwCreate) error {
85         var err error
86         pn := payload.ProviderNwCreate.GetDirect().GetProviderIntf()
87         name := payload.ProviderNwCreate.GetProviderNwName()
88         err = ovn.CreatePnBridge("nw_"+name, "br-"+name, pn)
89         if err != nil {
90                 log.Error(err, "Unable to create direct bridge", "direct", pn)
91                 return err
92         }
93         return nil
94 }
95
96 func deleteVlanProvidernetwork(payload *pb.Notification_ProviderNwRemove) {
97         ln := payload.ProviderNwRemove.GetVlanLogicalIntf()
98         name := payload.ProviderNwRemove.GetProviderNwName()
99         ovn.DeleteVlan(ln)
100         ovn.DeletePnBridge("nw_"+name, "br-"+name)
101 }
102
103 func deleteDirectProvidernetwork(payload *pb.Notification_ProviderNwRemove) {
104         ln := payload.ProviderNwRemove.GetVlanLogicalIntf()
105         name := payload.ProviderNwRemove.GetProviderNwName()
106         ovn.DeleteVlan(ln)
107         ovn.DeletePnBridge("nw_"+name, "br-"+name)
108 }
109
110 func inSyncVlanProvidernetwork() {
111         var err error
112         // Read config from node
113         vlanList := ovn.GetVlan()
114         pnBridgeList := ovn.GetPnBridge("nfn")
115         diffVlan := make(map[string]bool)
116         diffPnBridge := make(map[string]bool)
117 VLAN:
118         for _, pn := range pnCreateStore {
119                 if pn.ProviderNwCreate.GetVlan() != nil {
120                         continue
121                 }
122                 id := pn.ProviderNwCreate.GetVlan().GetVlanId()
123                 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
124                 pn := pn.ProviderNwCreate.GetVlan().GetProviderIntf()
125                 if ln == "" {
126                         ln = pn + "." + id
127                 }
128                 for _, vlan := range vlanList {
129                         if vlan == ln {
130                                 // VLAN already present
131                                 diffVlan[vlan] = true
132                                 continue VLAN
133                         }
134                 }
135                 // Vlan not found
136                 err = ovn.CreateVlan(id, pn, ln)
137                 if err != nil {
138                         log.Error(err, "Unable to create VLAN", "vlan", ln)
139                         return
140                 }
141         }
142 PRNETWORK:
143         for _, pn := range pnCreateStore {
144                 if pn.ProviderNwCreate.GetVlan() != nil {
145                         continue
146                 }
147                 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
148                 name := pn.ProviderNwCreate.GetProviderNwName()
149                 for _, br := range pnBridgeList {
150                         pnName := strings.Replace(br, "br-", "", -1)
151                         if name == pnName {
152                                 diffPnBridge[br] = true
153                                 continue PRNETWORK
154                         }
155                 }
156                 // Provider Network not found
157                 ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
158         }
159         // Delete VLAN not in the list
160         for _, vlan := range vlanList {
161                 if diffVlan[vlan] == false {
162                         ovn.DeleteVlan(vlan)
163                 }
164         }
165         // Delete Provider Bridge not in the list
166         for _, br := range pnBridgeList {
167                 if diffPnBridge[br] == false {
168                         name := strings.Replace(br, "br-", "", -1)
169                         ovn.DeletePnBridge("nw_"+name, "br-"+name)
170                 }
171         }
172 }
173
174 func inSyncDirectProvidernetwork() {
175         // Read config from node
176         pnBridgeList := ovn.GetPnBridge("nfn")
177         diffPnBridge := make(map[string]bool)
178 DIRECTPRNETWORK:
179         for _, pn := range pnCreateStore {
180                 if pn.ProviderNwCreate.GetDirect() != nil {
181                         continue
182                 }
183                 pr := pn.ProviderNwCreate.GetDirect().GetProviderIntf()
184                 name := pn.ProviderNwCreate.GetProviderNwName()
185                 for _, br := range pnBridgeList {
186                         pnName := strings.Replace(br, "br-", "", -1)
187                         if name == pnName {
188                                 diffPnBridge[br] = true
189                                 continue DIRECTPRNETWORK
190                         }
191                 }
192                 // Provider Network not found
193                 ovn.CreatePnBridge("nw_"+name, "br-"+name, pr)
194         }
195         // Delete Provider Bridge not in the list
196         for _, br := range pnBridgeList {
197                 if diffPnBridge[br] == false {
198                         name := strings.Replace(br, "br-", "", -1)
199                         ovn.DeletePnBridge("nw_"+name, "br-"+name)
200                 }
201         }
202 }
203
204 func handleNotif(msg *pb.Notification) {
205         switch msg.GetCniType() {
206         case "ovn4nfv":
207                 switch payload := msg.Payload.(type) {
208                 case *pb.Notification_ProviderNwCreate:
209                         if !inSync {
210                                 // Store Msgs
211                                 pnCreateStore = append(pnCreateStore, payload)
212                                 return
213                         }
214                         if payload.ProviderNwCreate.GetVlan() != nil {
215                                 err := createVlanProvidernetwork(payload)
216                                 if err != nil {
217                                         return
218                                 }
219                         }
220
221                         if payload.ProviderNwCreate.GetDirect() != nil {
222                                 err := createDirectProvidernetwork(payload)
223                                 if err != nil {
224                                         return
225                                 }
226                         }
227                 case *pb.Notification_ProviderNwRemove:
228                         if !inSync {
229                                 // Unexpected Remove message
230                                 return
231                         }
232
233                         if payload.ProviderNwRemove.GetVlanLogicalIntf() != "" {
234                                 deleteVlanProvidernetwork(payload)
235                         }
236
237                         if payload.ProviderNwRemove.GetDirectProviderIntf() != "" {
238                                 deleteDirectProvidernetwork(payload)
239                         }
240
241                 case *pb.Notification_InSync:
242                         inSyncVlanProvidernetwork()
243                         inSyncDirectProvidernetwork()
244                         pnCreateStore = nil
245                         inSync = true
246
247                 }
248         // Add other Types here
249         default:
250                 log.Info("Not supported cni type", "cni type", msg.GetCniType())
251         }
252 }
253
254 func shutdownHandler(errorChannel <-chan string) {
255         // Register to receive term/int signal.
256         signalChan := make(chan os.Signal, 1)
257         signal.Notify(signalChan, syscall.SIGTERM)
258         signal.Notify(signalChan, syscall.SIGINT)
259         signal.Notify(signalChan, syscall.SIGHUP)
260
261         var reason string
262         select {
263         case sig := <-signalChan:
264                 if sig == syscall.SIGHUP {
265                         log.Info("Received a SIGHUP")
266                 }
267                 reason = fmt.Sprintf("Received OS signal %v", sig)
268         case reason = <-errorChannel:
269                 log.Info("Error", "reason", reason)
270         }
271         log.Info("nfn-agent is shutting down", "reason", reason)
272 }
273
274 func shutDownAgent(reason string) {
275         // Send a failure message and give few seconds complete shutdown.
276         log.Info("shutDownAgent recieved")
277         errorChannel <- reason
278         time.Sleep(10 * time.Second)
279         // The graceful shutdown failed, terminate the process.
280         panic("Shutdown failed. Panicking.")
281 }
282
283 func main() {
284         logf.SetLogger(zap.Logger(true))
285         log.Info("nfn-agent Started")
286
287         serverAddr := os.Getenv("NFN_OPERATOR_SERVICE_HOST") + ":" + os.Getenv("NFN_OPERATOR_SERVICE_PORT")
288         // Setup ovn utilities
289         exec := kexec.New()
290         err := ovn.SetExec(exec)
291         if err != nil {
292                 log.Error(err, "Unable to setup OVN Utils")
293                 return
294         }
295         conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
296         if err != nil {
297                 log.Error(err, "fail to dial")
298                 return
299         }
300         defer conn.Close()
301         client := pb.NewNfnNotifyClient(conn)
302         errorChannel = make(chan string)
303
304         // creates the in-cluster config
305         config, err := rest.InClusterConfig()
306         if err != nil {
307                 log.Error(err, "Unable to create in-cluster config")
308                 return
309         }
310
311         // creates the clientset
312         clientset, err := kubernetes.NewForConfig(config)
313         if err != nil {
314                 log.Error(err, "Unable to create clientset for in-cluster config")
315                 return
316         }
317
318         cniserver := cs.NewCNIServer("",clientset)
319         err = cniserver.Start(cs.HandleCNIcommandRequest)
320         if err != nil {
321                 log.Error(err, "Unable to start cni server")
322                 return
323         }
324         // Run client in background
325         go subscribeNotif(client)
326         shutdownHandler(errorChannel)
327
328 }