f03845a48b5da5d87e5885f177f1718503ccf928
[functest-kubernetes.git] / functest_kubernetes / security / security.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2020 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 """
11 Define the parent for Kubernetes testing.
12 """
13
14 from __future__ import division
15
16 import ast
17 import json
18 import logging
19 import os
20 import time
21 import textwrap
22 import yaml
23
24 from jinja2 import Template
25 from kubernetes import client
26 from kubernetes import config
27 from kubernetes import watch
28 import pkg_resources
29 import prettytable
30
31 from xtesting.core import testcase
32
33
34 class SecurityTesting(testcase.TestCase):
35     # pylint: disable=too-many-instance-attributes
36     """Run Security job"""
37     watch_timeout = 1200
38     dockerhub_repo = os.getenv("MIRROR_REPO", "docker.io")
39
40     __logger = logging.getLogger(__name__)
41
42     def __init__(self, **kwargs):
43         super().__init__(**kwargs)
44         config.load_kube_config()
45         self.corev1 = client.CoreV1Api()
46         self.batchv1 = client.BatchV1Api()
47         self.pod = None
48         self.pod_log = ""
49         self.job_name = None
50         self.output_log_name = 'functest-kubernetes.log'
51         self.output_debug_log_name = 'functest-kubernetes.debug.log'
52         self.namespace = ""
53         self.ns_generate_name = "security-"
54
55     def deploy_job(self):
56         """Run Security job
57
58         It runs a single security job and then simply prints its output asis.
59         """
60
61         assert self.job_name
62         api_response = self.corev1.create_namespace(
63             client.V1Namespace(metadata=client.V1ObjectMeta(
64                 generate_name=self.ns_generate_name)))
65         self.namespace = api_response.metadata.name
66         self.__logger.debug("create_namespace: %s", api_response)
67         with open(pkg_resources.resource_filename(
68                 "functest_kubernetes",
69                 f"security/{self.job_name}.yaml"),
70                 encoding='utf-8') as yfile:
71             template = Template(yfile.read())
72             body = yaml.safe_load(template.render(
73                 dockerhub_repo=os.getenv("DOCKERHUB_REPO",
74                                          self.dockerhub_repo)))
75             api_response = self.batchv1.create_namespaced_job(
76                 body=body, namespace=self.namespace)
77             self.__logger.info("Job %s created", api_response.metadata.name)
78             self.__logger.debug("create_namespaced_job: %s", api_response)
79         watch_job = watch.Watch()
80         for event in watch_job.stream(
81                 func=self.batchv1.list_namespaced_job,
82                 namespace=self.namespace, timeout_seconds=self.watch_timeout):
83             if (event["object"].metadata.name == self.job_name and
84                     event["object"].status.succeeded == 1):
85                 self.__logger.info(
86                     "%s started in %0.2f sec", event['object'].metadata.name,
87                     time.time()-self.start_time)
88                 watch_job.stop()
89         pods = self.corev1.list_namespaced_pod(
90             self.namespace, label_selector=f'job-name={self.job_name}')
91         self.pod = pods.items[0].metadata.name
92         self.pod_log = self.corev1.read_namespaced_pod_log(
93             name=self.pod, namespace=self.namespace)
94         self.__logger.info("\n\n%s", self.pod_log)
95
96     def run(self, **kwargs):
97         assert self.job_name
98         self.start_time = time.time()
99         try:
100             self.deploy_job()
101         except client.rest.ApiException:
102             self.__logger.exception("Cannot run %s", self.job_name)
103         self.stop_time = time.time()
104
105     def clean(self):
106         if self.pod:
107             try:
108                 api_response = self.corev1.delete_namespaced_pod(
109                     name=self.pod, namespace=self.namespace)
110                 self.__logger.debug("delete_namespaced_pod: %s", api_response)
111             except client.rest.ApiException:
112                 pass
113         if self.job_name:
114             try:
115                 api_response = self.batchv1.delete_namespaced_job(
116                     name=self.job_name, namespace=self.namespace)
117                 self.__logger.debug(
118                     "delete_namespaced_deployment: %s", api_response)
119             except client.rest.ApiException:
120                 pass
121         if self.namespace:
122             try:
123                 api_response = self.corev1.delete_namespace(self.namespace)
124                 self.__logger.debug("delete_namespace: %s", self.namespace)
125             except client.rest.ApiException:
126                 pass
127
128
129 class KubeHunter(SecurityTesting):
130     """kube-hunter hunts for security weaknesses in Kubernetes clusters.
131
132     See https://github.com/aquasecurity/kube-hunter for more details
133     """
134
135     __logger = logging.getLogger(__name__)
136
137     def __init__(self, **kwargs):
138         super().__init__(**kwargs)
139         self.job_name = "kube-hunter"
140         self.ns_generate_name = "kube-hunter-"
141
142     def process_results(self, **kwargs):
143         """Process kube-hunter details"""
144         self.details = json.loads(self.pod_log.splitlines()[-1])
145         if self.details["vulnerabilities"]:
146             self.result = 100
147             msg = prettytable.PrettyTable(
148                 header_style='upper', padding_width=5,
149                 field_names=['category', 'vulnerability', 'severity'])
150             severity = kwargs.get("severity", "none")
151             if severity == "low":
152                 allowed_severity = []
153             elif severity == "medium":
154                 allowed_severity = ["low"]
155             elif severity == "high":
156                 allowed_severity = ["low", "medium"]
157             else:
158                 self.__logger.warning(
159                     "Just printing all vulnerabilities as "
160                     "no severity criteria given")
161                 allowed_severity = ["low", "medium", "high"]
162             for vulnerability in self.details["vulnerabilities"]:
163                 if vulnerability["severity"] not in allowed_severity:
164                     self.result = 0
165                 msg.add_row(
166                     [vulnerability["category"], vulnerability["vulnerability"],
167                      vulnerability["severity"]])
168             self.__logger.warning("\n\n%s\n", msg.get_string())
169         if self.details["hunter_statistics"]:
170             msg = prettytable.PrettyTable(
171                 header_style='upper', padding_width=5,
172                 field_names=['name', 'description', 'vulnerabilities'])
173             for statistics in self.details["hunter_statistics"]:
174                 msg.add_row(
175                     [statistics["name"],
176                      textwrap.fill(statistics["description"], width=50),
177                      statistics["vulnerabilities"]])
178             self.__logger.info("\n\n%s\n", msg.get_string())
179
180     def run(self, **kwargs):
181         super().run(**kwargs)
182         try:
183             self.process_results(**kwargs)
184         except Exception:  # pylint: disable=broad-except
185             self.__logger.exception("Cannot process results")
186             self.result = 0
187
188
189 class KubeBench(SecurityTesting):
190     """kube-bench checks whether Kubernetes is deployed securelyself.
191
192     It runs the checks documented in the CIS Kubernetes Benchmark.
193
194     See https://github.com/aquasecurity/kube-bench for more details
195     """
196
197     __logger = logging.getLogger(__name__)
198
199     def __init__(self, **kwargs):
200         super().__init__(**kwargs)
201         self.job_name = "kube-bench"
202         self.ns_generate_name = "kube-bench-"
203
204     def run(self, **kwargs):
205         self.job_name = f'kube-bench-{kwargs.get("target", "node")}'
206         super().run(**kwargs)
207         self.details["report"] = ast.literal_eval(self.pod_log)
208         msg = prettytable.PrettyTable(
209             header_style='upper', padding_width=5,
210             field_names=['node_type', 'version', 'test_desc', 'pass',
211                          'fail', 'warn'])
212         for details in self.details["report"]["Controls"]:
213             for test in details['tests']:
214                 msg.add_row(
215                     [details['node_type'], details['version'], test['desc'],
216                      test['pass'], test['fail'], test['warn']])
217                 for result in test["results"]:
218                     if result['scored'] and result['status'] == 'FAIL':
219                         self.__logger.error(
220                             "%s\n%s", result['test_desc'],
221                             result['remediation'])
222         self.__logger.warning("Targets:\n\n%s\n", msg.get_string())
223         self.result = 100