Enhance kube-hunter result postprocessing
[functest-kubernetes.git] / functest_kubernetes / security / security.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2020 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 """
11 Define the parent for Kubernetes testing.
12 """
13
14 from __future__ import division
15
16 import json
17 import logging
18 import time
19 import textwrap
20 import yaml
21
22 from kubernetes import client
23 from kubernetes import config
24 from kubernetes import watch
25 import pkg_resources
26 import prettytable
27 from xtesting.core import testcase
28
29
30 class SecurityTesting(testcase.TestCase):
31     # pylint: disable=too-many-instance-attributes
32     """Run Security job"""
33     watch_timeout = 1200
34
35     __logger = logging.getLogger(__name__)
36
37     def __init__(self, **kwargs):
38         super(SecurityTesting, self).__init__(**kwargs)
39         config.load_kube_config()
40         self.corev1 = client.CoreV1Api()
41         self.batchv1 = client.BatchV1Api()
42         self.pod = None
43         self.pod_log = ""
44         self.job_name = None
45         self.output_log_name = 'functest-kubernetes.log'
46         self.output_debug_log_name = 'functest-kubernetes.debug.log'
47         self.namespace = ""
48
49     def deploy_job(self):
50         """Run Security job
51
52         It runs a single security job and then simply prints its output asis.
53         """
54
55         assert self.job_name
56         api_response = self.corev1.create_namespace(
57             client.V1Namespace(metadata=client.V1ObjectMeta(
58                 generate_name="ims-")))
59         self.namespace = api_response.metadata.name
60         self.__logger.debug("create_namespace: %s", api_response)
61         with open(pkg_resources.resource_filename(
62                 "functest_kubernetes",
63                 "security/{}.yaml".format(self.job_name))) as yfile:
64             body = yaml.safe_load(yfile)
65             api_response = self.batchv1.create_namespaced_job(
66                 body=body, namespace=self.namespace)
67             self.__logger.info("Job %s created", api_response.metadata.name)
68             self.__logger.debug("create_namespaced_job: %s", api_response)
69         watch_job = watch.Watch()
70         for event in watch_job.stream(
71                 func=self.batchv1.list_namespaced_job,
72                 namespace=self.namespace, timeout_seconds=self.watch_timeout):
73             if (event["object"].metadata.name == self.job_name and
74                     event["object"].status.succeeded == 1):
75                 self.__logger.info(
76                     "%s started in %0.2f sec", event['object'].metadata.name,
77                     time.time()-self.start_time)
78                 watch_job.stop()
79         pods = self.corev1.list_namespaced_pod(
80             self.namespace, label_selector='job-name={}'.format(self.job_name))
81         self.pod = pods.items[0].metadata.name
82         self.pod_log = self.corev1.read_namespaced_pod_log(
83             name=self.pod, namespace=self.namespace)
84         self.__logger.info("\n\n%s", self.pod_log)
85
86     def run(self, **kwargs):
87         assert self.job_name
88         self.start_time = time.time()
89         try:
90             self.deploy_job()
91         except client.rest.ApiException:
92             self.__logger.exception("Cannot run %s", self.job_name)
93         self.stop_time = time.time()
94
95     def clean(self):
96         if self.pod:
97             try:
98                 api_response = self.corev1.delete_namespaced_pod(
99                     name=self.pod, namespace=self.namespace)
100                 self.__logger.debug("delete_namespaced_pod: %s", api_response)
101             except client.rest.ApiException:
102                 pass
103         if self.job_name:
104             try:
105                 api_response = self.batchv1.delete_namespaced_job(
106                     name=self.job_name, namespace=self.namespace)
107                 self.__logger.debug(
108                     "delete_namespaced_deployment: %s", api_response)
109             except client.rest.ApiException:
110                 pass
111         if self.namespace:
112             try:
113                 api_response = self.corev1.delete_namespace(self.namespace)
114                 self.__logger.debug("delete_namespace: %s", self.namespace)
115             except client.rest.ApiException:
116                 pass
117
118
119 class KubeHunter(SecurityTesting):
120     """kube-hunter hunts for security weaknesses in Kubernetes clusters.
121
122     See https://github.com/aquasecurity/kube-hunter for more details
123     """
124
125     __logger = logging.getLogger(__name__)
126
127     def __init__(self, **kwargs):
128         super(KubeHunter, self).__init__(**kwargs)
129         self.job_name = "kube-hunter"
130
131     def process_results(self, **kwargs):
132         """Process kube-hunter details"""
133         self.details = json.loads(self.pod_log.splitlines()[-1])
134         if self.details["vulnerabilities"]:
135             self.result = 100
136             msg = prettytable.PrettyTable(
137                 header_style='upper', padding_width=5,
138                 field_names=['category', 'vulnerability', 'severity'])
139             severity = kwargs.get("severity", "high")
140             if severity == "low":
141                 allowed_severity = []
142             elif severity == "medium":
143                 allowed_severity = ["low"]
144             elif severity == "high":
145                 allowed_severity = ["low", "medium"]
146             else:
147                 self.__logger.warning(
148                     "Selecting high as default severity (%s is incorrect)",
149                     kwargs.get("severity", "high"))
150                 severity = "high"
151                 allowed_severity = ["low", "medium"]
152             for vulnerability in self.details["vulnerabilities"]:
153                 if vulnerability["severity"] in allowed_severity:
154                     self.__logger.warning(
155                         "Skipping %s (severity is configured as %s)",
156                         vulnerability["vulnerability"], severity)
157                 else:
158                     self.result = 0
159                 msg.add_row(
160                     [vulnerability["category"], vulnerability["vulnerability"],
161                      vulnerability["severity"]])
162             self.__logger.warning("\n\n%s\n", msg.get_string())
163         if self.details["hunter_statistics"]:
164             msg = prettytable.PrettyTable(
165                 header_style='upper', padding_width=5,
166                 field_names=['name', 'description', 'vulnerabilities'])
167             for statistics in self.details["hunter_statistics"]:
168                 msg.add_row(
169                     [statistics["name"],
170                      textwrap.fill(statistics["description"], width=50),
171                      statistics["vulnerabilities"]])
172             self.__logger.info("\n\n%s\n", msg.get_string())
173
174     def run(self, **kwargs):
175         super(KubeHunter, self).run(**kwargs)
176         try:
177             self.process_results(**kwargs)
178         except Exception:  # pylint: disable=broad-except
179             self.__logger.exception("Cannot process results")
180             self.result = 0
181
182
183 class KubeBench(SecurityTesting):
184     """kube-bench checks whether Kubernetes is deployed securelyself.
185
186     It runs the checks documented in the CIS Kubernetes Benchmark.
187
188     See https://github.com/aquasecurity/kube-bench for more details
189     """
190
191     def __init__(self, **kwargs):
192         super(KubeBench, self).__init__(**kwargs)
193         self.job_name = "kube-bench"
194
195     def run(self, **kwargs):
196         super(KubeBench, self).run(**kwargs)
197         self.result = 100