3 # Copyright (c) 2020 Orange and others.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
11 Define the parent for Kubernetes testing.
14 from __future__ import division
24 from jinja2 import Template
25 from kubernetes import client
26 from kubernetes import config
27 from kubernetes import watch
31 from xtesting.core import testcase
34 class SecurityTesting(testcase.TestCase):
35 # pylint: disable=too-many-instance-attributes
36 """Run Security job"""
# NOTE(review): this listing has lost its indentation and a number of source
# lines (the embedded numbering skips, e.g. 54-57, 86, 98, 100-101, 105-108).
# The comments below describe only the statements that are visible.
#
# Registry prefix handed to the job template; taken from MIRROR_REPO and
# falling back to "docker.io". DOCKERHUB_REPO overrides it at render time.
38 dockerhub_repo = os.getenv("MIRROR_REPO", "docker.io")
40 __logger = logging.getLogger(__name__)
# Constructor: forwards kwargs to the parent test case, loads the kubeconfig
# and creates the CoreV1 / BatchV1 API clients used by the other methods.
42 def __init__(self, **kwargs):
43 super().__init__(**kwargs)
44 config.load_kube_config()
45 self.corev1 = client.CoreV1Api()
46 self.batchv1 = client.BatchV1Api()
50 self.output_log_name = 'functest-kubernetes.log'
51 self.output_debug_log_name = 'functest-kubernetes.debug.log'
# Prefix passed as generate_name when the namespace is created; the
# subclasses in this file replace it with their own prefix.
53 self.ns_generate_name = "security-"
58 It runs a single security job and then simply prints its output as is.
# Create a namespace with a server-generated name carrying the
# pod-security.kubernetes.io/enforce=baseline label, and remember its name.
62 api_response = self.corev1.create_namespace(
63 client.V1Namespace(metadata=client.V1ObjectMeta(
64 generate_name=self.ns_generate_name,
65 labels={"pod-security.kubernetes.io/enforce": "baseline"})))
66 self.namespace = api_response.metadata.name
67 self.__logger.debug("create_namespace: %s", api_response)
# Read security/<job_name>.yaml from the functest_kubernetes package, render
# it as a Jinja2 template (only dockerhub_repo is supplied) and parse the
# result with yaml.safe_load.
68 with open(pkg_resources.resource_filename(
69 "functest_kubernetes",
70 f"security/{self.job_name}.yaml"),
71 encoding='utf-8') as yfile:
72 template = Template(yfile.read())
73 body = yaml.safe_load(template.render(
74 dockerhub_repo=os.getenv("DOCKERHUB_REPO",
75 self.dockerhub_repo)))
# Submit the rendered manifest as a Job in the freshly created namespace.
76 api_response = self.batchv1.create_namespaced_job(
77 body=body, namespace=self.namespace)
78 self.__logger.info("Job %s created", api_response.metadata.name)
79 self.__logger.debug("create_namespaced_job: %s", api_response)
# Watch the jobs of the namespace for at most self.watch_timeout seconds
# (attribute defined outside the visible lines) and look for the job named
# self.job_name reporting exactly one succeeded pod.
# NOTE(review): the message below says "started" although the condition tests
# status.succeeded == 1 - confirm the intended wording.
80 watch_job = watch.Watch()
81 for event in watch_job.stream(
82 func=self.batchv1.list_namespaced_job,
83 namespace=self.namespace, timeout_seconds=self.watch_timeout):
84 if (event["object"].metadata.name == self.job_name and
85 event["object"].status.succeeded == 1):
87 "%s started in %0.2f sec", event['object'].metadata.name,
88 time.time()-self.start_time)
# Locate the job's pod through the job-name label, keep its name and log on
# the instance, and print the log.
# NOTE(review): pods.items[0] raises IndexError when no pod matches; only
# ApiException handlers are visible in this class - confirm this is acceptable.
90 pods = self.corev1.list_namespaced_pod(
91 self.namespace, label_selector=f'job-name={self.job_name}')
92 self.pod = pods.items[0].metadata.name
93 self.pod_log = self.corev1.read_namespaced_pod_log(
94 name=self.pod, namespace=self.namespace)
95 self.__logger.info("\n\n%s", self.pod_log)
# run: records start_time and stop_time around the job; an ApiException is
# logged with its traceback (the guarded statements are not visible here).
97 def run(self, **kwargs):
99 self.start_time = time.time()
102 except client.rest.ApiException:
103 self.__logger.exception("Cannot run %s", self.job_name)
104 self.stop_time = time.time()
# The statements below delete the pod, the job and the namespace in turn.
# Each deletion is followed by its own ApiException handler whose body is
# not visible in this listing.
109 api_response = self.corev1.delete_namespaced_pod(
110 name=self.pod, namespace=self.namespace)
111 self.__logger.debug("delete_namespaced_pod: %s", api_response)
112 except client.rest.ApiException:
# NOTE(review): the debug message after this call reads
# "delete_namespaced_deployment" although the call is delete_namespaced_job -
# looks like a copy/paste leftover.
116 api_response = self.batchv1.delete_namespaced_job(
117 name=self.job_name, namespace=self.namespace)
119 "delete_namespaced_deployment: %s", api_response)
120 except client.rest.ApiException:
# NOTE(review): api_response is assigned here but the log line prints
# self.namespace instead of the response.
124 api_response = self.corev1.delete_namespace(self.namespace)
125 self.__logger.debug("delete_namespace: %s", self.namespace)
126 except client.rest.ApiException:
130 class KubeHunter(SecurityTesting):
131 """kube-hunter hunts for security weaknesses in Kubernetes clusters.
133 See https://github.com/aquasecurity/kube-hunter for more details
# NOTE(review): this listing has lost its indentation and several source
# lines (the embedded numbering skips 147, 158, 165-166, 175-176, 183).
# The comments below describe only the statements that are visible.
136 __logger = logging.getLogger(__name__)
# Constructor: selects the "kube-hunter" job (the parent class reads
# security/<job_name>.yaml) and the namespace name prefix.
138 def __init__(self, **kwargs):
139 super().__init__(**kwargs)
140 self.job_name = "kube-hunter"
141 self.ns_generate_name = "kube-hunter-"
143 def process_results(self, **kwargs):
144 """Process kube-hunter details"""
# The last line of the pod log is parsed as the JSON report; it is expected
# to provide the "vulnerabilities" and "hunter_statistics" keys read below.
145 self.details = json.loads(self.pod_log.splitlines()[-1])
146 if self.details["vulnerabilities"]:
148 msg = prettytable.PrettyTable(
149 header_style='upper', padding_width=5,
150 field_names=['category', 'vulnerability', 'severity'])
# The "severity" kwarg (default "none") selects the tolerated severities:
#   "low"    -> nothing is tolerated
#   "medium" -> "low" is tolerated
#   "high"   -> "low" and "medium" are tolerated
# For any other value a warning is logged and all three levels are tolerated
# (presumably an else branch; its keyword line is missing from this listing).
151 severity = kwargs.get("severity", "none")
152 if severity == "low":
153 allowed_severity = []
154 elif severity == "medium":
155 allowed_severity = ["low"]
156 elif severity == "high":
157 allowed_severity = ["low", "medium"]
159 self.__logger.warning(
160 "Just printing all vulnerabilities as "
161 "no severity criteria given")
162 allowed_severity = ["low", "medium", "high"]
# Each vulnerability is checked against the tolerated list (the statement
# guarded by that test is not visible here) and contributes one
# category / vulnerability / severity row; the table is logged as a warning.
163 for vulnerability in self.details["vulnerabilities"]:
164 if vulnerability["severity"] not in allowed_severity:
167 [vulnerability["category"], vulnerability["vulnerability"],
168 vulnerability["severity"]])
169 self.__logger.warning("\n\n%s\n", msg.get_string())
# Hunter statistics go into a second table, with the description wrapped at
# 50 characters, logged at info level.
170 if self.details["hunter_statistics"]:
171 msg = prettytable.PrettyTable(
172 header_style='upper', padding_width=5,
173 field_names=['name', 'description', 'vulnerabilities'])
174 for statistics in self.details["hunter_statistics"]:
177 textwrap.fill(statistics["description"], width=50),
178 statistics["vulnerabilities"]])
179 self.__logger.info("\n\n%s\n", msg.get_string())
# run: executes the job through the parent class, then post-processes the
# report; any exception raised while processing is caught and logged with
# its traceback (the rest of the handler is not visible here).
181 def run(self, **kwargs):
182 super().run(**kwargs)
184 self.process_results(**kwargs)
185 except Exception: # pylint: disable=broad-except
186 self.__logger.exception("Cannot process results")
190 class KubeBench(SecurityTesting):
191 """kube-bench checks whether Kubernetes is deployed securely.
193 It runs the checks documented in the CIS Kubernetes Benchmark.
195 See https://github.com/aquasecurity/kube-bench for more details
# NOTE(review): this listing has lost its indentation and several source
# lines (the embedded numbering skips 212, 215, 220).
# The comments below describe only the statements that are visible.
198 __logger = logging.getLogger(__name__)
# Constructor: sets a default job name and the namespace name prefix; run()
# replaces the job name with a target-specific one.
200 def __init__(self, **kwargs):
201 super().__init__(**kwargs)
202 self.job_name = "kube-bench"
203 self.ns_generate_name = "kube-bench-"
# run: the job name is derived from the "target" kwarg (default "node"),
# e.g. "kube-bench-node", before the parent class runs the job.
205 def run(self, **kwargs):
206 self.job_name = f'kube-bench-{kwargs.get("target", "node")}'
207 super().run(**kwargs)
# The pod log is parsed as a Python literal (ast.literal_eval evaluates
# literals only, it does not execute code) and stored as details["report"].
# NOTE(review): no exception handling is visible around this parse, unlike
# KubeHunter.run - confirm that a missing or malformed log is handled upstream.
208 self.details["report"] = ast.literal_eval(self.pod_log)
209 msg = prettytable.PrettyTable(
210 header_style='upper', padding_width=5,
211 field_names=['node_type', 'version', 'test_desc', 'pass',
# Every entry of report["Controls"] contributes one row per test group
# (node type, version, description and pass / fail / warn counters).
213 for details in self.details["report"]["Controls"]:
214 for test in details['tests']:
216 [details['node_type'], details['version'], test['desc'],
217 test['pass'], test['fail'], test['warn']])
# Scored results whose status is FAIL have their description and remediation
# reported (the logging call itself is not visible in this listing).
218 for result in test["results"]:
219 if result['scored'] and result['status'] == 'FAIL':
221 "%s\n%s", result['test_desc'],
222 result['remediation'])
223 self.__logger.warning("Targets:\n\n%s\n", msg.get_string())