Enhance kube-hunter result postprocessing
[functest-kubernetes.git] / functest_kubernetes / security / security.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2020 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 """
11 Define the parent for Kubernetes testing.
12 """
13
14 from __future__ import division
15
16 import json
17 import logging
18 import time
19 import textwrap
20 import yaml
21
22 from kubernetes import client
23 from kubernetes import config
24 from kubernetes import watch
25 import pkg_resources
26 import prettytable
27 from xtesting.core import testcase
28
29
30 class SecurityTesting(testcase.TestCase):
31     # pylint: disable=too-many-instance-attributes
32     """Run Security job"""
33     watch_timeout = 1200
34
35     __logger = logging.getLogger(__name__)
36
37     def __init__(self, **kwargs):
38         super(SecurityTesting, self).__init__(**kwargs)
39         config.load_kube_config()
40         self.corev1 = client.CoreV1Api()
41         self.batchv1 = client.BatchV1Api()
42         self.pod = None
43         self.pod_log = ""
44         self.job_name = None
45         self.output_log_name = 'functest-kubernetes.log'
46         self.output_debug_log_name = 'functest-kubernetes.debug.log'
47         self.namespace = ""
48
49     def deploy_job(self):
50         """Run Security job
51
52         It runs a single security job and then simply prints its output asis.
53         """
54
55         assert self.job_name
56         api_response = self.corev1.create_namespace(
57             client.V1Namespace(metadata=client.V1ObjectMeta(
58                 generate_name="ims-")))
59         self.namespace = api_response.metadata.name
60         self.__logger.debug("create_namespace: %s", api_response)
61         # pylint: disable=bad-continuation
62         with open(pkg_resources.resource_filename(
63                 "functest_kubernetes",
64                 "security/{}.yaml".format(self.job_name))) as yfile:
65             body = yaml.safe_load(yfile)
66             api_response = self.batchv1.create_namespaced_job(
67                 body=body, namespace=self.namespace)
68             self.__logger.info("Job %s created", api_response.metadata.name)
69             self.__logger.debug("create_namespaced_job: %s", api_response)
70         watch_job = watch.Watch()
71         for event in watch_job.stream(
72                 func=self.batchv1.list_namespaced_job,
73                 namespace=self.namespace, timeout_seconds=self.watch_timeout):
74             if (event["object"].metadata.name == self.job_name and
75                     event["object"].status.succeeded == 1):
76                 self.__logger.info(
77                     "%s started in %0.2f sec", event['object'].metadata.name,
78                     time.time()-self.start_time)
79                 watch_job.stop()
80         pods = self.corev1.list_namespaced_pod(
81             self.namespace, label_selector='job-name={}'.format(self.job_name))
82         self.pod = pods.items[0].metadata.name
83         self.pod_log = self.corev1.read_namespaced_pod_log(
84             name=self.pod, namespace=self.namespace)
85         self.__logger.info("\n\n%s", self.pod_log)
86
87     def run(self, **kwargs):
88         assert self.job_name
89         self.start_time = time.time()
90         try:
91             self.deploy_job()
92         except client.rest.ApiException:
93             self.__logger.exception("Cannot run %s", self.job_name)
94         self.stop_time = time.time()
95
96     def clean(self):
97         if self.pod:
98             try:
99                 api_response = self.corev1.delete_namespaced_pod(
100                     name=self.pod, namespace=self.namespace)
101                 self.__logger.debug("delete_namespaced_pod: %s", api_response)
102             except client.rest.ApiException:
103                 pass
104         if self.job_name:
105             try:
106                 api_response = self.batchv1.delete_namespaced_job(
107                     name=self.job_name, namespace=self.namespace)
108                 self.__logger.debug(
109                     "delete_namespaced_deployment: %s", api_response)
110             except client.rest.ApiException:
111                 pass
112         if self.namespace:
113             try:
114                 api_response = self.corev1.delete_namespace(self.namespace)
115                 self.__logger.debug("delete_namespace: %s", self.namespace)
116             except client.rest.ApiException:
117                 pass
118
119
120 class KubeHunter(SecurityTesting):
121     """kube-hunter hunts for security weaknesses in Kubernetes clusters.
122
123     See https://github.com/aquasecurity/kube-hunter for more details
124     """
125
126     __logger = logging.getLogger(__name__)
127
128     def __init__(self, **kwargs):
129         super(KubeHunter, self).__init__(**kwargs)
130         self.job_name = "kube-hunter"
131
132     def process_results(self, **kwargs):
133         """Process kube-hunter details"""
134         self.details = json.loads(self.pod_log.splitlines()[-1])
135         if self.details["vulnerabilities"]:
136             self.result = 100
137             msg = prettytable.PrettyTable(
138                 header_style='upper', padding_width=5,
139                 field_names=['category', 'vulnerability', 'severity'])
140             severity = kwargs.get("severity", "high")
141             if severity == "low":
142                 allowed_severity = []
143             elif severity == "medium":
144                 allowed_severity = ["low"]
145             elif severity == "high":
146                 allowed_severity = ["low", "medium"]
147             else:
148                 self.__logger.warning(
149                     "Selecting high as default severity (%s is incorrect)",
150                     kwargs.get("severity", "high"))
151                 severity = "high"
152                 allowed_severity = ["low", "medium"]
153             for vulnerability in self.details["vulnerabilities"]:
154                 if vulnerability["severity"] in allowed_severity:
155                     self.__logger.warning(
156                         "Skipping %s (severity is configured as %s)",
157                         vulnerability["vulnerability"], severity)
158                 else:
159                     self.result = 0
160                 msg.add_row(
161                     [vulnerability["category"], vulnerability["vulnerability"],
162                      vulnerability["severity"]])
163             self.__logger.warning("\n\n%s\n", msg.get_string())
164         if self.details["hunter_statistics"]:
165             msg = prettytable.PrettyTable(
166                 header_style='upper', padding_width=5,
167                 field_names=['name', 'description', 'vulnerabilities'])
168             for statistics in self.details["hunter_statistics"]:
169                 msg.add_row(
170                     [statistics["name"],
171                      textwrap.fill(statistics["description"], width=50),
172                      statistics["vulnerabilities"]])
173             self.__logger.info("\n\n%s\n", msg.get_string())
174
175     def run(self, **kwargs):
176         super(KubeHunter, self).run(**kwargs)
177         try:
178             self.process_results(**kwargs)
179         except Exception:  # pylint: disable=broad-except
180             self.__logger.exception("Cannot process results")
181             self.result = 0
182
183
184 class KubeBench(SecurityTesting):
185     """kube-bench checks whether Kubernetes is deployed securelyself.
186
187     It runs the checks documented in the CIS Kubernetes Benchmark.
188
189     See https://github.com/aquasecurity/kube-bench for more details
190     """
191
192     def __init__(self, **kwargs):
193         super(KubeBench, self).__init__(**kwargs)
194         self.job_name = "kube-bench"
195
196     def run(self, **kwargs):
197         super(KubeBench, self).run(**kwargs)
198         self.result = 100