3 # Copyright (c) 2020 Orange and others.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
11 Define the parent for Kubernetes testing.
14 from __future__ import division
24 from jinja2 import Template
25 from kubernetes import client
26 from kubernetes import config
27 from kubernetes import watch
31 from xtesting.core import testcase
34 class SecurityTesting(testcase.TestCase):
35 # pylint: disable=too-many-instance-attributes
36 """Run Security job"""
38 dockerhub_repo = os.getenv("MIRROR_REPO", "docker.io")
40 __logger = logging.getLogger(__name__)
42 def __init__(self, **kwargs):
43 super().__init__(**kwargs)
44 config.load_kube_config()
45 self.corev1 = client.CoreV1Api()
46 self.batchv1 = client.BatchV1Api()
50 self.output_log_name = 'functest-kubernetes.log'
51 self.output_debug_log_name = 'functest-kubernetes.debug.log'
53 self.ns_generate_name = "security-"
58 It runs a single security job and then simply prints its output asis.
62 api_response = self.corev1.create_namespace(
63 client.V1Namespace(metadata=client.V1ObjectMeta(
64 generate_name=self.ns_generate_name)))
65 self.namespace = api_response.metadata.name
66 self.__logger.debug("create_namespace: %s", api_response)
67 with open(pkg_resources.resource_filename(
68 "functest_kubernetes",
69 f"security/{self.job_name}.yaml"),
70 encoding='utf-8') as yfile:
71 template = Template(yfile.read())
72 body = yaml.safe_load(template.render(
73 dockerhub_repo=os.getenv("DOCKERHUB_REPO",
74 self.dockerhub_repo)))
75 api_response = self.batchv1.create_namespaced_job(
76 body=body, namespace=self.namespace)
77 self.__logger.info("Job %s created", api_response.metadata.name)
78 self.__logger.debug("create_namespaced_job: %s", api_response)
79 watch_job = watch.Watch()
80 for event in watch_job.stream(
81 func=self.batchv1.list_namespaced_job,
82 namespace=self.namespace, timeout_seconds=self.watch_timeout):
83 if (event["object"].metadata.name == self.job_name and
84 event["object"].status.succeeded == 1):
86 "%s started in %0.2f sec", event['object'].metadata.name,
87 time.time()-self.start_time)
89 pods = self.corev1.list_namespaced_pod(
90 self.namespace, label_selector=f'job-name={self.job_name}')
91 self.pod = pods.items[0].metadata.name
92 self.pod_log = self.corev1.read_namespaced_pod_log(
93 name=self.pod, namespace=self.namespace)
94 self.__logger.info("\n\n%s", self.pod_log)
96 def run(self, **kwargs):
98 self.start_time = time.time()
101 except client.rest.ApiException:
102 self.__logger.exception("Cannot run %s", self.job_name)
103 self.stop_time = time.time()
108 api_response = self.corev1.delete_namespaced_pod(
109 name=self.pod, namespace=self.namespace)
110 self.__logger.debug("delete_namespaced_pod: %s", api_response)
111 except client.rest.ApiException:
115 api_response = self.batchv1.delete_namespaced_job(
116 name=self.job_name, namespace=self.namespace)
118 "delete_namespaced_deployment: %s", api_response)
119 except client.rest.ApiException:
123 api_response = self.corev1.delete_namespace(self.namespace)
124 self.__logger.debug("delete_namespace: %s", self.namespace)
125 except client.rest.ApiException:
129 class KubeHunter(SecurityTesting):
130 """kube-hunter hunts for security weaknesses in Kubernetes clusters.
132 See https://github.com/aquasecurity/kube-hunter for more details
135 __logger = logging.getLogger(__name__)
137 def __init__(self, **kwargs):
138 super().__init__(**kwargs)
139 self.job_name = "kube-hunter"
140 self.ns_generate_name = "kube-hunter-"
142 def process_results(self, **kwargs):
143 """Process kube-hunter details"""
144 self.details = json.loads(self.pod_log.splitlines()[-1])
145 if self.details["vulnerabilities"]:
147 msg = prettytable.PrettyTable(
148 header_style='upper', padding_width=5,
149 field_names=['category', 'vulnerability', 'severity'])
150 severity = kwargs.get("severity", "none")
151 if severity == "low":
152 allowed_severity = []
153 elif severity == "medium":
154 allowed_severity = ["low"]
155 elif severity == "high":
156 allowed_severity = ["low", "medium"]
158 self.__logger.warning(
159 "Just printing all vulnerabilities as "
160 "no severity criteria given")
161 allowed_severity = ["low", "medium", "high"]
162 for vulnerability in self.details["vulnerabilities"]:
163 if vulnerability["severity"] not in allowed_severity:
166 [vulnerability["category"], vulnerability["vulnerability"],
167 vulnerability["severity"]])
168 self.__logger.warning("\n\n%s\n", msg.get_string())
169 if self.details["hunter_statistics"]:
170 msg = prettytable.PrettyTable(
171 header_style='upper', padding_width=5,
172 field_names=['name', 'description', 'vulnerabilities'])
173 for statistics in self.details["hunter_statistics"]:
176 textwrap.fill(statistics["description"], width=50),
177 statistics["vulnerabilities"]])
178 self.__logger.info("\n\n%s\n", msg.get_string())
180 def run(self, **kwargs):
181 super().run(**kwargs)
183 self.process_results(**kwargs)
184 except Exception: # pylint: disable=broad-except
185 self.__logger.exception("Cannot process results")
189 class KubeBench(SecurityTesting):
190 """kube-bench checks whether Kubernetes is deployed securelyself.
192 It runs the checks documented in the CIS Kubernetes Benchmark.
194 See https://github.com/aquasecurity/kube-bench for more details
197 __logger = logging.getLogger(__name__)
199 def __init__(self, **kwargs):
200 super().__init__(**kwargs)
201 self.job_name = "kube-bench"
202 self.ns_generate_name = "kube-bench-"
204 def run(self, **kwargs):
205 self.job_name = f'kube-bench-{kwargs.get("target", "node")}'
206 super().run(**kwargs)
207 self.details["report"] = ast.literal_eval(self.pod_log)
208 msg = prettytable.PrettyTable(
209 header_style='upper', padding_width=5,
210 field_names=['node_type', 'version', 'test_desc', 'pass',
212 for details in self.details["report"]["Controls"]:
213 for test in details['tests']:
215 [details['node_type'], details['version'], test['desc'],
216 test['pass'], test['fail'], test['warn']])
217 for result in test["results"]:
218 if result['scored'] and result['status'] == 'FAIL':
220 "%s\n%s", result['test_desc'],
221 result['remediation'])
222 self.__logger.warning("Targets:\n\n%s\n", msg.get_string())