3 # Copyright (c) 2020 Orange and others.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
11 Define the parent for Kubernetes testing.
14 from __future__ import division
22 from kubernetes import client
23 from kubernetes import config
24 from kubernetes import watch
27 from xtesting.core import testcase
class SecurityTesting(testcase.TestCase):
    # pylint: disable=too-many-instance-attributes
    """Run Security job

    Base class of the security test cases defined in this module. It
    creates a namespace, starts the Kubernetes job described by
    ``security/<job_name>.yaml`` (a resource of the ``functest_kubernetes``
    package), watches that job and stores the log of its pod in
    ``pod_log``. Subclasses are expected to set ``job_name``.
    """
    # NOTE(review): the listing this class was rebuilt from had lost its
    # indentation and every short line (``try:``, ``else:``, call openers,
    # simple assignments, two method headers). Purely syntactic openers were
    # put back silently; every statement, name or value that had to be
    # inferred is tagged ``# restored`` and must be confirmed against the
    # upstream file.

    # restored: deploy_job() reads self.watch_timeout but no definition is
    # visible; the value (seconds) is an assumption -- TODO confirm.
    watch_timeout = 1200

    __logger = logging.getLogger(__name__)

    def __init__(self, **kwargs):
        """Load the kubeconfig and create the CoreV1/BatchV1 API clients."""
        super(SecurityTesting, self).__init__(**kwargs)
        config.load_kube_config()
        self.corev1 = client.CoreV1Api()
        self.batchv1 = client.BatchV1Api()
        # restored: clean() and deploy_job() read these attributes, so they
        # need a default; the exact initial values are assumed.
        self.pod = None
        self.pod_log = ""
        self.job_name = None
        self.output_log_name = 'functest-kubernetes.log'
        self.output_debug_log_name = 'functest-kubernetes.debug.log'
        # restored: see above.
        self.namespace = ""

    # restored: the method header was missing; the name is assumed from the
    # surviving docstring and must match what run() calls.
    def deploy_job(self):
        """Run Security job

        It runs a single security job and then simply prints its output
        as is.

        Raises:
            kubernetes.client.rest.ApiException: propagated from any of the
                Kubernetes API calls (run() catches it).
            IndexError: if no pod carries the ``job-name=<job_name>`` label
                once the watch is over (``pods.items[0]``).
        """
        assert self.job_name  # restored: presumed guard -- TODO confirm
        api_response = self.corev1.create_namespace(
            client.V1Namespace(metadata=client.V1ObjectMeta(
                generate_name="ims-")))
        self.namespace = api_response.metadata.name
        self.__logger.debug("create_namespace: %s", api_response)

        manifest = pkg_resources.resource_filename(
            "functest_kubernetes",
            "security/{}.yaml".format(self.job_name))
        with open(manifest) as yfile:
            body = yaml.safe_load(yfile)
        api_response = self.batchv1.create_namespaced_job(
            body=body, namespace=self.namespace)
        self.__logger.info("Job %s created", api_response.metadata.name)
        self.__logger.debug("create_namespaced_job: %s", api_response)

        # Follow the jobs of the namespace until ours reports one success
        # or the server-side watch timeout expires.
        watch_job = watch.Watch()
        for event in watch_job.stream(
                func=self.batchv1.list_namespaced_job,
                namespace=self.namespace, timeout_seconds=self.watch_timeout):
            job = event["object"]
            if (job.metadata.name == self.job_name and
                    job.status.succeeded == 1):
                self.__logger.info(
                    "%s started in %0.2f sec", job.metadata.name,
                    time.time()-self.start_time)
                # restored: one short statement was lost here; stopping the
                # watch is the presumed intent -- TODO confirm.
                watch_job.stop()

        pods = self.corev1.list_namespaced_pod(
            self.namespace, label_selector='job-name={}'.format(self.job_name))
        self.pod = pods.items[0].metadata.name
        self.pod_log = self.corev1.read_namespaced_pod_log(
            name=self.pod, namespace=self.namespace)
        self.__logger.info("\n\n%s", self.pod_log)

    def run(self, **kwargs):
        """Deploy the job and record the start and stop times.

        An ApiException raised while deploying is logged and swallowed;
        ``stop_time`` is set in both cases.
        """
        assert self.job_name  # restored: presumed guard -- TODO confirm
        self.start_time = time.time()
        try:
            self.deploy_job()  # restored: the guarded call was missing
        except client.rest.ApiException:
            self.__logger.exception("Cannot run %s", self.job_name)
        self.stop_time = time.time()

    # restored: the method header and the three ``if`` guards were missing;
    # ``clean`` and the truthiness checks are assumed -- TODO confirm.
    def clean(self):
        """Delete the pod, the job and the namespace created by deploy_job.

        Each deletion is best effort: an ApiException is logged at debug
        level and the remaining resources are still processed.
        """
        if self.pod:
            try:
                api_response = self.corev1.delete_namespaced_pod(
                    name=self.pod, namespace=self.namespace)
                self.__logger.debug("delete_namespaced_pod: %s", api_response)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete pod %s", self.pod, exc_info=True)
        if self.job_name:
            try:
                api_response = self.batchv1.delete_namespaced_job(
                    name=self.job_name, namespace=self.namespace)
                # The label used to read "delete_namespaced_deployment",
                # which did not match the call being traced.
                self.__logger.debug(
                    "delete_namespaced_job: %s", api_response)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete job %s", self.job_name, exc_info=True)
        if self.namespace:
            try:
                self.corev1.delete_namespace(self.namespace)
                self.__logger.debug("delete_namespace: %s", self.namespace)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete namespace %s", self.namespace,
                    exc_info=True)
