Enforce baseline Pod Security Standard
[functest-kubernetes.git] / functest_kubernetes / security / security.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2020 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 """
11 Define the parent for Kubernetes testing.
12 """
13
14 from __future__ import division
15
16 import ast
17 import json
18 import logging
19 import os
20 import time
21 import textwrap
22 import yaml
23
24 from jinja2 import Template
25 from kubernetes import client
26 from kubernetes import config
27 from kubernetes import watch
28 import pkg_resources
29 import prettytable
30
31 from xtesting.core import testcase
32
33
class SecurityTesting(testcase.TestCase):
    # pylint: disable=too-many-instance-attributes
    """Run a security job and collect the log of its pod.

    The job manifest is rendered from ``security/<job_name>.yaml`` and created
    in a freshly generated namespace labelled to enforce the baseline Pod
    Security Standard. ``job_name`` must be set (it defaults to ``None``)
    before ``deploy_job()`` or ``run()`` is called.
    """
    # Maximum time (seconds) spent watching the job before giving up.
    watch_timeout = 1200
    # Default image registry, overridable per run via DOCKERHUB_REPO.
    dockerhub_repo = os.getenv("MIRROR_REPO", "docker.io")

    __logger = logging.getLogger(__name__)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        config.load_kube_config()
        self.corev1 = client.CoreV1Api()
        self.batchv1 = client.BatchV1Api()
        self.pod = None
        self.pod_log = ""
        self.job_name = None
        self.output_log_name = 'functest-kubernetes.log'
        self.output_debug_log_name = 'functest-kubernetes.debug.log'
        self.namespace = ""
        self.ns_generate_name = "security-"

    @staticmethod
    def _has_failed(job):
        """Return True if the job carries a ``Failed`` condition set to True.

        ``status.conditions`` may be unset while the job is still running,
        hence the fallback to an empty list.
        """
        return any(
            condition.type == "Failed" and condition.status == "True"
            for condition in job.status.conditions or [])

    def deploy_job(self):
        """Run Security job

        It creates the namespace and the job, waits until the job succeeds,
        fails or ``watch_timeout`` expires, and then stores and prints the
        log of the first pod of the job as is.

        Raises:
            kubernetes.client.rest.ApiException: if a Kubernetes API call
                fails.
        """

        assert self.job_name
        api_response = self.corev1.create_namespace(
            client.V1Namespace(metadata=client.V1ObjectMeta(
                generate_name=self.ns_generate_name,
                labels={"pod-security.kubernetes.io/enforce": "baseline"})))
        self.namespace = api_response.metadata.name
        self.__logger.debug("create_namespace: %s", api_response)
        # Only the template read needs the file; close it before calling the
        # API.
        with open(pkg_resources.resource_filename(
                "functest_kubernetes",
                f"security/{self.job_name}.yaml"),
                encoding='utf-8') as yfile:
            template = Template(yfile.read())
        body = yaml.safe_load(template.render(
            dockerhub_repo=os.getenv("DOCKERHUB_REPO",
                                     self.dockerhub_repo)))
        api_response = self.batchv1.create_namespaced_job(
            body=body, namespace=self.namespace)
        self.__logger.info("Job %s created", api_response.metadata.name)
        self.__logger.debug("create_namespaced_job: %s", api_response)
        watch_job = watch.Watch()
        for event in watch_job.stream(
                func=self.batchv1.list_namespaced_job,
                namespace=self.namespace, timeout_seconds=self.watch_timeout):
            job = event["object"]
            if job.metadata.name != self.job_name:
                continue
            if job.status.succeeded == 1:
                self.__logger.info(
                    "%s completed in %0.2f sec", job.metadata.name,
                    time.time()-self.start_time)
                watch_job.stop()
            elif self._has_failed(job):
                # No point in waiting for the timeout once the job has
                # definitively failed; still collect the pod log below.
                self.__logger.error(
                    "%s failed after %0.2f sec", job.metadata.name,
                    time.time()-self.start_time)
                watch_job.stop()
        pods = self.corev1.list_namespaced_pod(
            self.namespace, label_selector=f'job-name={self.job_name}')
        if not pods.items:
            # E.g. the pod was never admitted; leave pod/pod_log untouched.
            self.__logger.error(
                "No pod found for job %s in namespace %s",
                self.job_name, self.namespace)
            return
        self.pod = pods.items[0].metadata.name
        self.pod_log = self.corev1.read_namespaced_pod_log(
            name=self.pod, namespace=self.namespace)
        self.__logger.info("\n\n%s", self.pod_log)

    def run(self, **kwargs):
        """Deploy the job and record start/stop times.

        Kubernetes API errors are logged and not re-raised.
        """
        assert self.job_name
        self.start_time = time.time()
        try:
            self.deploy_job()
        except client.rest.ApiException:
            self.__logger.exception("Cannot run %s", self.job_name)
        self.stop_time = time.time()

    def clean(self):
        """Delete the pod, the job and the namespace (best effort).

        Deletion failures are logged and otherwise ignored so that the
        remaining resources are still cleaned.
        """
        if not self.namespace:
            # The namespace is created first; without it nothing else exists.
            return
        if self.pod:
            try:
                api_response = self.corev1.delete_namespaced_pod(
                    name=self.pod, namespace=self.namespace)
                self.__logger.debug("delete_namespaced_pod: %s", api_response)
            except client.rest.ApiException as exc:
                self.__logger.warning(
                    "Cannot delete pod %s: %s", self.pod, exc)
        if self.job_name:
            try:
                api_response = self.batchv1.delete_namespaced_job(
                    name=self.job_name, namespace=self.namespace)
                self.__logger.debug(
                    "delete_namespaced_job: %s", api_response)
            except client.rest.ApiException as exc:
                self.__logger.warning(
                    "Cannot delete job %s: %s", self.job_name, exc)
        try:
            api_response = self.corev1.delete_namespace(self.namespace)
            self.__logger.debug("delete_namespace: %s", api_response)
        except client.rest.ApiException as exc:
            self.__logger.warning(
                "Cannot delete namespace %s: %s", self.namespace, exc)
