Switch to focal in travis-ci gates
[functest-kubernetes.git] / functest_kubernetes / security / security.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2020 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 """
11 Define the parent for Kubernetes testing.
12 """
13
14 from __future__ import division
15
16 import ast
17 import json
18 import logging
19 import os
20 import time
21 import textwrap
22 import yaml
23
24 from jinja2 import Template
25 from kubernetes import client
26 from kubernetes import config
27 from kubernetes import watch
28 import pkg_resources
29 import prettytable
30 from xtesting.core import testcase
31
32
33 class SecurityTesting(testcase.TestCase):
34     # pylint: disable=too-many-instance-attributes
35     """Run Security job"""
36     watch_timeout = 1200
37     dockerhub_repo = os.getenv("MIRROR_REPO", "docker.io")
38
39     __logger = logging.getLogger(__name__)
40
41     def __init__(self, **kwargs):
42         super(SecurityTesting, self).__init__(**kwargs)
43         config.load_kube_config()
44         self.corev1 = client.CoreV1Api()
45         self.batchv1 = client.BatchV1Api()
46         self.pod = None
47         self.pod_log = ""
48         self.job_name = None
49         self.output_log_name = 'functest-kubernetes.log'
50         self.output_debug_log_name = 'functest-kubernetes.debug.log'
51         self.namespace = ""
52         self.ns_generate_name = "security-"
53
54     def deploy_job(self):
55         """Run Security job
56
57         It runs a single security job and then simply prints its output asis.
58         """
59
60         assert self.job_name
61         api_response = self.corev1.create_namespace(
62             client.V1Namespace(metadata=client.V1ObjectMeta(
63                 generate_name=self.ns_generate_name)))
64         self.namespace = api_response.metadata.name
65         self.__logger.debug("create_namespace: %s", api_response)
66         with open(pkg_resources.resource_filename(
67                 "functest_kubernetes",
68                 "security/{}.yaml".format(self.job_name))) as yfile:
69             template = Template(yfile.read())
70             body = yaml.safe_load(template.render(
71                 dockerhub_repo=os.getenv("DOCKERHUB_REPO",
72                                          self.dockerhub_repo)))
73             api_response = self.batchv1.create_namespaced_job(
74                 body=body, namespace=self.namespace)
75             self.__logger.info("Job %s created", api_response.metadata.name)
76             self.__logger.debug("create_namespaced_job: %s", api_response)
77         watch_job = watch.Watch()
78         for event in watch_job.stream(
79                 func=self.batchv1.list_namespaced_job,
80                 namespace=self.namespace, timeout_seconds=self.watch_timeout):
81             if (event["object"].metadata.name == self.job_name and
82                     event["object"].status.succeeded == 1):
83                 self.__logger.info(
84                     "%s started in %0.2f sec", event['object'].metadata.name,
85                     time.time()-self.start_time)
86                 watch_job.stop()
87         pods = self.corev1.list_namespaced_pod(
88             self.namespace, label_selector='job-name={}'.format(self.job_name))
89         self.pod = pods.items[0].metadata.name
90         self.pod_log = self.corev1.read_namespaced_pod_log(
91             name=self.pod, namespace=self.namespace)
92         self.__logger.info("\n\n%s", self.pod_log)
93
94     def run(self, **kwargs):
95         assert self.job_name
96         self.start_time = time.time()
97         try:
98             self.deploy_job()
99         except client.rest.ApiException:
100             self.__logger.exception("Cannot run %s", self.job_name)
101         self.stop_time = time.time()
102
103     def clean(self):
104         if self.pod:
105             try:
106                 api_response = self.corev1.delete_namespaced_pod(
107                     name=self.pod, namespace=self.namespace)
108                 self.__logger.debug("delete_namespaced_pod: %s", api_response)
109             except client.rest.ApiException:
110                 pass
111         if self.job_name:
112             try:
113                 api_response = self.batchv1.delete_namespaced_job(
114                     name=self.job_name, namespace=self.namespace)
115                 self.__logger.debug(
116                     "delete_namespaced_deployment: %s", api_response)
117             except client.rest.ApiException:
118                 pass
119         if self.namespace:
120             try:
121                 api_response = self.corev1.delete_namespace(self.namespace)
122                 self.__logger.debug("delete_namespace: %s", self.namespace)
123             except client.rest.ApiException:
124                 pass
125
126
127 class KubeHunter(SecurityTesting):
128     """kube-hunter hunts for security weaknesses in Kubernetes clusters.
129
130     See https://github.com/aquasecurity/kube-hunter for more details
131     """
132
133     __logger = logging.getLogger(__name__)
134
135     def __init__(self, **kwargs):
136         super(KubeHunter, self).__init__(**kwargs)
137         self.job_name = "kube-hunter"
138         self.ns_generate_name = "kube-hunter-"
139
140     def process_results(self, **kwargs):
141         """Process kube-hunter details"""
142         self.details = json.loads(self.pod_log.splitlines()[-1])
143         if self.details["vulnerabilities"]:
144             self.result = 100
145             msg = prettytable.PrettyTable(
146                 header_style='upper', padding_width=5,
147                 field_names=['category', 'vulnerability', 'severity'])
148             severity = kwargs.get("severity", "high")
149             if severity == "low":
150                 allowed_severity = []
151             elif severity == "medium":
152                 allowed_severity = ["low"]
153             elif severity == "high":
154                 allowed_severity = ["low", "medium"]
155             else:
156                 self.__logger.warning(
157                     "Selecting high as default severity (%s is incorrect)",
158                     kwargs.get("severity", "high"))
159                 severity = "high"
160                 allowed_severity = ["low", "medium"]
161             for vulnerability in self.details["vulnerabilities"]:
162                 if vulnerability["severity"] in allowed_severity:
163                     self.__logger.warning(
164                         "Skipping %s (severity is configured as %s)",
165                         vulnerability["vulnerability"], severity)
166                 else:
167                     self.result = 0
168                 msg.add_row(
169                     [vulnerability["category"], vulnerability["vulnerability"],
170                      vulnerability["severity"]])
171             self.__logger.warning("\n\n%s\n", msg.get_string())
172         if self.details["hunter_statistics"]:
173             msg = prettytable.PrettyTable(
174                 header_style='upper', padding_width=5,
175                 field_names=['name', 'description', 'vulnerabilities'])
176             for statistics in self.details["hunter_statistics"]:
177                 msg.add_row(
178                     [statistics["name"],
179                      textwrap.fill(statistics["description"], width=50),
180                      statistics["vulnerabilities"]])
181             self.__logger.info("\n\n%s\n", msg.get_string())
182
183     def run(self, **kwargs):
184         super(KubeHunter, self).run(**kwargs)
185         try:
186             self.process_results(**kwargs)
187         except Exception:  # pylint: disable=broad-except
188             self.__logger.exception("Cannot process results")
189             self.result = 0
190
191
192 class KubeBench(SecurityTesting):
193     """kube-bench checks whether Kubernetes is deployed securelyself.
194
195     It runs the checks documented in the CIS Kubernetes Benchmark.
196
197     See https://github.com/aquasecurity/kube-bench for more details
198     """
199
200     __logger = logging.getLogger(__name__)
201
202     def __init__(self, **kwargs):
203         super(KubeBench, self).__init__(**kwargs)
204         self.job_name = "kube-bench"
205         self.ns_generate_name = "kube-bench-"
206
207     def run(self, **kwargs):
208         self.job_name = "kube-bench-{}".format(kwargs.get("target", "node"))
209         super(KubeBench, self).run(**kwargs)
210         self.details["report"] = ast.literal_eval(self.pod_log)
211         msg = prettytable.PrettyTable(
212             header_style='upper', padding_width=5,
213             field_names=['node_type', 'version', 'test_desc', 'pass',
214                          'fail', 'warn'])
215         for details in self.details["report"]:
216             for test in details['tests']:
217                 msg.add_row(
218                     [details['node_type'], details['version'], test['desc'],
219                      test['pass'], test['fail'], test['warn']])
220                 for result in test["results"]:
221                     if result['scored'] and result['status'] == 'FAIL':
222                         self.__logger.error(
223                             "%s\n%s", result['test_desc'],
224                             result['remediation'])
225         self.__logger.warning("Targets:\n\n%s\n", msg.get_string())
226         self.result = 100