Don't run disruptive hunter checks
[functest-kubernetes.git] / functest_kubernetes / security / security.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2020 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 """
11 Define the parent for Kubernetes testing.
12 """
13
14 from __future__ import division
15
16 import ast
17 import json
18 import logging
19 import time
20 import textwrap
21 import yaml
22
23 from kubernetes import client
24 from kubernetes import config
25 from kubernetes import watch
26 import pkg_resources
27 import prettytable
28 from xtesting.core import testcase
29
30
class SecurityTesting(testcase.TestCase):
    # pylint: disable=too-many-instance-attributes
    """Run a security job in a dedicated namespace and collect its pod log.

    Subclasses are expected to set ``job_name``: it selects the packaged
    manifest ``security/<job_name>.yaml`` and it is the job name which is
    watched and deleted afterwards.
    """
    # Passed as timeout_seconds when watching the job
    watch_timeout = 1200

    __logger = logging.getLogger(__name__)

    def __init__(self, **kwargs):
        super(SecurityTesting, self).__init__(**kwargs)
        config.load_kube_config()
        self.corev1 = client.CoreV1Api()
        self.batchv1 = client.BatchV1Api()
        # Name of the pod whose log has been read (set by deploy_job)
        self.pod = None
        # Raw output of the job pod
        self.pod_log = ""
        self.job_name = None
        self.output_log_name = 'functest-kubernetes.log'
        self.output_debug_log_name = 'functest-kubernetes.debug.log'
        # Name of the generated namespace (set by deploy_job)
        self.namespace = ""

    def deploy_job(self):
        """Run Security job

        It creates a namespace, submits the job manifest and watches the
        job until it reports one succeeded pod or the watch ends. It then
        reads the log of the first job pod, stores it in ``pod_log`` and
        prints it asis.
        """

        assert self.job_name
        api_response = self.corev1.create_namespace(
            client.V1Namespace(metadata=client.V1ObjectMeta(
                generate_name="ims-")))
        self.namespace = api_response.metadata.name
        self.__logger.debug("create_namespace: %s", api_response)
        # Only the parsing needs the file: close it before calling the API
        with open(pkg_resources.resource_filename(
                "functest_kubernetes",
                "security/{}.yaml".format(self.job_name))) as yfile:
            body = yaml.safe_load(yfile)
        api_response = self.batchv1.create_namespaced_job(
            body=body, namespace=self.namespace)
        self.__logger.info("Job %s created", api_response.metadata.name)
        self.__logger.debug("create_namespaced_job: %s", api_response)
        succeeded = False
        watch_job = watch.Watch()
        for event in watch_job.stream(
                func=self.batchv1.list_namespaced_job,
                namespace=self.namespace, timeout_seconds=self.watch_timeout):
            job = event["object"]
            if (job.metadata.name == self.job_name and
                    job.status.succeeded == 1):
                self.__logger.info(
                    "%s started in %0.2f sec", job.metadata.name,
                    time.time()-self.start_time)
                succeeded = True
                watch_job.stop()
        if not succeeded:
            # The log is still read below as it may explain the failure
            self.__logger.error(
                "Job %s was not reported as succeeded before the watch ended",
                self.job_name)
        pods = self.corev1.list_namespaced_pod(
            self.namespace, label_selector='job-name={}'.format(self.job_name))
        if not pods.items:
            # Nothing to read: keep pod and pod_log at their initial values
            self.__logger.error("No pod found for job %s", self.job_name)
            return
        self.pod = pods.items[0].metadata.name
        self.pod_log = self.corev1.read_namespaced_pod_log(
            name=self.pod, namespace=self.namespace)
        self.__logger.info("\n\n%s", self.pod_log)

    def run(self, **kwargs):
        """Deploy the job and record the start and stop times.

        Kubernetes API errors are logged and not raised.
        """
        assert self.job_name
        self.start_time = time.time()
        try:
            self.deploy_job()
        except client.rest.ApiException:
            self.__logger.exception("Cannot run %s", self.job_name)
        self.stop_time = time.time()

    def clean(self):
        """Delete the pod, the job and the namespace.

        Every deletion is best effort: API errors are only logged at debug
        level so that the remaining resources are still cleaned.
        """
        if self.pod:
            try:
                api_response = self.corev1.delete_namespaced_pod(
                    name=self.pod, namespace=self.namespace)
                self.__logger.debug("delete_namespaced_pod: %s", api_response)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete pod %s", self.pod, exc_info=True)
        if self.job_name:
            try:
                api_response = self.batchv1.delete_namespaced_job(
                    name=self.job_name, namespace=self.namespace)
                self.__logger.debug(
                    "delete_namespaced_job: %s", api_response)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete job %s", self.job_name, exc_info=True)
        if self.namespace:
            try:
                self.corev1.delete_namespace(self.namespace)
                self.__logger.debug("delete_namespace: %s", self.namespace)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete namespace %s", self.namespace,
                    exc_info=True)