128
129
class KubeHunter(SecurityTesting):
    """kube-hunter hunts for security weaknesses in Kubernetes clusters.

    See https://github.com/aquasecurity/kube-hunter for more details
    """

    __logger = logging.getLogger(__name__)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.job_name = "kube-hunter"
        self.ns_generate_name = "kube-hunter-"

    def process_results(self, **kwargs):
        """Process kube-hunter details

        The last line of the pod log is parsed as the JSON report. The result
        is 100 unless a reported vulnerability has a severity which is not
        tolerated by the ``severity`` keyword argument:

        - ``low``: no vulnerability is tolerated
        - ``medium``: only ``low`` vulnerabilities are tolerated
        - ``high``: ``low`` and ``medium`` vulnerabilities are tolerated
        - anything else: vulnerabilities are only printed

        Raises:
            IndexError: if the pod log is empty.
            ValueError: if the last log line is not valid JSON.
            KeyError: if the report misses an expected key.
        """
        self.details = json.loads(self.pod_log.splitlines()[-1])
        # A report without any vulnerability is a success as well.
        self.result = 100
        vulnerabilities = self.details["vulnerabilities"]
        if vulnerabilities:
            tolerated = {
                "low": [],
                "medium": ["low"],
                "high": ["low", "medium"],
            }
            allowed_severity = tolerated.get(kwargs.get("severity", "none"))
            if allowed_severity is None:
                self.__logger.warning(
                    "Just printing all vulnerabilities as "
                    "no severity criteria given")
                allowed_severity = ["low", "medium", "high"]
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['category', 'vulnerability', 'severity'])
            for vulnerability in vulnerabilities:
                if vulnerability["severity"] not in allowed_severity:
                    self.result = 0
                msg.add_row(
                    [vulnerability["category"], vulnerability["vulnerability"],
                     vulnerability["severity"]])
            self.__logger.warning("\n\n%s\n", msg.get_string())
        if self.details["hunter_statistics"]:
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['name', 'description', 'vulnerabilities'])
            for statistics in self.details["hunter_statistics"]:
                msg.add_row(
                    [statistics["name"],
                     textwrap.fill(statistics["description"], width=50),
                     statistics["vulnerabilities"]])
            self.__logger.info("\n\n%s\n", msg.get_string())

    def run(self, **kwargs):
        """Run the job, then process its report.

        Any failure while processing the report is logged and sets the result
        to 0 instead of being raised.
        """
        super().run(**kwargs)
        try:
            self.process_results(**kwargs)
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("Cannot process results")
            self.result = 0
188
189
class KubeBench(SecurityTesting):
    """kube-bench checks whether Kubernetes is deployed securely.

    It runs the checks documented in the CIS Kubernetes Benchmark.

    See https://github.com/aquasecurity/kube-bench for more details
    """

    __logger = logging.getLogger(__name__)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.job_name = "kube-bench"
        self.ns_generate_name = "kube-bench-"

    @staticmethod
    def _parse_report(pod_log):
        """Convert the pod log into Python objects.

        The log is first read as a Python literal (the historical behavior)
        and then as JSON, because ``ast.literal_eval`` rejects the JSON
        literals ``true``, ``false`` and ``null``.

        Raises:
            ValueError: if the log is neither a Python literal nor JSON.
        """
        try:
            return ast.literal_eval(pod_log)
        except (ValueError, TypeError, SyntaxError):
            return json.loads(pod_log)

    def process_results(self):
        """Store the kube-bench report in the details and print it.

        One table row is printed per test group and the remediation of every
        scored check in ``FAIL`` status is logged as an error. Failed checks
        do not change the result, which is set to 100 once the report has
        been fully processed.
        """
        self.details["report"] = self._parse_report(self.pod_log)
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['node_type', 'version', 'test_desc', 'pass',
                         'fail', 'warn'])
        for details in self.details["report"]["Controls"]:
            for test in details['tests']:
                msg.add_row(
                    [details['node_type'], details['version'], test['desc'],
                     test['pass'], test['fail'], test['warn']])
                for result in test["results"]:
                    if result['scored'] and result['status'] == 'FAIL':
                        self.__logger.error(
                            "%s\n%s", result['test_desc'],
                            result['remediation'])
        self.__logger.warning("Targets:\n\n%s\n", msg.get_string())
        self.result = 100

    def run(self, **kwargs):
        """Run the kube-bench job of the given target and process its report.

        The job name is ``kube-bench-<target>`` where ``target`` is read from
        the keyword arguments (``node`` by default). Any failure while
        processing the report (e.g. an empty log because the job could not be
        deployed) is logged and sets the result to 0 instead of being raised.
        """
        self.job_name = f'kube-bench-{kwargs.get("target", "node")}'
        super().run(**kwargs)
        try:
            self.process_results()
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("Cannot process results")
            self.result = 0