Use controller runtime and operator sdk
[ovn4nfv-k8s-plugin.git] / pkg / controller / pod / pod_controller.go
1 package pod
2
3 import (
4         "context"
5         "encoding/json"
6         "fmt"
7         corev1 "k8s.io/api/core/v1"
8         "k8s.io/apimachinery/pkg/api/errors"
9         "k8s.io/apimachinery/pkg/runtime"
10         "k8s.io/apimachinery/pkg/types"
11         "ovn4nfv-k8s-plugin/internal/pkg/ovn"
12         "sigs.k8s.io/controller-runtime/pkg/client"
13         "sigs.k8s.io/controller-runtime/pkg/controller"
14         "sigs.k8s.io/controller-runtime/pkg/event"
15         "sigs.k8s.io/controller-runtime/pkg/handler"
16         "sigs.k8s.io/controller-runtime/pkg/manager"
17         "sigs.k8s.io/controller-runtime/pkg/predicate"
18         "sigs.k8s.io/controller-runtime/pkg/reconcile"
19         logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
20         "sigs.k8s.io/controller-runtime/pkg/source"
21 )
22
23 var log = logf.Log.WithName("controller_pod")
24
25 const (
26         nfnNetworkAnnotation = "k8s.plugin.opnfv.org/nfn-network"
27 )
28
29 type nfnNetwork struct {
30         Type      string                   "json:\"type\""
31         Interface []map[string]interface{} "json:\"interface\""
32 }
33
34 // Add creates a new Pod Controller and adds it to the Manager. The Manager will set fields on the Controller
35 // and Start it when the Manager is Started.
36 func Add(mgr manager.Manager) error {
37         return add(mgr, newReconciler(mgr))
38 }
39
40 // newReconciler returns a new reconcile.Reconciler
41 func newReconciler(mgr manager.Manager) reconcile.Reconciler {
42         return &ReconcilePod{client: mgr.GetClient(), scheme: mgr.GetScheme()}
43 }
44
45 // add adds a new Controller to mgr with r as the reconcile.Reconciler
46 func add(mgr manager.Manager, r reconcile.Reconciler) error {
47
48         // Create a new Controller that will call the provided Reconciler function in response
49         // to events.
50         c, err := controller.New("pod-controller", mgr, controller.Options{Reconciler: r})
51         if err != nil {
52                 return err
53         }
54         // Define Predicates On Create and Update function
55         p := predicate.Funcs{
56                 UpdateFunc: func(e event.UpdateEvent) bool {
57                         annotaion := e.MetaNew.GetAnnotations()
58                         // The object doesn't contain annotation ,nfnNetworkAnnotation so the event will be
59                         // ignored.
60                         if _, ok := annotaion[nfnNetworkAnnotation]; !ok {
61                                 return false
62                         }
63                         // If pod is already processed by OVN don't add event
64                         if _, ok := annotaion[ovn.Ovn4nfvAnnotationTag]; ok {
65                                 return false
66                         }
67                         return true
68                 },
69                 CreateFunc: func(e event.CreateEvent) bool {
70                         // The object doesn't contain annotation ,nfnNetworkAnnotation so the event will be
71                         // ignored.
72                         annotaion := e.Meta.GetAnnotations()
73                         if _, ok := annotaion[nfnNetworkAnnotation]; !ok {
74                                 return false
75                         }
76                         return true
77                 },
78                 DeleteFunc: func(e event.DeleteEvent) bool {
79                         // The object doesn't contain annotation ,nfnNetworkAnnotation so the event will be
80                         // ignored.
81                         annotaion := e.Meta.GetAnnotations()
82                         if _, ok := annotaion[nfnNetworkAnnotation]; !ok {
83                                 return false
84                         }
85                         return true
86                 },
87         }
88
89         // Watch for Pod create / update / delete events and call Reconcile
90         err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, p)
91         if err != nil {
92                 return err
93         }
94         return nil
95 }
96
97 // blank assignment to verify that ReconcuilePod implements reconcile.Reconciler
98 var _ reconcile.Reconciler = &ReconcilePod{}
99
100 // ReconcilePod reconciles a ProviderNetwork object
101 type ReconcilePod struct {
102         // This client, initialized using mgr.Client() above, is a split client
103         // that reads objects from the cache and writes to the apiserver
104         client client.Client
105         scheme *runtime.Scheme
106 }
107
108 // Reconcile function
109 // The Controller will requeue the Request to be processed again if the returned error is non-nil or
110 // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
111 func (r *ReconcilePod) Reconcile(request reconcile.Request) (reconcile.Result, error) {
112         reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
113         reqLogger.Info("Enter Reconciling Pod")
114
115         // Fetch the Pod instance
116         instance := &corev1.Pod{}
117         err := r.client.Get(context.TODO(), request.NamespacedName, instance)
118
119         if err != nil {
120                 if errors.IsNotFound(err) {
121                         // Request object not found, could have been deleted after reconcile request.
122                         // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
123                         // Return and don't requeue
124                         if instance.Name == "" || instance.Namespace == "" {
125                                 return reconcile.Result{}, nil
126                         }
127                         r.deleteLogicalPorts(request.Name, request.Namespace)
128                         return reconcile.Result{}, nil
129                 }
130                 // Error reading the object - requeue the request.
131                 return reconcile.Result{}, err
132         }
133         if instance.Name == "" || instance.Namespace == "" {
134                 return reconcile.Result{}, nil
135         }
136         err = r.addLogicalPorts(instance)
137         if err != nil && err.Error() == "Failed to add ports" {
138                 // Requeue the object
139                 return reconcile.Result{}, err
140         }
141         reqLogger.Info("Exit Reconciling Pod")
142         return reconcile.Result{}, nil
143 }
144
145 // annotatePod annotates pod with the given annotations
146 func (r *ReconcilePod) setPodAnnotation(pod *corev1.Pod, key, value string) error {
147
148         patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value)
149         err := r.client.Patch(context.TODO(), pod, client.ConstantPatch(types.MergePatchType, []byte(patchData)))
150         if err != nil {
151                 log.Error(err, "Updating pod failed", "pod", pod, "key", key, "value", value)
152                 return err
153         }
154         return nil
155 }
156
157 func (r *ReconcilePod) addLogicalPorts(pod *corev1.Pod) error {
158
159         nfn, err := r.readPodAnnotation(pod)
160         if err != nil {
161                 return err
162         }
163
164         switch {
165         case nfn.Type == "ovn4nfv":
166                 ovnCtl, err := ovn.GetOvnController()
167                 if err != nil {
168                         return err
169                 }
170                 key, value := ovnCtl.AddLogicalPorts(pod, nfn.Interface)
171                 if len(key) > 0 {
172                         return r.setPodAnnotation(pod, key, value)
173                 }
174                 return fmt.Errorf("Failed to add ports")
175         default:
176                 return fmt.Errorf("Unsupported Networking type %s", nfn.Type)
177         // Add other types here
178         }
179 }
180
181 func (r *ReconcilePod) deleteLogicalPorts(name, namesapce string) error {
182
183         // Run delete for all controllers; pod annonations inaccessible
184         ovnCtl, err := ovn.GetOvnController()
185         if err != nil {
186                 return err
187         }
188         ovnCtl.DeleteLogicalPorts(name, namesapce)
189         return nil
190         // Add other types here
191 }
192
193 func (r *ReconcilePod) readPodAnnotation(pod *corev1.Pod) (*nfnNetwork, error) {
194         annotaion, ok := pod.Annotations[nfnNetworkAnnotation]
195         if !ok {
196                 return nil, fmt.Errorf("Invalid annotations")
197         }
198         var nfn nfnNetwork
199         err := json.Unmarshal([]byte(annotaion), &nfn)
200         if err != nil {
201                 log.Error(err, "Invalid nfn annotaion", "annotaiton", annotaion)
202                 return nil, err
203         }
204         return &nfn, nil
205 }