33e70f8677c38e5447f671ea25fab17941158a7d
[functest-kubernetes.git] / functest_kubernetes / security / security.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2020 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 """
11 Define the parent for Kubernetes testing.
12 """
13
14 from __future__ import division
15
16 import logging
17 import time
18 import yaml
19
20 from kubernetes import client
21 from kubernetes import config
22 from kubernetes import watch
23 import pkg_resources
24 from xtesting.core import testcase
25
26
27 class SecurityTesting(testcase.TestCase):
28     """Run Security job"""
29     namespace = 'default'
30     watch_timeout = 1200
31
32     __logger = logging.getLogger(__name__)
33
34     def __init__(self, **kwargs):
35         super(SecurityTesting, self).__init__(**kwargs)
36         config.load_kube_config()
37         self.corev1 = client.CoreV1Api()
38         self.batchv1 = client.BatchV1Api()
39         self.pod = None
40         self.job_name = None
41
42     def deploy_job(self):
43         """Run Security job
44
45         It runs a single security job and then simply prints its output asis.
46         """
47
48         assert self.job_name
49         with open(pkg_resources.resource_filename(
50                 "functest_kubernetes",
51                 "security/{}.yaml".format(self.job_name))) as yfile:
52             body = yaml.safe_load(yfile)
53             api_response = self.batchv1.create_namespaced_job(
54                 body=body, namespace="default")
55             self.__logger.info("Job %s created", api_response.metadata.name)
56             self.__logger.debug("create_namespaced_job: %s", api_response)
57         watch_job = watch.Watch()
58         for event in watch_job.stream(
59                 func=self.batchv1.list_namespaced_job,
60                 namespace=self.namespace, timeout_seconds=self.watch_timeout):
61             if (event["object"].metadata.name == self.job_name and
62                     event["object"].status.succeeded == 1):
63                 self.__logger.info(
64                     "%s started in %0.2f sec", event['object'].metadata.name,
65                     time.time()-self.start_time)
66                 watch_job.stop()
67         pods = self.corev1.list_namespaced_pod(
68             self.namespace, label_selector='job-name={}'.format(self.job_name))
69         self.pod = pods.items[0].metadata.name
70         api_response = self.corev1.read_namespaced_pod_log(
71             name=self.pod, namespace=self.namespace)
72         self.__logger.warning("\n\n%s", api_response)
73         self.result = 100
74
75     def run(self, **kwargs):
76         assert self.job_name
77         self.start_time = time.time()
78         try:
79             self.deploy_job()
80         except client.rest.ApiException:
81             self.__logger.exception("Cannot run %s", self.job_name)
82         self.stop_time = time.time()
83
84     def clean(self):
85         try:
86             api_response = self.corev1.delete_namespaced_pod(
87                 name=self.pod, namespace=self.namespace)
88             self.__logger.debug("delete_namespaced_pod: %s", api_response)
89         except client.rest.ApiException:
90             pass
91         try:
92             api_response = self.batchv1.delete_namespaced_job(
93                 name=self.job_name, namespace=self.namespace)
94             self.__logger.debug(
95                 "delete_namespaced_deployment: %s", api_response)
96         except client.rest.ApiException:
97             pass
98
99
100 class KubeHunter(SecurityTesting):
101     """kube-hunter hunts for security weaknesses in Kubernetes clusters.
102
103     See https://github.com/aquasecurity/kube-hunter for more details
104     """
105
106     def __init__(self, **kwargs):
107         super(KubeHunter, self).__init__(**kwargs)
108         self.job_name = "kube-hunter"
109
110
111 class KubeBench(SecurityTesting):
112     """kube-bench checks whether Kubernetes is deployed securelyself.
113
114     It runs the checks documented in the CIS Kubernetes Benchmark.
115
116     See https://github.com/aquasecurity/kube-bench for more details
117     """
118
119     def __init__(self, **kwargs):
120         super(KubeBench, self).__init__(**kwargs)
121         self.job_name = "kube-bench"