9 cs "ovn4nfv-k8s-plugin/internal/pkg/cniserver"
10 pb "ovn4nfv-k8s-plugin/internal/pkg/nfnNotify/proto"
11 "ovn4nfv-k8s-plugin/internal/pkg/ovn"
16 "google.golang.org/grpc"
17 "k8s.io/client-go/kubernetes"
18 "k8s.io/client-go/rest"
19 kexec "k8s.io/utils/exec"
20 logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
22 //"google.golang.org/grpc/keepalive"
24 "ovn4nfv-k8s-plugin/cmd/ovn4nfvk8s-cni/app"
26 "google.golang.org/grpc/status"
27 "sigs.k8s.io/controller-runtime/pkg/log/zap"
30 var log = logf.Log.WithName("nfn-agent")
31 var errorChannel chan string
33 var pnCreateStore []*pb.Notification_ProviderNwCreate
35 // subscribe Notifications
36 func subscribeNotif(client pb.NfnNotifyClient) error {
37 log.Info("Subscribe Notification from server")
38 ctx := context.Background()
39 var n pb.SubscribeContext
40 n.NodeName = os.Getenv("NFN_NODE_NAME")
42 stream, err := client.Subscribe(ctx, &n, grpc.WaitForReady(true))
44 log.Error(err, "Subscribe", "client", client, "status", status.Code(err))
47 log.Info("Subscribe Notification success")
50 in, err := stream.Recv()
53 shutDownAgent("Stream closed")
57 log.Error(err, "Stream closed from server")
58 shutDownAgent("Stream closed from server")
61 log.Info("Got message", "msg", in)
67 func createVlanProvidernetwork(payload *pb.Notification_ProviderNwCreate) error {
69 vlanID := payload.ProviderNwCreate.GetVlan().GetVlanId()
70 ln := payload.ProviderNwCreate.GetVlan().GetLogicalIntf()
71 pn := payload.ProviderNwCreate.GetVlan().GetProviderIntf()
72 name := payload.ProviderNwCreate.GetProviderNwName()
74 ln = name + "." + vlanID
76 err = ovn.CreateVlan(vlanID, pn, ln)
78 log.Error(err, "Unable to create VLAN", "vlan", ln)
81 err = ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
83 log.Error(err, "Unable to create vlan direct bridge", "vlan", pn)
89 func createDirectProvidernetwork(payload *pb.Notification_ProviderNwCreate) error {
91 pn := payload.ProviderNwCreate.GetDirect().GetProviderIntf()
92 name := payload.ProviderNwCreate.GetProviderNwName()
93 err = ovn.CreatePnBridge("nw_"+name, "br-"+name, pn)
95 log.Error(err, "Unable to create direct bridge", "direct", pn)
101 func deleteVlanProvidernetwork(payload *pb.Notification_ProviderNwRemove) {
102 ln := payload.ProviderNwRemove.GetVlanLogicalIntf()
103 name := payload.ProviderNwRemove.GetProviderNwName()
105 ovn.DeletePnBridge("nw_"+name, "br-"+name)
108 func deleteDirectProvidernetwork(payload *pb.Notification_ProviderNwRemove) {
109 ln := payload.ProviderNwRemove.GetVlanLogicalIntf()
110 name := payload.ProviderNwRemove.GetProviderNwName()
112 ovn.DeletePnBridge("nw_"+name, "br-"+name)
115 func inSyncVlanProvidernetwork() {
117 // Read config from node
118 vlanList := ovn.GetVlan()
119 pnBridgeList := ovn.GetPnBridge("nfn")
120 diffVlan := make(map[string]bool)
121 diffPnBridge := make(map[string]bool)
123 for _, pn := range pnCreateStore {
124 if pn.ProviderNwCreate.GetVlan() != nil {
127 id := pn.ProviderNwCreate.GetVlan().GetVlanId()
128 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
129 pn := pn.ProviderNwCreate.GetVlan().GetProviderIntf()
133 for _, vlan := range vlanList {
135 // VLAN already present
136 diffVlan[vlan] = true
141 err = ovn.CreateVlan(id, pn, ln)
143 log.Error(err, "Unable to create VLAN", "vlan", ln)
148 for _, pn := range pnCreateStore {
149 if pn.ProviderNwCreate.GetVlan() != nil {
152 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
153 name := pn.ProviderNwCreate.GetProviderNwName()
154 for _, br := range pnBridgeList {
155 pnName := strings.Replace(br, "br-", "", -1)
157 diffPnBridge[br] = true
161 // Provider Network not found
162 ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
164 // Delete VLAN not in the list
165 for _, vlan := range vlanList {
166 if diffVlan[vlan] == false {
170 // Delete Provider Bridge not in the list
171 for _, br := range pnBridgeList {
172 if diffPnBridge[br] == false {
173 name := strings.Replace(br, "br-", "", -1)
174 ovn.DeletePnBridge("nw_"+name, "br-"+name)
179 func inSyncDirectProvidernetwork() {
180 // Read config from node
181 pnBridgeList := ovn.GetPnBridge("nfn")
182 diffPnBridge := make(map[string]bool)
184 for _, pn := range pnCreateStore {
185 if pn.ProviderNwCreate.GetDirect() != nil {
188 pr := pn.ProviderNwCreate.GetDirect().GetProviderIntf()
189 name := pn.ProviderNwCreate.GetProviderNwName()
190 for _, br := range pnBridgeList {
191 pnName := strings.Replace(br, "br-", "", -1)
193 diffPnBridge[br] = true
194 continue DIRECTPRNETWORK
197 // Provider Network not found
198 ovn.CreatePnBridge("nw_"+name, "br-"+name, pr)
200 // Delete Provider Bridge not in the list
201 for _, br := range pnBridgeList {
202 if diffPnBridge[br] == false {
203 name := strings.Replace(br, "br-", "", -1)
204 ovn.DeletePnBridge("nw_"+name, "br-"+name)
209 func createNodeOVSInternalPort(payload *pb.Notification_InSync) error {
210 nodeIntfIPAddr := strings.Trim(strings.TrimSpace(payload.InSync.GetNodeIntfIpAddress()), "\"")
211 nodeIntfMacAddr := strings.Trim(strings.TrimSpace(payload.InSync.GetNodeIntfMacAddress()), "\"")
212 nodeName := os.Getenv("NFN_NODE_NAME")
214 err := app.CreateNodeOVSInternalPort(nodeIntfIPAddr, nodeIntfMacAddr, nodeName)
222 func handleNotif(msg *pb.Notification) {
223 switch msg.GetCniType() {
225 switch payload := msg.Payload.(type) {
226 case *pb.Notification_ProviderNwCreate:
229 pnCreateStore = append(pnCreateStore, payload)
232 if payload.ProviderNwCreate.GetVlan() != nil {
233 err := createVlanProvidernetwork(payload)
239 if payload.ProviderNwCreate.GetDirect() != nil {
240 err := createDirectProvidernetwork(payload)
245 case *pb.Notification_ProviderNwRemove:
247 // Unexpected Remove message
251 if payload.ProviderNwRemove.GetVlanLogicalIntf() != "" {
252 deleteVlanProvidernetwork(payload)
255 if payload.ProviderNwRemove.GetDirectProviderIntf() != "" {
256 deleteDirectProvidernetwork(payload)
259 case *pb.Notification_InSync:
260 if payload.InSync.GetNodeIntfIpAddress() != "" && payload.InSync.GetNodeIntfMacAddress() != "" {
261 err := createNodeOVSInternalPort(payload)
266 inSyncVlanProvidernetwork()
267 inSyncDirectProvidernetwork()
272 // Add other Types here
274 log.Info("Not supported cni type", "cni type", msg.GetCniType())
278 func shutdownHandler(errorChannel <-chan string) {
279 // Register to receive term/int signal.
280 signalChan := make(chan os.Signal, 1)
281 signal.Notify(signalChan, syscall.SIGTERM)
282 signal.Notify(signalChan, syscall.SIGINT)
283 signal.Notify(signalChan, syscall.SIGHUP)
287 case sig := <-signalChan:
288 if sig == syscall.SIGHUP {
289 log.Info("Received a SIGHUP")
291 reason = fmt.Sprintf("Received OS signal %v", sig)
292 case reason = <-errorChannel:
293 log.Info("Error", "reason", reason)
295 log.Info("nfn-agent is shutting down", "reason", reason)
298 func shutDownAgent(reason string) {
299 // Send a failure message and give few seconds complete shutdown.
300 log.Info("shutDownAgent recieved")
301 errorChannel <- reason
302 time.Sleep(10 * time.Second)
303 // The graceful shutdown failed, terminate the process.
304 panic("Shutdown failed. Panicking.")
308 logf.SetLogger(zap.Logger(true))
309 log.Info("nfn-agent Started")
311 serverAddr := os.Getenv("NFN_OPERATOR_SERVICE_HOST") + ":" + os.Getenv("NFN_OPERATOR_SERVICE_PORT")
312 // Setup ovn utilities
314 err := ovn.SetExec(exec)
316 log.Error(err, "Unable to setup OVN Utils")
319 conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
321 log.Error(err, "fail to dial")
325 client := pb.NewNfnNotifyClient(conn)
326 errorChannel = make(chan string)
328 // creates the in-cluster config
329 config, err := rest.InClusterConfig()
331 log.Error(err, "Unable to create in-cluster config")
335 // creates the clientset
336 clientset, err := kubernetes.NewForConfig(config)
338 log.Error(err, "Unable to create clientset for in-cluster config")
342 cniserver := cs.NewCNIServer("", clientset)
343 err = cniserver.Start(cs.HandleCNIcommandRequest)
345 log.Error(err, "Unable to start cni server")
348 // Run client in background
349 go subscribeNotif(client)
350 shutdownHandler(errorChannel)