Split kube-bench master and node
[functest-kubernetes.git] / functest_kubernetes / security / security.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2020 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 """
11 Define the parent for Kubernetes testing.
12 """
13
14 from __future__ import division
15
16 import ast
17 import json
18 import logging
19 import time
20 import textwrap
21 import yaml
22
23 from kubernetes import client
24 from kubernetes import config
25 from kubernetes import watch
26 import pkg_resources
27 import prettytable
28 from xtesting.core import testcase
29
30
31 class SecurityTesting(testcase.TestCase):
32     # pylint: disable=too-many-instance-attributes
33     """Run Security job"""
34     watch_timeout = 1200
35
36     __logger = logging.getLogger(__name__)
37
38     def __init__(self, **kwargs):
39         super(SecurityTesting, self).__init__(**kwargs)
40         config.load_kube_config()
41         self.corev1 = client.CoreV1Api()
42         self.batchv1 = client.BatchV1Api()
43         self.pod = None
44         self.pod_log = ""
45         self.job_name = None
46         self.output_log_name = 'functest-kubernetes.log'
47         self.output_debug_log_name = 'functest-kubernetes.debug.log'
48         self.namespace = ""
49
50     def deploy_job(self):
51         """Run Security job
52
53         It runs a single security job and then simply prints its output asis.
54         """
55
56         assert self.job_name
57         api_response = self.corev1.create_namespace(
58             client.V1Namespace(metadata=client.V1ObjectMeta(
59                 generate_name="ims-")))
60         self.namespace = api_response.metadata.name
61         self.__logger.debug("create_namespace: %s", api_response)
62         # pylint: disable=bad-continuation
63         with open(pkg_resources.resource_filename(
64                 "functest_kubernetes",
65                 "security/{}.yaml".format(self.job_name))) as yfile:
66             body = yaml.safe_load(yfile)
67             api_response = self.batchv1.create_namespaced_job(
68                 body=body, namespace=self.namespace)
69             self.__logger.info("Job %s created", api_response.metadata.name)
70             self.__logger.debug("create_namespaced_job: %s", api_response)
71         watch_job = watch.Watch()
72         for event in watch_job.stream(
73                 func=self.batchv1.list_namespaced_job,
74                 namespace=self.namespace, timeout_seconds=self.watch_timeout):
75             if (event["object"].metadata.name == self.job_name and
76                     event["object"].status.succeeded == 1):
77                 self.__logger.info(
78                     "%s started in %0.2f sec", event['object'].metadata.name,
79                     time.time()-self.start_time)
80                 watch_job.stop()
81         pods = self.corev1.list_namespaced_pod(
82             self.namespace, label_selector='job-name={}'.format(self.job_name))
83         self.pod = pods.items[0].metadata.name
84         self.pod_log = self.corev1.read_namespaced_pod_log(
85             name=self.pod, namespace=self.namespace)
86         self.__logger.info("\n\n%s", self.pod_log)
87
88     def run(self, **kwargs):
89         assert self.job_name
90         self.start_time = time.time()
91         try:
92             self.deploy_job()
93         except client.rest.ApiException:
94             self.__logger.exception("Cannot run %s", self.job_name)
95         self.stop_time = time.time()
96
97     def clean(self):
98         if self.pod:
99             try:
100                 api_response = self.corev1.delete_namespaced_pod(
101                     name=self.pod, namespace=self.namespace)
102                 self.__logger.debug("delete_namespaced_pod: %s", api_response)
103             except client.rest.ApiException:
104                 pass
105         if self.job_name:
106             try:
107                 api_response = self.batchv1.delete_namespaced_job(
108                     name=self.job_name, namespace=self.namespace)
109                 self.__logger.debug(
110                     "delete_namespaced_deployment: %s", api_response)
111             except client.rest.ApiException:
112                 pass
113         if self.namespace:
114             try:
115                 api_response = self.corev1.delete_namespace(self.namespace)
116                 self.__logger.debug("delete_namespace: %s", self.namespace)
117             except client.rest.ApiException:
118                 pass
119
120
121 class KubeHunter(SecurityTesting):
122     """kube-hunter hunts for security weaknesses in Kubernetes clusters.
123
124     See https://github.com/aquasecurity/kube-hunter for more details
125     """
126
127     __logger = logging.getLogger(__name__)
128
129     def __init__(self, **kwargs):
130         super(KubeHunter, self).__init__(**kwargs)
131         self.job_name = "kube-hunter"
132
133     def process_results(self, **kwargs):
134         """Process kube-hunter details"""
135         self.details = json.loads(self.pod_log.splitlines()[-1])
136         if self.details["vulnerabilities"]:
137             self.result = 100
138             msg = prettytable.PrettyTable(
139                 header_style='upper', padding_width=5,
140                 field_names=['category', 'vulnerability', 'severity'])
141             severity = kwargs.get("severity", "high")
142             if severity == "low":
143                 allowed_severity = []
144             elif severity == "medium":
145                 allowed_severity = ["low"]
146             elif severity == "high":
147                 allowed_severity = ["low", "medium"]
148             else:
149                 self.__logger.warning(
150                     "Selecting high as default severity (%s is incorrect)",
151                     kwargs.get("severity", "high"))
152                 severity = "high"
153                 allowed_severity = ["low", "medium"]
154             for vulnerability in self.details["vulnerabilities"]:
155                 if vulnerability["severity"] in allowed_severity:
156                     self.__logger.warning(
157                         "Skipping %s (severity is configured as %s)",
158                         vulnerability["vulnerability"], severity)
159                 else:
160                     self.result = 0
161                 msg.add_row(
162                     [vulnerability["category"], vulnerability["vulnerability"],
163                      vulnerability["severity"]])
164             self.__logger.warning("\n\n%s\n", msg.get_string())
165         if self.details["hunter_statistics"]:
166             msg = prettytable.PrettyTable(
167                 header_style='upper', padding_width=5,
168                 field_names=['name', 'description', 'vulnerabilities'])
169             for statistics in self.details["hunter_statistics"]:
170                 msg.add_row(
171                     [statistics["name"],
172                      textwrap.fill(statistics["description"], width=50),
173                      statistics["vulnerabilities"]])
174             self.__logger.info("\n\n%s\n", msg.get_string())
175
176     def run(self, **kwargs):
177         super(KubeHunter, self).run(**kwargs)
178         try:
179             self.process_results(**kwargs)
180         except Exception:  # pylint: disable=broad-except
181             self.__logger.exception("Cannot process results")
182             self.result = 0
183
184
185 class KubeBench(SecurityTesting):
186     """kube-bench checks whether Kubernetes is deployed securelyself.
187
188     It runs the checks documented in the CIS Kubernetes Benchmark.
189
190     See https://github.com/aquasecurity/kube-bench for more details
191     """
192
193     __logger = logging.getLogger(__name__)
194
195     def run(self, **kwargs):
196         self.job_name = "kube-bench-{}".format(kwargs.get("target", "node"))
197         super(KubeBench, self).run(**kwargs)
198         self.details["report"] = ast.literal_eval(self.pod_log)
199         msg = prettytable.PrettyTable(
200             header_style='upper', padding_width=5,
201             field_names=['node_type', 'version', 'test_desc', 'pass',
202                          'fail', 'warn'])
203         for details in self.details["report"]:
204             for test in details['tests']:
205                 msg.add_row(
206                     [details['node_type'], details['version'], test['desc'],
207                      test['pass'], test['fail'], test['warn']])
208                 for result in test["results"]:
209                     if result['scored'] and result['status'] == 'FAIL':
210                         self.__logger.error(
211                             "%s\n%s", result['test_desc'],
212                             result['remediation'])
213         self.__logger.warning("Targets:\n\n%s\n", msg.get_string())
214         self.result = 100