3 # Copyright (c) 2020 Orange and others.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
11 Define the parent for Kubernetes testing.
14 from __future__ import division
23 from kubernetes import client
24 from kubernetes import config
25 from kubernetes import watch
28 from xtesting.core import testcase
31 class SecurityTesting(testcase.TestCase):
32 # pylint: disable=too-many-instance-attributes
33 """Run Security job"""
36 __logger = logging.getLogger(__name__)
38 def __init__(self, **kwargs):
39 super(SecurityTesting, self).__init__(**kwargs)
40 config.load_kube_config()
41 self.corev1 = client.CoreV1Api()
42 self.batchv1 = client.BatchV1Api()
46 self.output_log_name = 'functest-kubernetes.log'
47 self.output_debug_log_name = 'functest-kubernetes.debug.log'
53 It runs a single security job and then simply prints its output asis.
57 api_response = self.corev1.create_namespace(
58 client.V1Namespace(metadata=client.V1ObjectMeta(
59 generate_name="ims-")))
60 self.namespace = api_response.metadata.name
61 self.__logger.debug("create_namespace: %s", api_response)
62 # pylint: disable=bad-continuation
63 with open(pkg_resources.resource_filename(
64 "functest_kubernetes",
65 "security/{}.yaml".format(self.job_name))) as yfile:
66 body = yaml.safe_load(yfile)
67 api_response = self.batchv1.create_namespaced_job(
68 body=body, namespace=self.namespace)
69 self.__logger.info("Job %s created", api_response.metadata.name)
70 self.__logger.debug("create_namespaced_job: %s", api_response)
71 watch_job = watch.Watch()
72 for event in watch_job.stream(
73 func=self.batchv1.list_namespaced_job,
74 namespace=self.namespace, timeout_seconds=self.watch_timeout):
75 if (event["object"].metadata.name == self.job_name and
76 event["object"].status.succeeded == 1):
78 "%s started in %0.2f sec", event['object'].metadata.name,
79 time.time()-self.start_time)
81 pods = self.corev1.list_namespaced_pod(
82 self.namespace, label_selector='job-name={}'.format(self.job_name))
83 self.pod = pods.items[0].metadata.name
84 self.pod_log = self.corev1.read_namespaced_pod_log(
85 name=self.pod, namespace=self.namespace)
86 self.__logger.info("\n\n%s", self.pod_log)
88 def run(self, **kwargs):
90 self.start_time = time.time()
93 except client.rest.ApiException:
94 self.__logger.exception("Cannot run %s", self.job_name)
95 self.stop_time = time.time()
100 api_response = self.corev1.delete_namespaced_pod(
101 name=self.pod, namespace=self.namespace)
102 self.__logger.debug("delete_namespaced_pod: %s", api_response)
103 except client.rest.ApiException:
107 api_response = self.batchv1.delete_namespaced_job(
108 name=self.job_name, namespace=self.namespace)
110 "delete_namespaced_deployment: %s", api_response)
111 except client.rest.ApiException:
115 api_response = self.corev1.delete_namespace(self.namespace)
116 self.__logger.debug("delete_namespace: %s", self.namespace)
117 except client.rest.ApiException:
121 class KubeHunter(SecurityTesting):
122 """kube-hunter hunts for security weaknesses in Kubernetes clusters.
124 See https://github.com/aquasecurity/kube-hunter for more details
127 __logger = logging.getLogger(__name__)
129 def __init__(self, **kwargs):
130 super(KubeHunter, self).__init__(**kwargs)
131 self.job_name = "kube-hunter"
133 def process_results(self, **kwargs):
134 """Process kube-hunter details"""
135 self.details = json.loads(self.pod_log.splitlines()[-1])
136 if self.details["vulnerabilities"]:
138 msg = prettytable.PrettyTable(
139 header_style='upper', padding_width=5,
140 field_names=['category', 'vulnerability', 'severity'])
141 severity = kwargs.get("severity", "high")
142 if severity == "low":
143 allowed_severity = []
144 elif severity == "medium":
145 allowed_severity = ["low"]
146 elif severity == "high":
147 allowed_severity = ["low", "medium"]
149 self.__logger.warning(
150 "Selecting high as default severity (%s is incorrect)",
151 kwargs.get("severity", "high"))
153 allowed_severity = ["low", "medium"]
154 for vulnerability in self.details["vulnerabilities"]:
155 if vulnerability["severity"] in allowed_severity:
156 self.__logger.warning(
157 "Skipping %s (severity is configured as %s)",
158 vulnerability["vulnerability"], severity)
162 [vulnerability["category"], vulnerability["vulnerability"],
163 vulnerability["severity"]])
164 self.__logger.warning("\n\n%s\n", msg.get_string())
165 if self.details["hunter_statistics"]:
166 msg = prettytable.PrettyTable(
167 header_style='upper', padding_width=5,
168 field_names=['name', 'description', 'vulnerabilities'])
169 for statistics in self.details["hunter_statistics"]:
172 textwrap.fill(statistics["description"], width=50),
173 statistics["vulnerabilities"]])
174 self.__logger.info("\n\n%s\n", msg.get_string())
176 def run(self, **kwargs):
177 super(KubeHunter, self).run(**kwargs)
179 self.process_results(**kwargs)
180 except Exception: # pylint: disable=broad-except
181 self.__logger.exception("Cannot process results")
185 class KubeBench(SecurityTesting):
186 """kube-bench checks whether Kubernetes is deployed securelyself.
188 It runs the checks documented in the CIS Kubernetes Benchmark.
190 See https://github.com/aquasecurity/kube-bench for more details
193 __logger = logging.getLogger(__name__)
195 def run(self, **kwargs):
196 self.job_name = "kube-bench-{}".format(kwargs.get("target", "node"))
197 super(KubeBench, self).run(**kwargs)
198 self.details["report"] = ast.literal_eval(self.pod_log)
199 msg = prettytable.PrettyTable(
200 header_style='upper', padding_width=5,
201 field_names=['node_type', 'version', 'test_desc', 'pass',
203 for details in self.details["report"]:
204 for test in details['tests']:
206 [details['node_type'], details['version'], test['desc'],
207 test['pass'], test['fail'], test['warn']])
208 for result in test["results"]:
209 if result['scored'] and result['status'] == 'FAIL':
211 "%s\n%s", result['test_desc'],
212 result['remediation'])
213 self.__logger.warning("Targets:\n\n%s\n", msg.get_string())