Merge "Add a check for network creation"
[ovn4nfv-k8s-plugin.git] / internal / pkg / nfnNotify / server.go
1 package nfn
2
3 import (
4         "fmt"
5         "google.golang.org/grpc"
6         "google.golang.org/grpc/reflection"
7         "net"
8         pb "ovn4nfv-k8s-plugin/internal/pkg/nfnNotify/proto"
9         v1alpha1 "ovn4nfv-k8s-plugin/pkg/apis/k8s/v1alpha1"
10         logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
11         "strings"
12         "k8s.io/apimachinery/pkg/apis/meta/v1"
13         "k8s.io/client-go/kubernetes"
14         "k8s.io/client-go/rest"
15         clientset "ovn4nfv-k8s-plugin/pkg/generated/clientset/versioned"
16 )
17
18 var log = logf.Log.WithName("rpc-server")
19
20 type client struct {
21         context *pb.SubscribeContext
22         stream  pb.NfnNotify_SubscribeServer
23 }
24
25 type serverDB struct {
26         name       string
27         clientList map[string]client
28 }
29
30 var notifServer *serverDB
31 var stopChan chan interface{}
32
33 var pnClientset *clientset.Clientset
34 var kubeClientset *kubernetes.Clientset
35
36 func newServer() *serverDB {
37         return &serverDB{name: "nfnNotifServer", clientList: make(map[string]client)}
38 }
39
40 // Subscribe stores the client information & sends data
41 func (s *serverDB) Subscribe(sc *pb.SubscribeContext, ss pb.NfnNotify_SubscribeServer) error {
42         nodeName := sc.GetNodeName()
43         log.Info("Subscribe request from node", "Node Name", nodeName)
44         if nodeName == "" {
45                 return fmt.Errorf("Node name can't be empty")
46         }
47         cp := client{
48                 context: sc,
49                 stream:  ss,
50         }
51         s.clientList[nodeName] = cp
52
53         providerNetworklist, err := pnClientset.K8sV1alpha1().ProviderNetworks("default").List(v1.ListOptions{})
54         if err == nil {
55                 for _, pn := range providerNetworklist.Items {
56                         log.Info("Send message", "Provider Network", pn.GetName())
57                         SendNotif(&pn, "create", nodeName)
58                 }
59         }
60         inSyncMsg := pb.Notification{
61                 CniType: "ovn4nfv",
62                 Payload: &pb.Notification_InSync{
63                         InSync: &pb.InSync{},
64                 },
65         }
66         log.Info("Send Insync")
67         if err = cp.stream.Send(&inSyncMsg); err != nil {
68                 log.Error(err, "Unable to send sync", "node name", nodeName)
69         }
70         log.Info("Subscribe Completed")
71         // Keep stream open
72         for {
73                 select {
74                 case <-stopChan:
75                 }
76         }
77 }
78
79 func (s *serverDB) GetClient(nodeName string) client {
80         if val, ok := s.clientList[nodeName]; ok {
81                 return val
82         }
83         return client{}
84 }
85
86 func updatePnStatus(pn *v1alpha1.ProviderNetwork, status string) error {
87         pnCopy := pn.DeepCopy()
88         pnCopy.Status.State = status
89         _, err := pnClientset.K8sV1alpha1().ProviderNetworks(pn.Namespace).Update(pnCopy)
90         return err
91 }
92
93 func createMsg(pn *v1alpha1.ProviderNetwork) pb.Notification {
94         msg := pb.Notification{
95                 CniType: "ovn4nfv",
96                 Payload: &pb.Notification_ProviderNwCreate{
97                         ProviderNwCreate: &pb.ProviderNetworkCreate{
98                                 ProviderNwName: pn.Name,
99                                 Vlan: &pb.VlanInfo{
100                                         VlanId:       pn.Spec.Vlan.VlanId,
101                                         ProviderIntf: pn.Spec.Vlan.ProviderInterfaceName,
102                                         LogicalIntf:  pn.Spec.Vlan.LogicalInterfaceName,
103                                 },
104                         },
105                 },
106         }
107         return msg
108 }
109
110 func deleteMsg(pn *v1alpha1.ProviderNetwork) pb.Notification {
111         msg := pb.Notification{
112                 CniType: "ovn4nfv",
113                 Payload: &pb.Notification_ProviderNwRemove{
114                         ProviderNwRemove: &pb.ProviderNetworkRemove{
115                                 ProviderNwName:  pn.Name,
116                                 VlanLogicalIntf: pn.Spec.Vlan.LogicalInterfaceName,
117                         },
118                 },
119         }
120         return msg
121 }
122
123 //SendNotif to client
124 func SendNotif(pn *v1alpha1.ProviderNetwork, msgType string, nodeReq string) error {
125         var msg pb.Notification
126         var err error
127
128         switch {
129         case pn.Spec.CniType == "ovn4nfv":
130                 switch {
131                 case pn.Spec.ProviderNetType == "VLAN":
132                         if msgType == "create" {
133                                 msg = createMsg(pn)
134                         } else if msgType == "delete" {
135                                 msg = deleteMsg(pn)
136                         }
137                         if strings.EqualFold(pn.Spec.Vlan.VlanNodeSelector, "SPECIFIC") {
138                                 for _, label := range pn.Spec.Vlan.NodeLabelList {
139                                         l := strings.Split(label, "=")
140                                         if len(l) == 0 {
141                                                 log.Error(fmt.Errorf("Syntax error label: %v", label), "NodeListIterator")
142                                                 return nil
143                                         }
144                                 }
145                                 labels := strings.Join(pn.Spec.Vlan.NodeLabelList[:], ",")
146                                 err = sendMsg(msg, labels, "specific", nodeReq)
147                         } else if strings.EqualFold(pn.Spec.Vlan.VlanNodeSelector, "ALL") {
148                                 err = sendMsg(msg, "", "all", nodeReq)
149                         } else if strings.EqualFold(pn.Spec.Vlan.VlanNodeSelector, "ANY") {
150                                 if pn.Status.State != v1alpha1.Created {
151                                         err = sendMsg(msg, "", "any", nodeReq)
152                                         if err == nil {
153                                                 updatePnStatus(pn, v1alpha1.Created)
154                                         }
155                                 }
156                         }
157                 default:
158                         return fmt.Errorf("Unsupported Provider Network type")
159                 }
160         default:
161                 return fmt.Errorf("Unsupported CNI type")
162         }
163         return err
164 }
165
166 // sendMsg send notification to client
167 func sendMsg(msg pb.Notification, labels string, option string, nodeReq string) error {
168         if option == "all" {
169                 for name, client := range notifServer.clientList {
170                         if nodeReq != "" && nodeReq != name {
171                                 continue
172                         }
173                         if client.stream != nil {
174                                 if err := client.stream.Send(&msg); err != nil {
175                                         log.Error(err, "Msg Send failed", "Node name", name)
176                                 }
177                         }
178                 }
179                 return nil
180         } else if option == "any" {
181                 // Always select the first
182                 for _, client := range notifServer.clientList {
183                         if client.stream != nil {
184                                 if err := client.stream.Send(&msg); err != nil {
185                                         return err
186                                 }
187                                 // return after first successful send
188                                 return nil
189                         }
190                 }
191                 return nil
192         }
193         // This is specific case
194         for name := range nodeListIterator(labels) {
195                 if nodeReq != "" && nodeReq != name {
196                         continue
197                 }
198                 client := notifServer.GetClient(name)
199                 if client.stream != nil {
200                         if err := client.stream.Send(&msg); err != nil {
201                                 return err
202                         }
203                 }
204         }
205         return nil
206 }
207
208 func nodeListIterator(labels string) <-chan string {
209         ch := make(chan string)
210
211         lo := v1.ListOptions{LabelSelector: labels}
212         // List the Nodes matching the Labels
213         nodes, err := kubeClientset.CoreV1().Nodes().List(lo)
214         if err != nil {
215                 log.Info("No Nodes found with labels", "list:", lo)
216                 return nil
217         }
218         go func() {
219                 for _, node := range nodes.Items {
220                         log.Info("Send message to", " node:", node.ObjectMeta.Name)
221                         ch <- node.ObjectMeta.Name
222                 }
223                 close(ch)
224         }()
225         return ch
226 }
227
228 //SetupNotifServer initilizes the gRpc nfn notif server
229 func SetupNotifServer(kConfig *rest.Config) {
230
231         log.Info("Starting Notif Server")
232         var err error
233
234         // creates the clientset
235         pnClientset, err = clientset.NewForConfig(kConfig)
236         if err != nil {
237                 log.Error(err, "Error building clientset")
238         }
239         kubeClientset, err = kubernetes.NewForConfig(kConfig)
240         if err != nil {
241                 log.Error(err, "Error building Kuberenetes clientset")
242         }
243
244         stopChan = make(chan interface{})
245
246         // Start GRPC server
247         lis, err := net.Listen("tcp", ":50000")
248         if err != nil {
249                 log.Error(err, "failed to listen")
250         }
251
252         s := grpc.NewServer()
253         // Intialize Notify server
254         notifServer = newServer()
255         pb.RegisterNfnNotifyServer(s, notifServer)
256
257         reflection.Register(s)
258         log.Info("Initialization Completed")
259         if err := s.Serve(lis); err != nil {
260                 log.Error(err, "failed to serve")
261         }
262 }