2 * Copyright 2020 Intel Corporation, Inc
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
25 cs "ovn4nfv-k8s-plugin/internal/pkg/cniserver"
26 pb "ovn4nfv-k8s-plugin/internal/pkg/nfnNotify/proto"
27 "ovn4nfv-k8s-plugin/internal/pkg/ovn"
28 chaining "ovn4nfv-k8s-plugin/internal/pkg/utils"
33 "google.golang.org/grpc"
34 "k8s.io/client-go/kubernetes"
35 "k8s.io/client-go/rest"
36 kexec "k8s.io/utils/exec"
37 logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
39 //"google.golang.org/grpc/keepalive"
41 "ovn4nfv-k8s-plugin/cmd/ovn4nfvk8s-cni/app"
43 "google.golang.org/grpc/status"
44 "sigs.k8s.io/controller-runtime/pkg/log/zap"
47 var log = logf.Log.WithName("nfn-agent")
48 var errorChannel chan string
50 var pnCreateStore []*pb.Notification_ProviderNwCreate
52 // subscribe Notifications
53 func subscribeNotif(client pb.NfnNotifyClient) error {
54 log.Info("Subscribe Notification from server")
55 ctx := context.Background()
56 var n pb.SubscribeContext
57 n.NodeName = os.Getenv("NFN_NODE_NAME")
59 stream, err := client.Subscribe(ctx, &n, grpc.WaitForReady(true))
61 log.Error(err, "Subscribe", "client", client, "status", status.Code(err))
64 log.Info("Subscribe Notification success")
67 in, err := stream.Recv()
70 shutDownAgent("Stream closed")
74 log.Error(err, "Stream closed from server")
75 shutDownAgent("Stream closed from server")
78 log.Info("Got message", "msg", in)
84 func createVlanProvidernetwork(payload *pb.Notification_ProviderNwCreate) error {
86 vlanID := payload.ProviderNwCreate.GetVlan().GetVlanId()
87 ln := payload.ProviderNwCreate.GetVlan().GetLogicalIntf()
88 pn := payload.ProviderNwCreate.GetVlan().GetProviderIntf()
89 name := payload.ProviderNwCreate.GetProviderNwName()
91 ln = name + "." + vlanID
93 err = ovn.CreateVlan(vlanID, pn, ln)
95 log.Error(err, "Unable to create VLAN", "vlan", ln)
98 err = ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
100 log.Error(err, "Unable to create vlan direct bridge", "vlan", pn)
106 func createDirectProvidernetwork(payload *pb.Notification_ProviderNwCreate) error {
108 pn := payload.ProviderNwCreate.GetDirect().GetProviderIntf()
109 name := payload.ProviderNwCreate.GetProviderNwName()
110 err = ovn.CreatePnBridge("nw_"+name, "br-"+name, pn)
112 log.Error(err, "Unable to create direct bridge", "direct", pn)
118 func deleteVlanProvidernetwork(payload *pb.Notification_ProviderNwRemove) {
119 ln := payload.ProviderNwRemove.GetVlanLogicalIntf()
120 name := payload.ProviderNwRemove.GetProviderNwName()
122 ovn.DeletePnBridge("nw_"+name, "br-"+name)
125 func deleteDirectProvidernetwork(payload *pb.Notification_ProviderNwRemove) {
126 ln := payload.ProviderNwRemove.GetVlanLogicalIntf()
127 name := payload.ProviderNwRemove.GetProviderNwName()
129 ovn.DeletePnBridge("nw_"+name, "br-"+name)
132 func inSyncVlanProvidernetwork() {
134 // Read config from node
135 vlanList := ovn.GetVlan()
136 pnBridgeList := ovn.GetPnBridge("nfn")
137 diffVlan := make(map[string]bool)
138 diffPnBridge := make(map[string]bool)
140 for _, pn := range pnCreateStore {
141 if pn.ProviderNwCreate.GetVlan() != nil {
144 id := pn.ProviderNwCreate.GetVlan().GetVlanId()
145 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
146 pn := pn.ProviderNwCreate.GetVlan().GetProviderIntf()
150 for _, vlan := range vlanList {
152 // VLAN already present
153 diffVlan[vlan] = true
158 err = ovn.CreateVlan(id, pn, ln)
160 log.Error(err, "Unable to create VLAN", "vlan", ln)
165 for _, pn := range pnCreateStore {
166 if pn.ProviderNwCreate.GetVlan() != nil {
169 ln := pn.ProviderNwCreate.GetVlan().GetLogicalIntf()
170 name := pn.ProviderNwCreate.GetProviderNwName()
171 for _, br := range pnBridgeList {
172 pnName := strings.Replace(br, "br-", "", -1)
174 diffPnBridge[br] = true
178 // Provider Network not found
179 ovn.CreatePnBridge("nw_"+name, "br-"+name, ln)
181 // Delete VLAN not in the list
182 for _, vlan := range vlanList {
183 if diffVlan[vlan] == false {
187 // Delete Provider Bridge not in the list
188 for _, br := range pnBridgeList {
189 if diffPnBridge[br] == false {
190 name := strings.Replace(br, "br-", "", -1)
191 ovn.DeletePnBridge("nw_"+name, "br-"+name)
196 func inSyncDirectProvidernetwork() {
197 // Read config from node
198 pnBridgeList := ovn.GetPnBridge("nfn")
199 diffPnBridge := make(map[string]bool)
201 for _, pn := range pnCreateStore {
202 if pn.ProviderNwCreate.GetDirect() != nil {
205 pr := pn.ProviderNwCreate.GetDirect().GetProviderIntf()
206 name := pn.ProviderNwCreate.GetProviderNwName()
207 for _, br := range pnBridgeList {
208 pnName := strings.Replace(br, "br-", "", -1)
210 diffPnBridge[br] = true
211 continue DIRECTPRNETWORK
214 // Provider Network not found
215 ovn.CreatePnBridge("nw_"+name, "br-"+name, pr)
217 // Delete Provider Bridge not in the list
218 for _, br := range pnBridgeList {
219 if diffPnBridge[br] == false {
220 name := strings.Replace(br, "br-", "", -1)
221 ovn.DeletePnBridge("nw_"+name, "br-"+name)
226 func createNodeOVSInternalPort(payload *pb.Notification_InSync) error {
227 nodeIntfIPAddr := strings.Trim(strings.TrimSpace(payload.InSync.GetNodeIntfIpAddress()), "\"")
228 nodeIntfMacAddr := strings.Trim(strings.TrimSpace(payload.InSync.GetNodeIntfMacAddress()), "\"")
229 nodeName := os.Getenv("NFN_NODE_NAME")
231 err := app.CreateNodeOVSInternalPort(nodeIntfIPAddr, nodeIntfMacAddr, nodeName)
239 func handleNotif(msg *pb.Notification) {
240 switch msg.GetCniType() {
242 switch payload := msg.Payload.(type) {
243 case *pb.Notification_ProviderNwCreate:
246 pnCreateStore = append(pnCreateStore, payload)
249 if payload.ProviderNwCreate.GetVlan() != nil {
250 err := createVlanProvidernetwork(payload)
256 if payload.ProviderNwCreate.GetDirect() != nil {
257 err := createDirectProvidernetwork(payload)
262 case *pb.Notification_ProviderNwRemove:
264 // Unexpected Remove message
268 if payload.ProviderNwRemove.GetVlanLogicalIntf() != "" {
269 deleteVlanProvidernetwork(payload)
272 if payload.ProviderNwRemove.GetDirectProviderIntf() != "" {
273 deleteDirectProvidernetwork(payload)
276 case *pb.Notification_ContainterRtInsert:
277 id := payload.ContainterRtInsert.GetContainerId()
278 pid, err := chaining.GetPidForContainer(id)
280 log.Error(err, "Failed to get pid", "containerID", id)
283 err = chaining.ContainerAddRoute(pid, payload.ContainterRtInsert.GetRoute())
288 case *pb.Notification_InSync:
289 inSyncVlanProvidernetwork()
290 inSyncDirectProvidernetwork()
293 if payload.InSync.GetNodeIntfIpAddress() != "" && payload.InSync.GetNodeIntfMacAddress() != "" {
294 err := createNodeOVSInternalPort(payload)
301 // Add other Types here
303 log.Info("Not supported cni type", "cni type", msg.GetCniType())
307 func shutdownHandler(errorChannel <-chan string) {
308 // Register to receive term/int signal.
309 signalChan := make(chan os.Signal, 1)
310 signal.Notify(signalChan, syscall.SIGTERM)
311 signal.Notify(signalChan, syscall.SIGINT)
312 signal.Notify(signalChan, syscall.SIGHUP)
316 case sig := <-signalChan:
317 if sig == syscall.SIGHUP {
318 log.Info("Received a SIGHUP")
320 reason = fmt.Sprintf("Received OS signal %v", sig)
321 case reason = <-errorChannel:
322 log.Info("Error", "reason", reason)
324 log.Info("nfn-agent is shutting down", "reason", reason)
327 func shutDownAgent(reason string) {
328 // Send a failure message and give few seconds complete shutdown.
329 log.Info("shutDownAgent recieved")
330 errorChannel <- reason
331 time.Sleep(10 * time.Second)
332 // The graceful shutdown failed, terminate the process.
333 panic("Shutdown failed. Panicking.")
337 logf.SetLogger(zap.Logger(true))
338 log.Info("nfn-agent Started")
340 serverAddr := os.Getenv("NFN_OPERATOR_SERVICE_HOST") + ":" + os.Getenv("NFN_OPERATOR_SERVICE_PORT")
341 // Setup ovn utilities
343 err := ovn.SetExec(exec)
345 log.Error(err, "Unable to setup OVN Utils")
348 conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
350 log.Error(err, "fail to dial")
354 client := pb.NewNfnNotifyClient(conn)
355 errorChannel = make(chan string)
357 // creates the in-cluster config
358 config, err := rest.InClusterConfig()
360 log.Error(err, "Unable to create in-cluster config")
364 // creates the clientset
365 clientset, err := kubernetes.NewForConfig(config)
367 log.Error(err, "Unable to create clientset for in-cluster config")
371 cniserver := cs.NewCNIServer("", clientset)
372 err = cniserver.Start(cs.HandleCNIcommandRequest)
374 log.Error(err, "Unable to start cni server")
377 // Run client in background
378 go subscribeNotif(client)
379 shutdownHandler(errorChannel)