3 # Copyright (c) 2020 Orange and others.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
11 Define the parent for Kubernetes testing.
14 from __future__ import division
24 from jinja2 import Template
25 from kubernetes import client
26 from kubernetes import config
27 from kubernetes import watch
31 from xtesting.core import testcase
# Base test case: creates a namespace, runs one Kubernetes Job in it and
# collects the log of the Job's pod.
# NOTE(review): this listing is not contiguous -- the embedded line numbers
# skip (e.g. 47-49, 54-58, 60-62, 87, 90, 101-102, 106-109), so some
# statements (method headers, `try:` lines, call openers) are not visible.
# The comments below describe only what the visible lines show.
34 class SecurityTesting(testcase.TestCase):
35 # pylint: disable=too-many-instance-attributes
36 """Run Security job"""
# Default image registry: read from MIRROR_REPO, falling back to "docker.io".
38 dockerhub_repo = os.getenv("MIRROR_REPO", "docker.io")
40 __logger = logging.getLogger(__name__)
# Loads the kubeconfig, creates the CoreV1/BatchV1 API clients used by the
# calls below, and sets the log file names and the namespace name prefix.
42 def __init__(self, **kwargs):
43 super().__init__(**kwargs)
44 config.load_kube_config()
45 self.corev1 = client.CoreV1Api()
46 self.batchv1 = client.BatchV1Api()
50 self.output_log_name = 'functest-kubernetes.log'
51 self.output_debug_log_name = 'functest-kubernetes.debug.log'
53 self.ns_generate_name = "security-"
# NOTE(review): the header of the method owning the following lines (embedded
# lines 54-58) is not visible; line 59 is text from its docstring.
59 It runs a single security job and then simply prints its output as is.
# Create a namespace with a generated name, labelled with the pod-security
# enforcement level held in self.pss, and remember the generated name.
# NOTE(review): self.pss is not assigned in any visible line of this class.
63 api_response = self.corev1.create_namespace(
64 client.V1Namespace(metadata=client.V1ObjectMeta(
65 generate_name=self.ns_generate_name,
66 labels={"pod-security.kubernetes.io/enforce": self.pss})))
67 self.namespace = api_response.metadata.name
68 self.__logger.debug("create_namespace: %s", api_response)
# Render the packaged manifest security/<job_name>.yaml as a Jinja2 template
# (DOCKERHUB_REPO overrides the class-level registry default) and parse the
# rendered text with yaml.safe_load.
69 with open(pkg_resources.resource_filename(
70 "functest_kubernetes",
71 f"security/{self.job_name}.yaml"),
72 encoding='utf-8') as yfile:
73 template = Template(yfile.read())
74 body = yaml.safe_load(template.render(
75 dockerhub_repo=os.getenv("DOCKERHUB_REPO",
76 self.dockerhub_repo)))
# Submit the Job to the namespace created above.
77 api_response = self.batchv1.create_namespaced_job(
78 body=body, namespace=self.namespace)
79 self.__logger.info("Job %s created", api_response.metadata.name)
80 self.__logger.debug("create_namespaced_job: %s", api_response)
# Watch Job events in the namespace for at most self.watch_timeout seconds and
# react to the event where the job named self.job_name reports one success.
# NOTE(review): the call receiving the "%s started in ..." arguments (line 87)
# and line 90 are not visible. The message says "started" although the
# condition tests status.succeeded == 1 -- confirm the wording is intended.
81 watch_job = watch.Watch()
82 for event in watch_job.stream(
83 func=self.batchv1.list_namespaced_job,
84 namespace=self.namespace, timeout_seconds=self.watch_timeout):
85 if (event["object"].metadata.name == self.job_name and
86 event["object"].status.succeeded == 1):
88 "%s started in %0.2f sec", event['object'].metadata.name,
89 time.time()-self.start_time)
# Look up the pods carrying the job-name label, keep the first pod's name and
# its log, and print the log at INFO level.
# NOTE(review): pods.items[0] raises IndexError when no pod matches.
91 pods = self.corev1.list_namespaced_pod(
92 self.namespace, label_selector=f'job-name={self.job_name}')
93 self.pod = pods.items[0].metadata.name
94 self.pod_log = self.corev1.read_namespaced_pod_log(
95 name=self.pod, namespace=self.namespace)
96 self.__logger.info("\n\n%s", self.pod_log)
# Records start_time, and logs an ApiException via logger.exception
# ("Cannot run <job_name>"); stop_time is recorded afterwards.
# NOTE(review): lines 99 and 101-102 (presumably the `try:` and the statement
# it guards) are not visible.
98 def run(self, **kwargs):
100 self.start_time = time.time()
103 except client.rest.ApiException:
104 self.__logger.exception("Cannot run %s", self.job_name)
105 self.stop_time = time.time()
# Cleanup: delete the pod, then the job, then the namespace. Each call is
# followed by an ApiException handler whose body is not visible here.
110 api_response = self.corev1.delete_namespaced_pod(
111 name=self.pod, namespace=self.namespace)
112 self.__logger.debug("delete_namespaced_pod: %s", api_response)
113 except client.rest.ApiException:
# NOTE(review): the debug label logged after this call (line 120) says
# "delete_namespaced_deployment" although the call is delete_namespaced_job --
# the label looks stale.
117 api_response = self.batchv1.delete_namespaced_job(
118 name=self.job_name, namespace=self.namespace)
120 "delete_namespaced_deployment: %s", api_response)
121 except client.rest.ApiException:
# NOTE(review): api_response is assigned here but the debug line logs
# self.namespace rather than the response.
125 api_response = self.corev1.delete_namespace(self.namespace)
126 self.__logger.debug("delete_namespace: %s", self.namespace)
127 except client.rest.ApiException:
# Security test case for the "kube-hunter" job; post-processes the JSON report
# found on the last line of the pod log.
# NOTE(review): the embedded line numbers skip (148, 159, 166-167, 176-177,
# 184, ...), so some statements of this class are not visible in this listing.
131 class KubeHunter(SecurityTesting):
132 """kube-hunter hunts for security weaknesses in Kubernetes clusters.
134 See https://github.com/aquasecurity/kube-hunter for more details
137 __logger = logging.getLogger(__name__)
# Selects the job name (also used by the parent class to locate the manifest
# security/<job_name>.yaml) and the prefix of the generated namespace name.
139 def __init__(self, **kwargs):
140 super().__init__(**kwargs)
141 self.job_name = "kube-hunter"
142 self.ns_generate_name = "kube-hunter-"
# Parses the last line of self.pod_log as JSON into self.details and logs two
# tables built from it:
#   * "vulnerabilities"   -> category / vulnerability / severity (WARNING)
#   * "hunter_statistics" -> name / description / vulnerabilities (INFO)
# kwargs["severity"] (default "none") selects the tolerated severities:
#   "low" -> none, "medium" -> low, "high" -> low and medium,
#   any other value -> a warning is logged and all three are tolerated.
# Both keys are read by indexing, so a report lacking them raises KeyError.
144 def process_results(self, **kwargs):
145 """Process kube-hunter details"""
146 self.details = json.loads(self.pod_log.splitlines()[-1])
# NOTE(review): line 148, the first statement of this branch, is not visible.
147 if self.details["vulnerabilities"]:
149 msg = prettytable.PrettyTable(
150 header_style='upper', padding_width=5,
151 field_names=['category', 'vulnerability', 'severity'])
152 severity = kwargs.get("severity", "none")
153 if severity == "low":
154 allowed_severity = []
155 elif severity == "medium":
156 allowed_severity = ["low"]
157 elif severity == "high":
158 allowed_severity = ["low", "medium"]
# NOTE(review): line 159 (presumably the `else:` closing the chain above) is
# not visible; the lines below are the fallback for unrecognised severities.
160 self.__logger.warning(
161 "Just printing all vulnerabilities as "
162 "no severity criteria given")
163 allowed_severity = ["low", "medium", "high"]
# NOTE(review): lines 166-167 -- the body of the `if` below and the opener of
# the call receiving the row list -- are not visible.
164 for vulnerability in self.details["vulnerabilities"]:
165 if vulnerability["severity"] not in allowed_severity:
168 [vulnerability["category"], vulnerability["vulnerability"],
169 vulnerability["severity"]])
170 self.__logger.warning("\n\n%s\n", msg.get_string())
# Statistics table; descriptions are wrapped at 50 columns with textwrap.fill.
# NOTE(review): lines 176-177 (call opener and first row element) are not
# visible.
171 if self.details["hunter_statistics"]:
172 msg = prettytable.PrettyTable(
173 header_style='upper', padding_width=5,
174 field_names=['name', 'description', 'vulnerabilities'])
175 for statistics in self.details["hunter_statistics"]:
178 textwrap.fill(statistics["description"], width=50),
179 statistics["vulnerabilities"]])
180 self.__logger.info("\n\n%s\n", msg.get_string())
# Runs the job through the parent class, then processes the report; any
# exception raised while processing is logged as "Cannot process results".
# NOTE(review): line 184 (presumably the `try:`) and the lines after 187 are
# not visible.
182 def run(self, **kwargs):
183 super().run(**kwargs)
185 self.process_results(**kwargs)
186 except Exception: # pylint: disable=broad-except
187 self.__logger.exception("Cannot process results")
# Security test case for the "kube-bench-<target>" jobs; turns the pod log
# into a report stored in self.details["report"] and logs a summary table.
# NOTE(review): the embedded line numbers skip (214, 217, 222, ...), so some
# statements of this class are not visible in this listing.
191 class KubeBench(SecurityTesting):
192 """kube-bench checks whether Kubernetes is deployed securely.
194 It runs the checks documented in the CIS Kubernetes Benchmark.
196 See https://github.com/aquasecurity/kube-bench for more details
199 __logger = logging.getLogger(__name__)
# Sets the default job name and namespace prefix. self.pss is the value the
# parent class puts in the "pod-security.kubernetes.io/enforce" namespace
# label.
201 def __init__(self, **kwargs):
202 super().__init__(**kwargs)
203 self.job_name = "kube-bench"
204 self.ns_generate_name = "kube-bench-"
205 self.pss = "privileged"
# Overrides the job name with "kube-bench-<target>" (kwargs["target"],
# default "node"), runs the job through the parent class, then parses the pod
# log and logs one table row per entry of report["Controls"][*]["tests"].
# For every result that is scored and has status "FAIL", the test description
# and remediation are passed to a call whose opener (line 222) is not visible.
# NOTE(review): ast.literal_eval only accepts Python literals; if the pod log
# is JSON containing true/false/null it would raise -- confirm the log format.
207 def run(self, **kwargs):
208 self.job_name = f'kube-bench-{kwargs.get("target", "node")}'
209 super().run(**kwargs)
210 self.details["report"] = ast.literal_eval(self.pod_log)
# NOTE(review): line 214 (the rest of field_names and the closing brackets) is
# not visible.
211 msg = prettytable.PrettyTable(
212 header_style='upper', padding_width=5,
213 field_names=['node_type', 'version', 'test_desc', 'pass',
# NOTE(review): line 217 (opener of the call receiving the row list) is not
# visible.
215 for details in self.details["report"]["Controls"]:
216 for test in details['tests']:
218 [details['node_type'], details['version'], test['desc'],
219 test['pass'], test['fail'], test['warn']])
220 for result in test["results"]:
221 if result['scored'] and result['status'] == 'FAIL':
223 "%s\n%s", result['test_desc'],
224 result['remediation'])
225 self.__logger.warning("Targets:\n\n%s\n", msg.get_string())