nfn agent for configuring nodes
[ovn4nfv-k8s-plugin.git] / cmd / nfn-agent / nfn-agent.go
1 package main
2
import (
	"context"
	"fmt"
	"io"
	"net"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"google.golang.org/grpc"
	//"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/status"
	kexec "k8s.io/utils/exec"
	pb "ovn4nfv-k8s-plugin/internal/pkg/nfnNotify/proto"
	"ovn4nfv-k8s-plugin/internal/pkg/ovn"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
)
21
22 var log = logf.Log.WithName("nfn-agent")
23 var errorChannel chan string
24 var inSync bool
25 var pnCreateStore []*pb.Notification_ProviderNwCreate
26
27 // subscribe Notifications
28 func subscribeNotif(client pb.NfnNotifyClient) error {
29         log.Info("Subscribe Notification from server")
30         ctx := context.Background()
31         var n pb.SubscribeContext
32         n.NodeName = os.Getenv("NFN_NODE_NAME")
33         for {
34                 stream, err := client.Subscribe(ctx, &n, grpc.WaitForReady(true))
35                 if err != nil {
36                         log.Error(err, "Subscribe", "client", client, "status", status.Code(err))
37                         continue
38                 }
39                 log.Info("Subscribe Notification success")
40
41                 for {
42                         in, err := stream.Recv()
43                         if err == io.EOF {
44                                 // read done.
45                                 shutDownAgent("Stream closed")
46                                 return err
47                         }
48                         if err != nil {
49                                 log.Error(err, "Stream closed from server")
50                                 shutDownAgent("Stream closed from server")
51                                 return err
52                         }
53                         log.Info("Got message", "msg", in)
54                         handleNotif(in)
55                 }
56         }
57 }
58
59 func handleNotif(msg *pb.Notification) {
60         var err error
61         switch msg.GetCniType() {
62         case "ovn4nfv":
63                 switch payload := msg.Payload.(type) {
64                 case *pb.Notification_ProviderNwCreate:
65                         if !inSync {
66                                 // Store Msgs
67                                 pnCreateStore = append(pnCreateStore, payload)
68                                 return
69                         }
70                         vlanID := payload.ProviderNwCreate.GetVlan().GetVlanId()
71                         ln := payload.ProviderNwCreate.GetVlan().GetLogicalIntf()
72                         pn := payload.ProviderNwCreate.GetVlan().GetProviderIntf()
73                         name := payload.ProviderNwCreate.GetProviderNwName()
74                         if ln == "" {
75                                 ln = name + "." + vlanID
76                         }
77                         err = ovn.CreateVlan(vlanID, pn, ln)
78                         if err != nil {
79                                 log.Error(err, "Unable to create VLAN", "vlan", ln)
80                                 return
81                         }
82                         ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
83                 case *pb.Notification_ProviderNwRemove:
84                         if !inSync {
85                                 // Unexpected Remove message
86                                 return
87                         }
88                         ln := payload.ProviderNwRemove.GetVlanLogicalIntf()
89                         name := payload.ProviderNwRemove.GetProviderNwName()
90                         ovn.DeleteVlan(ln)
91                         ovn.DeletePnBridge("nw_"+name, "br-"+name)
92                 case *pb.Notification_InSync:
93                         // Read config from node
94                         vlanList := ovn.GetVlan()
95                         pnBridgeList := ovn.GetPnBridge("nfn")
96                         diffVlan := make(map[string]bool)
97                         diffPnBridge := make(map[string]bool)
98                 VLAN:
99                         for _, pn := range pnCreateStore {
100                                 id := pn.ProviderNwCreate.GetVlan().GetVlanId()
101                                 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
102                                 pn := pn.ProviderNwCreate.GetVlan().GetProviderIntf()
103                                 if ln == "" {
104                                         ln = pn + "." + id
105                                 }
106                                 for _, vlan := range vlanList {
107                                         if vlan == ln {
108                                                 // VLAN already present
109                                                 diffVlan[vlan] = true
110                                                 continue VLAN
111                                         }
112                                 }
113                                 // Vlan not found
114                                 err = ovn.CreateVlan(id, pn, ln)
115                                 if err != nil {
116                                         log.Error(err, "Unable to create VLAN", "vlan", ln)
117                                         return
118                                 }
119                         }
120                 PRNETWORK:
121                         for _, pn := range pnCreateStore {
122                                 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
123                                 name := pn.ProviderNwCreate.GetProviderNwName()
124                                 for _, br := range pnBridgeList {
125                                         pnName := strings.Replace(br, "br-", "", -1)
126                                         if name == pnName {
127                                                 diffPnBridge[br] = true
128                                                 continue PRNETWORK
129                                         }
130                                 }
131                                 // Provider Network not found
132                                 ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
133                         }
134                         // Delete VLAN not in the list
135                         for _, vlan := range vlanList {
136                                 if diffVlan[vlan] == false {
137                                         ovn.DeleteVlan(vlan)
138                                 }
139                         }
140                         // Delete Provider Bridge not in the list
141                         for _, br := range pnBridgeList {
142                                 if diffPnBridge[br] == false {
143                                         name := strings.Replace(br, "br-", "", -1)
144                                         ovn.DeletePnBridge("nw_"+name, "br-"+name)
145                                 }
146                         }
147
148                         pnCreateStore = nil
149                         inSync = true
150
151                 }
152         // Add other Types here
153         default:
154                 log.Info("Not supported cni type", "cni type", msg.GetCniType())
155         }
156 }
157
158 func shutdownHandler(errorChannel <-chan string) {
159         // Register to receive term/int signal.
160         signalChan := make(chan os.Signal, 1)
161         signal.Notify(signalChan, syscall.SIGTERM)
162         signal.Notify(signalChan, syscall.SIGINT)
163         signal.Notify(signalChan, syscall.SIGHUP)
164
165         var reason string
166         select {
167         case sig := <-signalChan:
168                 if sig == syscall.SIGHUP {
169                         log.Info("Received a SIGHUP")
170                 }
171                 reason = fmt.Sprintf("Received OS signal %v", sig)
172         case reason = <-errorChannel:
173                 log.Info("Error", "reason", reason)
174         }
175         log.Info("nfn-agent is shutting down", "reason", reason)
176 }
177
178 func shutDownAgent(reason string) {
179         // Send a failure message and give few seconds complete shutdown.
180         log.Info("shutDownAgent recieved")
181         errorChannel <- reason
182         time.Sleep(10 * time.Second)
183         // The graceful shutdown failed, terminate the process.
184         panic("Shutdown failed. Panicking.")
185 }
186
187 func main() {
188         logf.SetLogger(zap.Logger(true))
189         log.Info("nfn-agent Started")
190
191         serverAddr := os.Getenv("NFN_OPERATOR_SERVICE_HOST") + ":" + os.Getenv("NFN_OPERATOR_SERVICE_PORT")
192         // Setup ovn utilities
193         exec := kexec.New()
194         err := ovn.SetExec(exec)
195         if err != nil {
196                 log.Error(err, "Unable to setup OVN Utils")
197                 return
198         }
199         conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
200         if err != nil {
201                 log.Error(err, "fail to dial")
202                 return
203         }
204         defer conn.Close()
205         client := pb.NewNfnNotifyClient(conn)
206         errorChannel = make(chan string)
207
208         // Run client in background
209         go subscribeNotif(client)
210         shutdownHandler(errorChannel)
211
212 }