6 "google.golang.org/grpc"
8 kexec "k8s.io/utils/exec"
11 pb "ovn4nfv-k8s-plugin/internal/pkg/nfnNotify/proto"
12 cs "ovn4nfv-k8s-plugin/internal/pkg/cniserver"
13 "ovn4nfv-k8s-plugin/internal/pkg/ovn"
14 logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
15 "k8s.io/client-go/kubernetes"
16 "k8s.io/client-go/rest"
20 //"google.golang.org/grpc/keepalive"
21 "google.golang.org/grpc/status"
22 "sigs.k8s.io/controller-runtime/pkg/log/zap"
// log is the package-level logger for this program, named "nfn-agent".
25 var log = logf.Log.WithName("nfn-agent")
// errorChannel carries a shutdown reason: shutDownAgent sends on it, and the
// value created in the startup code is handed to shutdownHandler, which
// receives from it.
26 var errorChannel chan string
// pnCreateStore accumulates the ProviderNwCreate notifications appended by
// handleNotif; inSyncVlanProvidernetwork and inSyncDirectProvidernetwork
// range over it.
28 var pnCreateStore []*pb.Notification_ProviderNwCreate
30 // subscribe Notifications
// subscribeNotif opens a Subscribe stream on client for the node named by the
// NFN_NODE_NAME environment variable and logs each message it receives.
// Both visible stream-failure paths hand off to shutDownAgent.
// NOTE(review): the numbering embedded in these lines skips values (36, 38,
// 40-41, 43-44, 46-47, ...), so the error checks, loops and return statements
// are presumably on lines missing from this excerpt; confirm against the
// complete file before relying on the control flow.
31 func subscribeNotif(client pb.NfnNotifyClient) error {
32 log.Info("Subscribe Notification from server")
33 ctx := context.Background()
34 var n pb.SubscribeContext
// The node identity sent to the server comes from the environment.
35 n.NodeName = os.Getenv("NFN_NODE_NAME")
// grpc.WaitForReady(true) asks gRPC to wait for the connection to become ready
// instead of failing the call immediately.
37 stream, err := client.Subscribe(ctx, &n, grpc.WaitForReady(true))
// Logged together with the gRPC status code extracted from err.
39 log.Error(err, "Subscribe", "client", client, "status", status.Code(err))
42 log.Info("Subscribe Notification success")
45 in, err := stream.Recv()
// shutDownAgent publishes the reason on errorChannel and panics if the process
// is still running ten seconds later.
// NOTE(review): the condition selecting this path is not visible; presumably
// it is the end-of-stream case — confirm.
48 shutDownAgent("Stream closed")
52 log.Error(err, "Stream closed from server")
53 shutDownAgent("Stream closed from server")
56 log.Info("Got message", "msg", in)
// createVlanProvidernetwork sets up a VLAN provider network described by
// payload: it calls ovn.CreateVlan with the VLAN ID, provider interface and
// logical interface, then ovn.CreatePnBridge with "nw_"+name, "br-"+name and
// the logical interface. Failures of either call are logged.
// NOTE(review): the error checks and return statements are not visible in this
// excerpt (embedded numbering skips 63, 68, 70, 72, 74-75, 77, 79-83).
62 func createVlanProvidernetwork(payload *pb.Notification_ProviderNwCreate) error {
64 vlanID := payload.ProviderNwCreate.GetVlan().GetVlanId()
65 ln := payload.ProviderNwCreate.GetVlan().GetLogicalIntf()
66 pn := payload.ProviderNwCreate.GetVlan().GetProviderIntf()
67 name := payload.ProviderNwCreate.GetProviderNwName()
// Derives the logical interface name as "<name>.<vlanID>".
// NOTE(review): presumably guarded by a check on ln (e.g. only when it is
// empty) on the line that is not visible — confirm.
69 ln = name + "." + vlanID
71 err = ovn.CreateVlan(vlanID, pn, ln)
73 log.Error(err, "Unable to create VLAN", "vlan", ln)
76 err = ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
// NOTE(review): the "vlan" key is given pn (the provider interface) here,
// whereas the CreateVlan failure above logs ln — confirm which is intended.
78 log.Error(err, "Unable to create vlan direct bridge", "vlan", pn)
// createDirectProvidernetwork sets up a direct provider network described by
// payload by calling ovn.CreatePnBridge with "nw_"+name, "br-"+name and the
// provider interface, logging a failure of that call.
// NOTE(review): the declaration of err, the error check and the return
// statements are not visible in this excerpt.
84 func createDirectProvidernetwork(payload *pb.Notification_ProviderNwCreate) error {
86 pn := payload.ProviderNwCreate.GetDirect().GetProviderIntf()
87 name := payload.ProviderNwCreate.GetProviderNwName()
88 err = ovn.CreatePnBridge("nw_"+name, "br-"+name, pn)
90 log.Error(err, "Unable to create direct bridge", "direct", pn)
// deleteVlanProvidernetwork tears down the provider bridge for the network
// named in payload by calling ovn.DeletePnBridge with "nw_"+name and
// "br-"+name. It returns nothing.
96 func deleteVlanProvidernetwork(payload *pb.Notification_ProviderNwRemove) {
// NOTE(review): ln is not used on any visible line; presumably it is consumed
// on the line missing between this and DeletePnBridge — confirm.
97 ln := payload.ProviderNwRemove.GetVlanLogicalIntf()
98 name := payload.ProviderNwRemove.GetProviderNwName()
100 ovn.DeletePnBridge("nw_"+name, "br-"+name)
// deleteDirectProvidernetwork tears down the provider bridge for the network
// named in payload by calling ovn.DeletePnBridge with "nw_"+name and
// "br-"+name. It returns nothing.
103 func deleteDirectProvidernetwork(payload *pb.Notification_ProviderNwRemove) {
// NOTE(review): this reads the VLAN logical interface even though the function
// handles direct provider networks, and ln is not used on any visible line.
// Looks copied from deleteVlanProvidernetwork — confirm it is intended.
104 ln := payload.ProviderNwRemove.GetVlanLogicalIntf()
105 name := payload.ProviderNwRemove.GetProviderNwName()
107 ovn.DeletePnBridge("nw_"+name, "br-"+name)
// inSyncVlanProvidernetwork reconciles the VLAN provider networks recorded in
// pnCreateStore with what the node reports through ovn.GetVlan and
// ovn.GetPnBridge("nfn"). Items found on the node are marked in diffVlan and
// diffPnBridge, ovn.CreateVlan and ovn.CreatePnBridge are called for stored
// entries, and bridges left unmarked are removed with ovn.DeletePnBridge.
// NOTE(review): the comparisons that decide a match, any loop labels or
// continue statements, and the call that deletes an unmarked VLAN are on lines
// not visible in this excerpt — confirm against the complete file.
110 func inSyncVlanProvidernetwork() {
112 // Read config from node
113 vlanList := ovn.GetVlan()
114 pnBridgeList := ovn.GetPnBridge("nfn")
// Keys are names seen on the node; a true value means a stored entry matched.
115 diffVlan := make(map[string]bool)
116 diffPnBridge := make(map[string]bool)
// First pass: VLAN interfaces.
118 for _, pn := range pnCreateStore {
// Only stored entries that carry VLAN details are considered.
119 if pn.ProviderNwCreate.GetVlan() != nil {
122 id := pn.ProviderNwCreate.GetVlan().GetVlanId()
123 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
// From here on pn shadows the loop variable and holds the provider interface.
124 pn := pn.ProviderNwCreate.GetVlan().GetProviderIntf()
128 for _, vlan := range vlanList {
130 // VLAN already present
131 diffVlan[vlan] = true
// NOTE(review): presumably reached only when no VLAN on the node matched.
136 err = ovn.CreateVlan(id, pn, ln)
138 log.Error(err, "Unable to create VLAN", "vlan", ln)
// Second pass: provider bridges for the same VLAN entries.
143 for _, pn := range pnCreateStore {
144 if pn.ProviderNwCreate.GetVlan() != nil {
147 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
148 name := pn.ProviderNwCreate.GetProviderNwName()
149 for _, br := range pnBridgeList {
// With n = -1, strings.Replace removes every "br-" occurrence in br, not only
// a leading prefix.
// NOTE(review): pnName is presumably compared with name on the line not visible.
150 pnName := strings.Replace(br, "br-", "", -1)
152 diffPnBridge[br] = true
156 // Provider Network not found
// The result of CreatePnBridge is discarded on this line.
157 ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
159 // Delete VLAN not in the list
160 for _, vlan := range vlanList {
// A missing key reads as false, so this selects VLANs never marked above.
161 if diffVlan[vlan] == false {
165 // Delete Provider Bridge not in the list
166 for _, br := range pnBridgeList {
167 if diffPnBridge[br] == false {
168 name := strings.Replace(br, "br-", "", -1)
169 ovn.DeletePnBridge("nw_"+name, "br-"+name)
// inSyncDirectProvidernetwork reconciles the direct provider networks recorded
// in pnCreateStore with the bridges reported by ovn.GetPnBridge("nfn").
// Bridges found for a stored entry are marked in diffPnBridge,
// ovn.CreatePnBridge is called for stored entries, and bridges left unmarked
// are removed with ovn.DeletePnBridge.
// NOTE(review): the DIRECTPRNETWORK label targeted by the continue statement
// and the comparison that decides a match are on lines not visible in this
// excerpt — confirm against the complete file.
174 func inSyncDirectProvidernetwork() {
175 // Read config from node
176 pnBridgeList := ovn.GetPnBridge("nfn")
// Keys are bridge names seen on the node; true means a stored entry matched.
177 diffPnBridge := make(map[string]bool)
179 for _, pn := range pnCreateStore {
// Only stored entries that carry direct-network details are considered.
180 if pn.ProviderNwCreate.GetDirect() != nil {
183 pr := pn.ProviderNwCreate.GetDirect().GetProviderIntf()
184 name := pn.ProviderNwCreate.GetProviderNwName()
185 for _, br := range pnBridgeList {
// With n = -1, strings.Replace removes every "br-" occurrence in br, not only
// a leading prefix.
// NOTE(review): pnName is presumably compared with name on the line not visible.
186 pnName := strings.Replace(br, "br-", "", -1)
188 diffPnBridge[br] = true
// Skips the create call below for this entry by jumping to the labelled loop.
189 continue DIRECTPRNETWORK
192 // Provider Network not found
// The result of CreatePnBridge is discarded on this line.
193 ovn.CreatePnBridge("nw_"+name, "br-"+name, pr)
195 // Delete Provider Bridge not in the list
196 for _, br := range pnBridgeList {
// A missing key reads as false, so this selects bridges never marked above.
197 if diffPnBridge[br] == false {
198 name := strings.Replace(br, "br-", "", -1)
199 ovn.DeletePnBridge("nw_"+name, "br-"+name)
// handleNotif dispatches one notification, switching first on the CNI type
// and then on the concrete payload type:
//   - ProviderNwCreate: the payload is appended to pnCreateStore and the VLAN
//     and/or direct create helper is called, depending on which details are set.
//   - ProviderNwRemove: the VLAN and/or direct delete helper is called,
//     depending on which interface field is non-empty.
//   - InSync: both inSync reconciliation functions run, VLAN first.
// Other CNI types are only logged as unsupported.
// NOTE(review): the case label of the outer switch, handling of the errors
// returned by the create helpers, and any extra guards are on lines not
// visible in this excerpt — confirm against the complete file.
204 func handleNotif(msg *pb.Notification) {
205 switch msg.GetCniType() {
207 switch payload := msg.Payload.(type) {
208 case *pb.Notification_ProviderNwCreate:
// Remembered so the inSync functions can later reconcile against it.
211 pnCreateStore = append(pnCreateStore, payload)
214 if payload.ProviderNwCreate.GetVlan() != nil {
215 err := createVlanProvidernetwork(payload)
221 if payload.ProviderNwCreate.GetDirect() != nil {
222 err := createDirectProvidernetwork(payload)
227 case *pb.Notification_ProviderNwRemove:
229 // Unexpected Remove message
233 if payload.ProviderNwRemove.GetVlanLogicalIntf() != "" {
234 deleteVlanProvidernetwork(payload)
237 if payload.ProviderNwRemove.GetDirectProviderIntf() != "" {
238 deleteDirectProvidernetwork(payload)
241 case *pb.Notification_InSync:
242 inSyncVlanProvidernetwork()
243 inSyncDirectProvidernetwork()
248 // Add other Types here
250 log.Info("Not supported cni type", "cni type", msg.GetCniType())
// shutdownHandler waits for either an OS signal (SIGTERM, SIGINT or SIGHUP)
// or a reason string arriving on errorChannel, then logs that nfn-agent is
// shutting down together with the reason.
// NOTE(review): the declaration of reason, the select statement header and
// whatever follows the final log call are on lines not visible in this excerpt.
254 func shutdownHandler(errorChannel <-chan string) {
255 // Register to receive term/int signal.
// Buffered with capacity 1: signal.Notify does not block when delivering, so
// an unbuffered channel could miss a signal.
256 signalChan := make(chan os.Signal, 1)
// Each Notify call on the same channel adds to the set of relayed signals.
257 signal.Notify(signalChan, syscall.SIGTERM)
258 signal.Notify(signalChan, syscall.SIGINT)
259 signal.Notify(signalChan, syscall.SIGHUP)
263 case sig := <-signalChan:
264 if sig == syscall.SIGHUP {
265 log.Info("Received a SIGHUP")
267 reason = fmt.Sprintf("Received OS signal %v", sig)
// Reason published by shutDownAgent.
268 case reason = <-errorChannel:
269 log.Info("Error", "reason", reason)
271 log.Info("nfn-agent is shutting down", "reason", reason)
// shutDownAgent requests process shutdown: it sends reason on the
// package-level errorChannel, waits ten seconds, and panics if execution gets
// that far.
274 func shutDownAgent(reason string) {
275 // Send a failure message and give few seconds complete shutdown.
276 log.Info("shutDownAgent recieved")
// errorChannel is created unbuffered in the startup code, so this send blocks
// until a receiver takes the value.
277 errorChannel <- reason
278 time.Sleep(10 * time.Second)
279 // The graceful shutdown failed, terminate the process.
280 panic("Shutdown failed. Panicking.")
// Startup sequence of the agent: configure logging, set up the OVN utilities,
// dial the operator's gRPC endpoint, build an in-cluster Kubernetes clientset,
// start the CNI server, then run the notification subscriber in a goroutine
// while shutdownHandler runs on the current goroutine.
// NOTE(review): the enclosing function header (presumably func main) and the
// error checks, returns and any deferred cleanup are on lines not visible in
// this excerpt — confirm against the complete file.
284 logf.SetLogger(zap.Logger(true))
285 log.Info("nfn-agent Started")
// The operator address is assembled from two environment variables.
287 serverAddr := os.Getenv("NFN_OPERATOR_SERVICE_HOST") + ":" + os.Getenv("NFN_OPERATOR_SERVICE_PORT")
288 // Setup ovn utilities
// NOTE(review): exec is defined on a line not visible here; presumably built
// from the kexec import — confirm.
290 err := ovn.SetExec(exec)
292 log.Error(err, "Unable to setup OVN Utils")
// grpc.WithInsecure disables transport security for this connection.
295 conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
297 log.Error(err, "fail to dial")
301 client := pb.NewNfnNotifyClient(conn)
// Unbuffered: a send from shutDownAgent blocks until shutdownHandler receives.
302 errorChannel = make(chan string)
304 // creates the in-cluster config
305 config, err := rest.InClusterConfig()
307 log.Error(err, "Unable to create in-cluster config")
311 // creates the clientset
312 clientset, err := kubernetes.NewForConfig(config)
314 log.Error(err, "Unable to create clientset for in-cluster config")
318 cniserver := cs.NewCNIServer("",clientset)
319 err = cniserver.Start(cs.HandleCNIcommandRequest)
321 log.Error(err, "Unable to start cni server")
324 // Run client in background
// The error returned by subscribeNotif is discarded by the go statement.
325 go subscribeNotif(client)
326 shutdownHandler(errorChannel)