class KubeHunter(SecurityTesting):
    """kube-hunter hunts for security weaknesses in Kubernetes clusters.

    See https://github.com/aquasecurity/kube-hunter for more details
    """
    # NOTE(review): rebuilt from a listing that had lost its indentation and
    # every short line. Syntactic openers (``else:``, ``try:``,
    # ``msg.add_row(``) were put back silently; inferred statements are
    # tagged ``# restored`` and must be confirmed against the upstream file.

    __logger = logging.getLogger(__name__)

    def __init__(self, **kwargs):
        """Select the ``kube-hunter`` job."""
        super(KubeHunter, self).__init__(**kwargs)
        self.job_name = "kube-hunter"

    def process_results(self, **kwargs):
        """Process kube-hunter details

        The last line of ``pod_log`` is parsed as JSON into ``details``; its
        ``vulnerabilities`` and ``hunter_statistics`` entries are each
        logged as a table when they are not empty.

        Args:
            **kwargs: ``severity`` ("low", "medium" or "high", default
                "high") selects which vulnerability severities are reported
                as skipped; any other value falls back to "high" with a
                warning.

        Raises:
            IndexError, ValueError, KeyError: if ``pod_log`` is empty, its
                last line is not valid JSON or an expected key is missing
                (run() catches them).
        """
        self.details = json.loads(self.pod_log.splitlines()[-1])
        if self.details["vulnerabilities"]:
            # restored: one short statement was lost here; no verdict
            # assignment is visible anywhere in the listing, so this one is
            # assumed. As written, an empty vulnerability list leaves
            # ``result`` untouched -- TODO confirm both points.
            self.result = 100
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['category', 'vulnerability', 'severity'])
            severity = kwargs.get("severity", "high")
            if severity == "low":
                allowed_severity = []
            elif severity == "medium":
                allowed_severity = ["low"]
            elif severity == "high":
                allowed_severity = ["low", "medium"]
            else:
                self.__logger.warning(
                    "Selecting high as default severity (%s is incorrect)",
                    kwargs.get("severity", "high"))
                # restored: presumed from the warning text above and from
                # the later use of ``severity`` in the skip message.
                severity = "high"
                allowed_severity = ["low", "medium"]
            for vulnerability in self.details["vulnerabilities"]:
                if vulnerability["severity"] in allowed_severity:
                    self.__logger.warning(
                        "Skipping %s (severity is configured as %s)",
                        vulnerability["vulnerability"], severity)
                else:
                    # restored: assumed failure verdict for a vulnerability
                    # that is not skipped -- TODO confirm.
                    self.result = 0
                # Every vulnerability is listed, skipped or not.
                msg.add_row(
                    [vulnerability["category"], vulnerability["vulnerability"],
                     vulnerability["severity"]])
            self.__logger.warning("\n\n%s\n", msg.get_string())
        if self.details["hunter_statistics"]:
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['name', 'description', 'vulnerabilities'])
            for statistics in self.details["hunter_statistics"]:
                msg.add_row(
                    # restored: the first cell was lost; ``name`` is assumed
                    # from the first entry of field_names.
                    [statistics["name"],
                     textwrap.fill(statistics["description"], width=50),
                     statistics["vulnerabilities"]])
            self.__logger.info("\n\n%s\n", msg.get_string())

    def run(self, **kwargs):
        """Run the job, then process its output.

        Any exception raised while processing the results is logged
        instead of being propagated.
        """
        super(KubeHunter, self).run(**kwargs)
        try:
            self.process_results(**kwargs)
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("Cannot process results")
            # restored: one short statement was lost after the log call; a
            # failure verdict is assumed -- TODO confirm.
            self.result = 0
class KubeBench(SecurityTesting):
    """kube-bench checks whether Kubernetes is deployed securely.

    It runs the checks documented in the CIS Kubernetes Benchmark.

    See https://github.com/aquasecurity/kube-bench for more details
    """
    # NOTE(review): rebuilt from a listing that had lost its indentation and
    # every short line; only the statements that survived are reproduced.

    def __init__(self, **kwargs):
        """Select the ``kube-bench`` job."""
        super(KubeBench, self).__init__(**kwargs)
        self.job_name = "kube-bench"

    def run(self, **kwargs):
        """Run the kube-bench job through the parent implementation."""
        # NOTE(review): a short statement may have been lost after this call
        # (nothing visible here sets a verdict) -- confirm against upstream.
        super(KubeBench, self).run(**kwargs)