Apply privileged pod security standard to kube-bench
[functest-kubernetes.git] / functest_kubernetes / security / security.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2020 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 """
11 Define the parent for Kubernetes testing.
12 """
13
14 from __future__ import division
15
16 import ast
17 import json
18 import logging
19 import os
20 import time
21 import textwrap
22 import yaml
23
24 from jinja2 import Template
25 from kubernetes import client
26 from kubernetes import config
27 from kubernetes import watch
28 import pkg_resources
29 import prettytable
30
31 from xtesting.core import testcase
32
33
class SecurityTesting(testcase.TestCase):
    # pylint: disable=too-many-instance-attributes
    """Run a security job in a dedicated namespace and collect its pod log.

    Subclasses are expected to set ``job_name`` (it selects the
    ``security/<job_name>.yaml`` template shipped in functest_kubernetes)
    and may override ``ns_generate_name`` and ``pss``.
    """
    # Passed as timeout_seconds to the watch on the job list.
    watch_timeout = 1200
    # Default registry rendered into the job template; DOCKERHUB_REPO, when
    # set, takes precedence at rendering time (see deploy_job).
    dockerhub_repo = os.getenv("MIRROR_REPO", "docker.io")

    __logger = logging.getLogger(__name__)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        config.load_kube_config()
        self.corev1 = client.CoreV1Api()
        self.batchv1 = client.BatchV1Api()
        # Name of the pod created by the job (set by deploy_job).
        self.pod = None
        # Raw log of that pod (set by deploy_job).
        self.pod_log = ""
        self.job_name = None
        self.output_log_name = 'functest-kubernetes.log'
        self.output_debug_log_name = 'functest-kubernetes.debug.log'
        # Name of the generated namespace (set by deploy_job).
        self.namespace = ""
        self.ns_generate_name = "security-"
        # Value of the pod-security.kubernetes.io/enforce namespace label.
        self.pss = "baseline"

    def deploy_job(self):
        """Run Security job

        It creates a namespace labelled with the selected pod security
        standard, renders and creates the job, watches the jobs of that
        namespace until this one reports a success (or the watch ends) and
        then stores and prints the log of its pod asis.

        Raises:
            kubernetes.client.rest.ApiException: if an API call fails.
        """

        assert self.job_name
        api_response = self.corev1.create_namespace(
            client.V1Namespace(metadata=client.V1ObjectMeta(
                generate_name=self.ns_generate_name,
                labels={"pod-security.kubernetes.io/enforce": self.pss})))
        self.namespace = api_response.metadata.name
        self.__logger.debug("create_namespace: %s", api_response)
        manifest = pkg_resources.resource_filename(
            "functest_kubernetes", f"security/{self.job_name}.yaml")
        # Only reading the template needs the file; close it before
        # talking to the API server.
        with open(manifest, encoding='utf-8') as yfile:
            template = Template(yfile.read())
        body = yaml.safe_load(template.render(
            dockerhub_repo=os.getenv("DOCKERHUB_REPO", self.dockerhub_repo)))
        api_response = self.batchv1.create_namespaced_job(
            body=body, namespace=self.namespace)
        self.__logger.info("Job %s created", api_response.metadata.name)
        self.__logger.debug("create_namespaced_job: %s", api_response)
        succeeded = False
        watch_job = watch.Watch()
        for event in watch_job.stream(
                func=self.batchv1.list_namespaced_job,
                namespace=self.namespace, timeout_seconds=self.watch_timeout):
            job = event["object"]
            if (job.metadata.name == self.job_name and
                    job.status.succeeded == 1):
                self.__logger.info(
                    "%s started in %0.2f sec", job.metadata.name,
                    time.time()-self.start_time)
                succeeded = True
                watch_job.stop()
        if not succeeded:
            # The pod log is still fetched below as it may explain why.
            self.__logger.warning(
                "Job %s was not reported as succeeded while watching "
                "(timeout: %s sec)", self.job_name, self.watch_timeout)
        pods = self.corev1.list_namespaced_pod(
            self.namespace, label_selector=f'job-name={self.job_name}')
        if not pods.items:
            # Leave pod and pod_log untouched instead of raising IndexError,
            # which run() would not catch.
            self.__logger.error("No pod found for job %s", self.job_name)
            return
        self.pod = pods.items[0].metadata.name
        self.pod_log = self.corev1.read_namespaced_pod_log(
            name=self.pod, namespace=self.namespace)
        self.__logger.info("\n\n%s", self.pod_log)

    def run(self, **kwargs):
        """Deploy the job and record the start and stop times.

        API errors raised by deploy_job are logged and not propagated.
        """
        assert self.job_name
        self.start_time = time.time()
        try:
            self.deploy_job()
        except client.rest.ApiException:
            self.__logger.exception("Cannot run %s", self.job_name)
        self.stop_time = time.time()

    def clean(self):
        """Delete the pod, the job and the namespace created by deploy_job.

        Every deletion is best effort: API errors are logged at debug level
        and never propagated.
        """
        if not self.namespace:
            # deploy_job sets the namespace first: nothing was created.
            return
        if self.pod:
            try:
                api_response = self.corev1.delete_namespaced_pod(
                    name=self.pod, namespace=self.namespace)
                self.__logger.debug("delete_namespaced_pod: %s", api_response)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete pod %s", self.pod, exc_info=True)
        if self.job_name:
            try:
                api_response = self.batchv1.delete_namespaced_job(
                    name=self.job_name, namespace=self.namespace)
                self.__logger.debug(
                    "delete_namespaced_job: %s", api_response)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete job %s", self.job_name, exc_info=True)
        try:
            self.corev1.delete_namespace(self.namespace)
            self.__logger.debug("delete_namespace: %s", self.namespace)
        except client.rest.ApiException:
            self.__logger.debug(
                "Cannot delete namespace %s", self.namespace, exc_info=True)
