6 "google.golang.org/grpc"
8 kexec "k8s.io/utils/exec"
11 pb "ovn4nfv-k8s-plugin/internal/pkg/nfnNotify/proto"
12 "ovn4nfv-k8s-plugin/internal/pkg/ovn"
13 logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
17 //"google.golang.org/grpc/keepalive"
18 "google.golang.org/grpc/status"
19 "sigs.k8s.io/controller-runtime/pkg/log/zap"
22 var log = logf.Log.WithName("nfn-agent")
23 var errorChannel chan string
25 var pnCreateStore []*pb.Notification_ProviderNwCreate
27 // subscribe Notifications
28 func subscribeNotif(client pb.NfnNotifyClient) error {
29 log.Info("Subscribe Notification from server")
30 ctx := context.Background()
31 var n pb.SubscribeContext
32 n.NodeName = os.Getenv("NFN_NODE_NAME")
34 stream, err := client.Subscribe(ctx, &n, grpc.WaitForReady(true))
36 log.Error(err, "Subscribe", "client", client, "status", status.Code(err))
39 log.Info("Subscribe Notification success")
42 in, err := stream.Recv()
45 shutDownAgent("Stream closed")
49 log.Error(err, "Stream closed from server")
50 shutDownAgent("Stream closed from server")
53 log.Info("Got message", "msg", in)
59 func handleNotif(msg *pb.Notification) {
61 switch msg.GetCniType() {
63 switch payload := msg.Payload.(type) {
64 case *pb.Notification_ProviderNwCreate:
67 pnCreateStore = append(pnCreateStore, payload)
70 vlanID := payload.ProviderNwCreate.GetVlan().GetVlanId()
71 ln := payload.ProviderNwCreate.GetVlan().GetLogicalIntf()
72 pn := payload.ProviderNwCreate.GetVlan().GetProviderIntf()
73 name := payload.ProviderNwCreate.GetProviderNwName()
75 ln = name + "." + vlanID
77 err = ovn.CreateVlan(vlanID, pn, ln)
79 log.Error(err, "Unable to create VLAN", "vlan", ln)
82 ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
83 case *pb.Notification_ProviderNwRemove:
85 // Unexpected Remove message
88 ln := payload.ProviderNwRemove.GetVlanLogicalIntf()
89 name := payload.ProviderNwRemove.GetProviderNwName()
91 ovn.DeletePnBridge("nw_"+name, "br-"+name)
92 case *pb.Notification_InSync:
93 // Read config from node
94 vlanList := ovn.GetVlan()
95 pnBridgeList := ovn.GetPnBridge("nfn")
96 diffVlan := make(map[string]bool)
97 diffPnBridge := make(map[string]bool)
99 for _, pn := range pnCreateStore {
100 id := pn.ProviderNwCreate.GetVlan().GetVlanId()
101 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
102 pn := pn.ProviderNwCreate.GetVlan().GetProviderIntf()
106 for _, vlan := range vlanList {
108 // VLAN already present
109 diffVlan[vlan] = true
114 err = ovn.CreateVlan(id, pn, ln)
116 log.Error(err, "Unable to create VLAN", "vlan", ln)
121 for _, pn := range pnCreateStore {
122 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
123 name := pn.ProviderNwCreate.GetProviderNwName()
124 for _, br := range pnBridgeList {
125 pnName := strings.Replace(br, "br-", "", -1)
127 diffPnBridge[br] = true
131 // Provider Network not found
132 ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
134 // Delete VLAN not in the list
135 for _, vlan := range vlanList {
136 if diffVlan[vlan] == false {
140 // Delete Provider Bridge not in the list
141 for _, br := range pnBridgeList {
142 if diffPnBridge[br] == false {
143 name := strings.Replace(br, "br-", "", -1)
144 ovn.DeletePnBridge("nw_"+name, "br-"+name)
152 // Add other Types here
154 log.Info("Not supported cni type", "cni type", msg.GetCniType())
158 func shutdownHandler(errorChannel <-chan string) {
159 // Register to receive term/int signal.
160 signalChan := make(chan os.Signal, 1)
161 signal.Notify(signalChan, syscall.SIGTERM)
162 signal.Notify(signalChan, syscall.SIGINT)
163 signal.Notify(signalChan, syscall.SIGHUP)
167 case sig := <-signalChan:
168 if sig == syscall.SIGHUP {
169 log.Info("Received a SIGHUP")
171 reason = fmt.Sprintf("Received OS signal %v", sig)
172 case reason = <-errorChannel:
173 log.Info("Error", "reason", reason)
175 log.Info("nfn-agent is shutting down", "reason", reason)
178 func shutDownAgent(reason string) {
179 // Send a failure message and give few seconds complete shutdown.
180 log.Info("shutDownAgent recieved")
181 errorChannel <- reason
182 time.Sleep(10 * time.Second)
183 // The graceful shutdown failed, terminate the process.
184 panic("Shutdown failed. Panicking.")
188 logf.SetLogger(zap.Logger(true))
189 log.Info("nfn-agent Started")
191 serverAddr := os.Getenv("NFN_OPERATOR_SERVICE_HOST") + ":" + os.Getenv("NFN_OPERATOR_SERVICE_PORT")
192 // Setup ovn utilities
194 err := ovn.SetExec(exec)
196 log.Error(err, "Unable to setup OVN Utils")
199 conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
201 log.Error(err, "fail to dial")
205 client := pb.NewNfnNotifyClient(conn)
206 errorChannel = make(chan string)
208 // Run client in background
209 go subscribeNotif(client)
210 shutdownHandler(errorChannel)