Make K8s security tests namespace aware 67/70767/1
authorCédric Ollivier <cedric.ollivier@orange.com>
Thu, 13 Aug 2020 10:51:55 +0000 (12:51 +0200)
committerCédric Ollivier <cedric.ollivier@orange.com>
Thu, 13 Aug 2020 10:57:48 +0000 (12:57 +0200)
It now creates a namespace to allow running the test cases twice in
parallel. It also overprotects clean operations to force a full delete.

Change-Id: Ie0becd8ea9126328e7280591bacc0d88e14dd031
Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
(cherry picked from commit 8e2a7dbee8f134dbe9022683d40e2328e5e50fe6)

functest_kubernetes/security/security.py

index 437bdf3..16171eb 100644 (file)
@@ -27,7 +27,6 @@ from xtesting.core import testcase
 class SecurityTesting(testcase.TestCase):
     # pylint: disable=too-many-instance-attributes
     """Run Security job"""
-    namespace = 'default'
     watch_timeout = 1200
 
     __logger = logging.getLogger(__name__)
@@ -41,6 +40,7 @@ class SecurityTesting(testcase.TestCase):
         self.job_name = None
         self.output_log_name = 'functest-kubernetes.log'
         self.output_debug_log_name = 'functest-kubernetes.debug.log'
+        self.namespace = ""
 
     def deploy_job(self):
         """Run Security job
@@ -49,13 +49,18 @@ class SecurityTesting(testcase.TestCase):
         """
 
         assert self.job_name
+        api_response = self.corev1.create_namespace(
+            client.V1Namespace(metadata=client.V1ObjectMeta(
+                generate_name="ims-")))
+        self.namespace = api_response.metadata.name
+        self.__logger.debug("create_namespace: %s", api_response)
         # pylint: disable=bad-continuation
         with open(pkg_resources.resource_filename(
                 "functest_kubernetes",
                 "security/{}.yaml".format(self.job_name))) as yfile:
             body = yaml.safe_load(yfile)
             api_response = self.batchv1.create_namespaced_job(
-                body=body, namespace="default")
+                body=body, namespace=self.namespace)
             self.__logger.info("Job %s created", api_response.metadata.name)
             self.__logger.debug("create_namespaced_job: %s", api_response)
         watch_job = watch.Watch()
@@ -86,19 +91,27 @@ class SecurityTesting(testcase.TestCase):
         self.stop_time = time.time()
 
     def clean(self):
-        try:
-            api_response = self.corev1.delete_namespaced_pod(
-                name=self.pod, namespace=self.namespace)
-            self.__logger.debug("delete_namespaced_pod: %s", api_response)
-        except client.rest.ApiException:
-            pass
-        try:
-            api_response = self.batchv1.delete_namespaced_job(
-                name=self.job_name, namespace=self.namespace)
-            self.__logger.debug(
-                "delete_namespaced_deployment: %s", api_response)
-        except client.rest.ApiException:
-            pass
+        if self.pod:
+            try:
+                api_response = self.corev1.delete_namespaced_pod(
+                    name=self.pod, namespace=self.namespace)
+                self.__logger.debug("delete_namespaced_pod: %s", api_response)
+            except client.rest.ApiException:
+                pass
+        if self.job_name:
+            try:
+                api_response = self.batchv1.delete_namespaced_job(
+                    name=self.job_name, namespace=self.namespace)
+                self.__logger.debug(
+                    "delete_namespaced_deployment: %s", api_response)
+            except client.rest.ApiException:
+                pass
+        if self.namespace:
+            try:
+                api_response = self.corev1.delete_namespace(self.namespace)
+                self.__logger.debug("delete_namespace: %s", self.namespace)
+            except client.rest.ApiException:
+                pass
 
 
 class KubeHunter(SecurityTesting):