118
119
class KubeHunter(SecurityTesting):
    """kube-hunter hunts for security weaknesses in Kubernetes clusters.

    See https://github.com/aquasecurity/kube-hunter for more details
    """

    __logger = logging.getLogger(__name__)

    def __init__(self, **kwargs):
        super(KubeHunter, self).__init__(**kwargs)
        self.job_name = "kube-hunter"

    def process_results(self, **kwargs):
        """Process kube-hunter details

        The last line of the pod log is parsed as the JSON report. The
        result is 100 unless one vulnerability is reported with a severity
        which is not tolerated by the configured threshold.

        Args:
            severity: low, medium or high (default). Vulnerabilities below
                this level are reported but don't fail the test.
        """
        self.details = json.loads(self.pod_log.splitlines()[-1])
        vulnerabilities = self.details["vulnerabilities"]
        # A report without any vulnerability is a success as well; the
        # result is reset below by any vulnerability which is not tolerated
        self.result = 100
        if vulnerabilities:
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['category', 'vulnerability', 'severity'])
            severity = kwargs.get("severity", "high")
            if severity == "low":
                allowed_severity = []
            elif severity == "medium":
                allowed_severity = ["low"]
            elif severity == "high":
                allowed_severity = ["low", "medium"]
            else:
                self.__logger.warning(
                    "Selecting high as default severity (%s is incorrect)",
                    severity)
                severity = "high"
                allowed_severity = ["low", "medium"]
            for vulnerability in vulnerabilities:
                if vulnerability["severity"] in allowed_severity:
                    self.__logger.warning(
                        "Skipping %s (severity is configured as %s)",
                        vulnerability["vulnerability"], severity)
                else:
                    self.result = 0
                # Every vulnerability is listed, skipped or not
                msg.add_row(
                    [vulnerability["category"], vulnerability["vulnerability"],
                     vulnerability["severity"]])
            self.__logger.warning("\n\n%s\n", msg.get_string())
        if self.details["hunter_statistics"]:
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['name', 'description', 'vulnerabilities'])
            for statistics in self.details["hunter_statistics"]:
                msg.add_row(
                    [statistics["name"],
                     textwrap.fill(statistics["description"], width=50),
                     statistics["vulnerabilities"]])
            self.__logger.info("\n\n%s\n", msg.get_string())

    def run(self, **kwargs):
        """Run kube-hunter and process its report.

        Any error raised while processing the report is logged and sets
        the result to 0.
        """
        super(KubeHunter, self).run(**kwargs)
        try:
            self.process_results(**kwargs)
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("Cannot process results")
            self.result = 0
182
183
class KubeBench(SecurityTesting):
    """kube-bench checks whether Kubernetes is deployed securely.

    It runs the checks documented in the CIS Kubernetes Benchmark.

    See https://github.com/aquasecurity/kube-bench for more details
    """

    __logger = logging.getLogger(__name__)

    def _process_results(self):
        """Parse the pod log, print the checks and set the result to 100."""
        # NOTE(review): the log is parsed as a Python literal and not as
        # JSON; presumably the client returns the JSON log in its Python
        # repr form - to be confirmed before switching to json.loads
        self.details["report"] = ast.literal_eval(self.pod_log)
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['node_type', 'version', 'test_desc', 'pass',
                         'fail', 'warn'])
        for details in self.details["report"]:
            for test in details['tests']:
                msg.add_row(
                    [details['node_type'], details['version'], test['desc'],
                     test['pass'], test['fail'], test['warn']])
                for result in test["results"]:
                    # Only the scored checks which failed are detailed
                    if result['scored'] and result['status'] == 'FAIL':
                        self.__logger.error(
                            "%s\n%s", result['test_desc'],
                            result['remediation'])
        self.__logger.warning("Targets:\n\n%s\n", msg.get_string())
        # Failed checks are only reported: they don't change the result
        self.result = 100

    def run(self, **kwargs):
        """Run kube-bench and print its report.

        Any error raised while processing the report (e.g. an empty log
        because the job could not be run) is logged and sets the result
        to 0.

        Args:
            target: suffix of the kube-bench job to run (default: node)
        """
        self.job_name = "kube-bench-{}".format(kwargs.get("target", "node"))
        super(KubeBench, self).run(**kwargs)
        try:
            self._process_results()
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("Cannot process results")
            self.result = 0