129
130
class KubeHunter(SecurityTesting):
    """kube-hunter hunts for security weaknesses in Kubernetes clusters.

    See https://github.com/aquasecurity/kube-hunter for more details
    """

    __logger = logging.getLogger(__name__)

    # Severities tolerated for each value of the "severity" criteria.
    _allowed_severities = {
        "low": [],
        "medium": ["low"],
        "high": ["low", "medium"],
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.job_name = "kube-hunter"
        self.ns_generate_name = "kube-hunter-"

    def process_results(self, **kwargs):
        """Process kube-hunter details

        The last line of the pod log is decoded as the JSON report. The
        result is 100 unless a vulnerability is reported with a severity
        which is not tolerated by the "severity" keyword argument:

        - low: none is tolerated
        - medium: low is tolerated
        - high: low and medium are tolerated
        - anything else: low, medium and high are tolerated

        Raises:
            IndexError: if the pod log is empty.
            ValueError: if the last line is not valid JSON.
            KeyError: if the report has no "vulnerabilities" entry.
        """
        self.details = json.loads(self.pod_log.splitlines()[-1])
        vulnerabilities = self.details["vulnerabilities"]
        # A report listing no vulnerability must pass too, so the result is
        # set before (and not only when) vulnerabilities are reviewed.
        self.result = 100
        if vulnerabilities:
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['category', 'vulnerability', 'severity'])
            severity = kwargs.get("severity", "none")
            allowed_severity = self._allowed_severities.get(severity)
            if allowed_severity is None:
                self.__logger.warning(
                    "Just printing all vulnerabilities as "
                    "no severity criteria given")
                allowed_severity = ["low", "medium", "high"]
            for vulnerability in vulnerabilities:
                if vulnerability["severity"] not in allowed_severity:
                    self.result = 0
                msg.add_row(
                    [vulnerability["category"], vulnerability["vulnerability"],
                     vulnerability["severity"]])
            self.__logger.warning("\n\n%s\n", msg.get_string())
        # Statistics are only printed: their absence must not turn an
        # already computed result into a failure.
        hunter_statistics = self.details.get("hunter_statistics")
        if hunter_statistics:
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['name', 'description', 'vulnerabilities'])
            for statistics in hunter_statistics:
                msg.add_row(
                    [statistics["name"],
                     textwrap.fill(statistics["description"], width=50),
                     statistics["vulnerabilities"]])
            self.__logger.info("\n\n%s\n", msg.get_string())

    def run(self, **kwargs):
        """Run the kube-hunter job then process its report.

        Any error raised while processing the report is logged and sets the
        result to 0.
        """
        super().run(**kwargs)
        try:
            self.process_results(**kwargs)
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("Cannot process results")
            self.result = 0
189
190
class KubeBench(SecurityTesting):
    """kube-bench checks whether Kubernetes is deployed securely.

    It runs the checks documented in the CIS Kubernetes Benchmark.

    See https://github.com/aquasecurity/kube-bench for more details
    """

    __logger = logging.getLogger(__name__)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.job_name = "kube-bench"
        self.ns_generate_name = "kube-bench-"
        # The namespace is labelled with the privileged pod security standard
        self.pss = "privileged"

    @staticmethod
    def _parse_report(pod_log):
        """Decode the pod log as a JSON document or as a Python literal.

        ast.literal_eval alone rejects the JSON literals true, false and
        null, so JSON is tried first and the former behavior is kept as
        fallback.

        Raises:
            ValueError, SyntaxError: if the log can be decoded by neither.
        """
        try:
            return json.loads(pod_log)
        except ValueError:
            return ast.literal_eval(pod_log)

    def run(self, **kwargs):
        """Run the kube-bench job selected by the "target" keyword argument.

        The job name is kube-bench-<target> (node by default). The report
        is stored in details["report"], summarized in a table and every
        scored check in FAIL status is logged with its remediation.
        The result is 100 as soon as the report could be processed, whatever
        the status of the checks, and 0 otherwise.
        """
        self.job_name = f'kube-bench-{kwargs.get("target", "node")}'
        super().run(**kwargs)
        try:
            self.details["report"] = self._parse_report(self.pod_log)
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['node_type', 'version', 'test_desc', 'pass',
                             'fail', 'warn'])
            for details in self.details["report"]["Controls"]:
                for test in details['tests']:
                    msg.add_row(
                        [details['node_type'], details['version'],
                         test['desc'], test['pass'], test['fail'],
                         test['warn']])
                    for result in test["results"]:
                        if result['scored'] and result['status'] == 'FAIL':
                            self.__logger.error(
                                "%s\n%s", result['test_desc'],
                                result['remediation'])
            self.__logger.warning("Targets:\n\n%s\n", msg.get_string())
        except Exception:  # pylint: disable=broad-except
            # An empty or unexpected log (e.g. the job could not run) must
            # fail the test case rather than crash the runner.
            self.__logger.exception("Cannot process results")
            self.result = 0
            return
        self.result = 100