3 # Copyright (c) 2020 Orange and others.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
11 Define the parent for Kubernetes testing.
14 from __future__ import division
20 from kubernetes import client
21 from kubernetes import config
22 from kubernetes import watch
24 from xtesting.core import testcase
# NOTE(review): every line of this listing starts with an embedded line
# number, indentation is lost, and several numbers are skipped (e.g. 30-31,
# 39-40, 43-47, 72, 75, 82-83, 85, 87-88, 92-95, 100-102, 105, 108-110), so
# parts of this class are not visible here. The comments below describe only
# the statements that are shown; anything about the hidden lines is marked
# as an assumption.
#
# Purpose (from the visible code): a test case that creates a Kubernetes
# namespace, submits a job described by a packaged YAML manifest, logs the
# output of the job's pod and finally deletes the pod, job and namespace.
27 class SecurityTesting(testcase.TestCase):
28 # pylint: disable=too-many-instance-attributes
29 """Run Security job"""
# Logger named after this module; used by the methods below through
# self.__logger.
32 __logger = logging.getLogger(__name__)
# Constructor: forwards all keyword arguments to the parent test case, loads
# the kube config and creates the CoreV1 and BatchV1 API clients that the
# rest of the class relies on.
34 def __init__(self, **kwargs):
35 super(SecurityTesting, self).__init__(**kwargs)
36 config.load_kube_config()
37 self.corev1 = client.CoreV1Api()
38 self.batchv1 = client.BatchV1Api()
# NOTE(review): embedded lines 39-40 are not shown; self.pod and
# self.job_name are read further down, presumably they are initialised
# there -- confirm.
# Log file names; presumably consumed by the parent test case -- TODO confirm.
41 self.output_log_name = 'functest-kubernetes.log'
42 self.output_debug_log_name = 'functest-kubernetes.debug.log'
# NOTE(review): the `def` line and docstring opening of the next method
# (embedded lines 43-47) are not visible. The sentence below appears to be
# the tail of that method's docstring.
48 It runs a single security job and then simply prints its output asis.
# Create a namespace whose name is generated from the "ims-" prefix and
# remember the resulting name in self.namespace for the later calls.
52 api_response = self.corev1.create_namespace(
53 client.V1Namespace(metadata=client.V1ObjectMeta(
54 generate_name="ims-")))
55 self.namespace = api_response.metadata.name
56 self.__logger.debug("create_namespace: %s", api_response)
# Load the job manifest shipped in the functest_kubernetes package as
# security/<job_name>.yaml (safe_load: no arbitrary object construction)
# and submit it as a job in the namespace created above.
57 # pylint: disable=bad-continuation
58 with open(pkg_resources.resource_filename(
59 "functest_kubernetes",
60 "security/{}.yaml".format(self.job_name))) as yfile:
61 body = yaml.safe_load(yfile)
62 api_response = self.batchv1.create_namespaced_job(
63 body=body, namespace=self.namespace)
64 self.__logger.info("Job %s created", api_response.metadata.name)
65 self.__logger.debug("create_namespaced_job: %s", api_response)
# Stream job events of the namespace, bounded by self.watch_timeout (not
# defined in the visible lines). An event is reported once the job named
# self.job_name has status.succeeded == 1.
66 watch_job = watch.Watch()
67 for event in watch_job.stream(
68 func=self.batchv1.list_namespaced_job,
69 namespace=self.namespace, timeout_seconds=self.watch_timeout):
70 if (event["object"].metadata.name == self.job_name and
71 event["object"].status.succeeded == 1):
# NOTE(review): the call that opens this argument list (embedded line 72,
# presumably a logger call) is not visible. The message says "started"
# although the condition above checks for a succeeded job -- confirm wording.
# Elapsed time is measured from self.start_time, which run() sets.
73 "%s started in %0.2f sec", event['object'].metadata.name,
74 time.time()-self.start_time)
# NOTE(review): embedded line 75 is not shown; presumably it ends the watch
# once the job has succeeded -- confirm.
# Look up the pods labelled job-name=<job_name>, keep the name of the first
# one in self.pod and print its log at WARNING level.
# NOTE(review): pods.items[0] raises IndexError when no pod matches; that
# is not an ApiException, so the handler visible in run() would not catch it.
76 pods = self.corev1.list_namespaced_pod(
77 self.namespace, label_selector='job-name={}'.format(self.job_name))
78 self.pod = pods.items[0].metadata.name
79 api_response = self.corev1.read_namespaced_pod_log(
80 name=self.pod, namespace=self.namespace)
81 self.__logger.warning("\n\n%s", api_response)
# run(): records start_time, executes a try block whose body (embedded
# lines 87-88) is not visible, logs any Kubernetes ApiException together
# with its traceback, and records stop_time afterwards.
84 def run(self, **kwargs):
86 self.start_time = time.time()
89 except client.rest.ApiException:
90 self.__logger.exception("Cannot run %s", self.job_name)
91 self.stop_time = time.time()
# NOTE(review): the header of the cleanup method and the opening of its
# first try block (embedded lines 92-95) are not visible. The visible part
# deletes the pod, then the job, then the namespace, each followed by its own
# `except client.rest.ApiException:` clause whose body is not shown.
96 api_response = self.corev1.delete_namespaced_pod(
97 name=self.pod, namespace=self.namespace)
98 self.__logger.debug("delete_namespaced_pod: %s", api_response)
99 except client.rest.ApiException:
103 api_response = self.batchv1.delete_namespaced_job(
104 name=self.job_name, namespace=self.namespace)
# NOTE(review): the debug label below reads "delete_namespaced_deployment"
# although the call above is delete_namespaced_job -- likely a copy/paste
# leftover in the log message.
106 "delete_namespaced_deployment: %s", api_response)
107 except client.rest.ApiException:
# NOTE(review): api_response is assigned here but the debug message logs
# self.namespace instead of the response.
111 api_response = self.corev1.delete_namespace(self.namespace)
112 self.__logger.debug("delete_namespace: %s", self.namespace)
113 except client.rest.ApiException:
class KubeHunter(SecurityTesting):
    """Security test case bound to the kube-hunter job.

    kube-hunter hunts for security weaknesses in Kubernetes clusters.
    See https://github.com/aquasecurity/kube-hunter for more details.
    """

    def __init__(self, **kwargs):
        """Initialise the parent test case, then select the kube-hunter job.

        All keyword arguments are forwarded unchanged to SecurityTesting.
        """
        super(KubeHunter, self).__init__(**kwargs)
        # The parent class derives the manifest path (security/<name>.yaml)
        # and the pod label selector from this name.
        self.job_name = "kube-hunter"
class KubeBench(SecurityTesting):
    """Security test case bound to the kube-bench job.

    kube-bench checks whether Kubernetes is deployed securely.
    It runs the checks documented in the CIS Kubernetes Benchmark.

    See https://github.com/aquasecurity/kube-bench for more details.
    """

    def __init__(self, **kwargs):
        """Initialise the parent test case, then select the kube-bench job.

        All keyword arguments are forwarded unchanged to SecurityTesting.
        """
        super(KubeBench, self).__init__(**kwargs)
        # The parent class derives the manifest path (security/<name>.yaml)
        # and the pod label selector from this name.
        self.job_name = "kube-bench"