3 # Copyright (c) 2020 Orange and others.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
11 Define the parent for Kubernetes testing.
14 from __future__ import division
24 from jinja2 import Template
25 from kubernetes import client
26 from kubernetes import config
27 from kubernetes import watch
30 from xtesting.core import testcase
33 class SecurityTesting(testcase.TestCase):
34 # pylint: disable=too-many-instance-attributes
35 """Run Security job"""
37 dockerhub_repo = os.getenv("MIRROR_REPO", "docker.io")
39 __logger = logging.getLogger(__name__)
41 def __init__(self, **kwargs):
42 super(SecurityTesting, self).__init__(**kwargs)
43 config.load_kube_config()
44 self.corev1 = client.CoreV1Api()
45 self.batchv1 = client.BatchV1Api()
49 self.output_log_name = 'functest-kubernetes.log'
50 self.output_debug_log_name = 'functest-kubernetes.debug.log'
52 self.ns_generate_name = "security-"
57 It runs a single security job and then simply prints its output asis.
61 api_response = self.corev1.create_namespace(
62 client.V1Namespace(metadata=client.V1ObjectMeta(
63 generate_name=self.ns_generate_name)))
64 self.namespace = api_response.metadata.name
65 self.__logger.debug("create_namespace: %s", api_response)
66 with open(pkg_resources.resource_filename(
67 "functest_kubernetes",
68 "security/{}.yaml".format(self.job_name))) as yfile:
69 template = Template(yfile.read())
70 body = yaml.safe_load(template.render(
71 dockerhub_repo=os.getenv("DOCKERHUB_REPO",
72 self.dockerhub_repo)))
73 api_response = self.batchv1.create_namespaced_job(
74 body=body, namespace=self.namespace)
75 self.__logger.info("Job %s created", api_response.metadata.name)
76 self.__logger.debug("create_namespaced_job: %s", api_response)
77 watch_job = watch.Watch()
78 for event in watch_job.stream(
79 func=self.batchv1.list_namespaced_job,
80 namespace=self.namespace, timeout_seconds=self.watch_timeout):
81 if (event["object"].metadata.name == self.job_name and
82 event["object"].status.succeeded == 1):
84 "%s started in %0.2f sec", event['object'].metadata.name,
85 time.time()-self.start_time)
87 pods = self.corev1.list_namespaced_pod(
88 self.namespace, label_selector='job-name={}'.format(self.job_name))
89 self.pod = pods.items[0].metadata.name
90 self.pod_log = self.corev1.read_namespaced_pod_log(
91 name=self.pod, namespace=self.namespace)
92 self.__logger.info("\n\n%s", self.pod_log)
94 def run(self, **kwargs):
96 self.start_time = time.time()
99 except client.rest.ApiException:
100 self.__logger.exception("Cannot run %s", self.job_name)
101 self.stop_time = time.time()
106 api_response = self.corev1.delete_namespaced_pod(
107 name=self.pod, namespace=self.namespace)
108 self.__logger.debug("delete_namespaced_pod: %s", api_response)
109 except client.rest.ApiException:
113 api_response = self.batchv1.delete_namespaced_job(
114 name=self.job_name, namespace=self.namespace)
116 "delete_namespaced_deployment: %s", api_response)
117 except client.rest.ApiException:
121 api_response = self.corev1.delete_namespace(self.namespace)
122 self.__logger.debug("delete_namespace: %s", self.namespace)
123 except client.rest.ApiException:
127 class KubeHunter(SecurityTesting):
128 """kube-hunter hunts for security weaknesses in Kubernetes clusters.
130 See https://github.com/aquasecurity/kube-hunter for more details
133 __logger = logging.getLogger(__name__)
135 def __init__(self, **kwargs):
136 super(KubeHunter, self).__init__(**kwargs)
137 self.job_name = "kube-hunter"
138 self.ns_generate_name = "kube-hunter-"
140 def process_results(self, **kwargs):
141 """Process kube-hunter details"""
142 self.details = json.loads(self.pod_log.splitlines()[-1])
143 if self.details["vulnerabilities"]:
145 msg = prettytable.PrettyTable(
146 header_style='upper', padding_width=5,
147 field_names=['category', 'vulnerability', 'severity'])
148 severity = kwargs.get("severity", "high")
149 if severity == "low":
150 allowed_severity = []
151 elif severity == "medium":
152 allowed_severity = ["low"]
153 elif severity == "high":
154 allowed_severity = ["low", "medium"]
156 self.__logger.warning(
157 "Selecting high as default severity (%s is incorrect)",
158 kwargs.get("severity", "high"))
160 allowed_severity = ["low", "medium"]
161 for vulnerability in self.details["vulnerabilities"]:
162 if vulnerability["severity"] in allowed_severity:
163 self.__logger.warning(
164 "Skipping %s (severity is configured as %s)",
165 vulnerability["vulnerability"], severity)
169 [vulnerability["category"], vulnerability["vulnerability"],
170 vulnerability["severity"]])
171 self.__logger.warning("\n\n%s\n", msg.get_string())
172 if self.details["hunter_statistics"]:
173 msg = prettytable.PrettyTable(
174 header_style='upper', padding_width=5,
175 field_names=['name', 'description', 'vulnerabilities'])
176 for statistics in self.details["hunter_statistics"]:
179 textwrap.fill(statistics["description"], width=50),
180 statistics["vulnerabilities"]])
181 self.__logger.info("\n\n%s\n", msg.get_string())
183 def run(self, **kwargs):
184 super(KubeHunter, self).run(**kwargs)
186 self.process_results(**kwargs)
187 except Exception: # pylint: disable=broad-except
188 self.__logger.exception("Cannot process results")
192 class KubeBench(SecurityTesting):
193 """kube-bench checks whether Kubernetes is deployed securelyself.
195 It runs the checks documented in the CIS Kubernetes Benchmark.
197 See https://github.com/aquasecurity/kube-bench for more details
200 __logger = logging.getLogger(__name__)
202 def __init__(self, **kwargs):
203 super(KubeBench, self).__init__(**kwargs)
204 self.job_name = "kube-bench"
205 self.ns_generate_name = "kube-bench-"
207 def run(self, **kwargs):
208 self.job_name = "kube-bench-{}".format(kwargs.get("target", "node"))
209 super(KubeBench, self).run(**kwargs)
210 self.details["report"] = ast.literal_eval(self.pod_log)
211 msg = prettytable.PrettyTable(
212 header_style='upper', padding_width=5,
213 field_names=['node_type', 'version', 'test_desc', 'pass',
215 for details in self.details["report"]:
216 for test in details['tests']:
218 [details['node_type'], details['version'], test['desc'],
219 test['pass'], test['fail'], test['warn']])
220 for result in test["results"]:
221 if result['scored'] and result['status'] == 'FAIL':
223 "%s\n%s", result['test_desc'],
224 result['remediation'])
225 self.__logger.warning("Targets:\n\n%s\n", msg.get_string())