Fix former pep8 issues
[functest-kubernetes.git] / functest_kubernetes / security / security.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2020 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 """
11 Define the parent for Kubernetes testing.
12 """
13
14 from __future__ import division
15
16 import logging
17 import time
18 import yaml
19
20 from kubernetes import client
21 from kubernetes import config
22 from kubernetes import watch
23 import pkg_resources
24 from xtesting.core import testcase
25
26
class SecurityTesting(testcase.TestCase):
    """Base test case running a single Kubernetes security job.

    Subclasses are expected to set ``job_name``. The manifest
    ``security/<job_name>.yaml`` shipped in the ``functest_kubernetes``
    package is submitted as a Job, the job is watched until it succeeds (or
    the watch times out) and the log of its pod is printed as is.
    """
    # Namespace in which the job is created, watched and cleaned.
    namespace = 'default'
    # Server-side timeout (in seconds) passed to the job watch.
    watch_timeout = 1200

    __logger = logging.getLogger(__name__)

    def __init__(self, **kwargs):
        """Load the kube config and create the API clients."""
        super(SecurityTesting, self).__init__(**kwargs)
        config.load_kube_config()
        self.corev1 = client.CoreV1Api()
        self.batchv1 = client.BatchV1Api()
        # Name of the pod spawned by the job (known once it is deployed).
        self.pod = None
        # Name of the job and of its manifest; set by the subclasses.
        self.job_name = None

    def deploy_job(self):
        """Run Security job

        It runs a single security job and then simply prints its output asis.

        ``self.result`` is set to 100 only if the job has been seen as
        succeeded before the watch ended; otherwise it is left untouched.

        Raises:
            kubernetes.client.rest.ApiException: if any Kubernetes API call
                fails.
        """

        assert self.job_name
        # pylint: disable=bad-continuation
        with open(pkg_resources.resource_filename(
                "functest_kubernetes",
                "security/{}.yaml".format(self.job_name))) as yfile:
            body = yaml.safe_load(yfile)
        # Create the job in the same namespace as the one watched and cleaned.
        api_response = self.batchv1.create_namespaced_job(
            body=body, namespace=self.namespace)
        self.__logger.info("Job %s created", api_response.metadata.name)
        self.__logger.debug("create_namespaced_job: %s", api_response)
        succeeded = False
        watch_job = watch.Watch()
        for event in watch_job.stream(
                func=self.batchv1.list_namespaced_job,
                namespace=self.namespace, timeout_seconds=self.watch_timeout):
            job = event["object"]
            if (job.metadata.name == self.job_name and
                    job.status.succeeded == 1):
                self.__logger.info(
                    "%s started in %0.2f sec", job.metadata.name,
                    time.time()-self.start_time)
                succeeded = True
                watch_job.stop()
        pods = self.corev1.list_namespaced_pod(
            self.namespace, label_selector='job-name={}'.format(self.job_name))
        if not pods.items:
            # Nothing to print: avoid an IndexError which run() wouldn't catch.
            self.__logger.error("No pod found for job %s", self.job_name)
            return
        self.pod = pods.items[0].metadata.name
        api_response = self.corev1.read_namespaced_pod_log(
            name=self.pod, namespace=self.namespace)
        self.__logger.warning("\n\n%s", api_response)
        if not succeeded:
            # The watch ended without ever seeing the job as succeeded:
            # do not report the test case as passed.
            self.__logger.error(
                "Job %s did not succeed within %s sec",
                self.job_name, self.watch_timeout)
            return
        self.result = 100

    def run(self, **kwargs):
        """Deploy the security job and record the start and stop times.

        Kubernetes API errors are logged and swallowed; any other exception
        propagates to the caller.
        """
        assert self.job_name
        self.start_time = time.time()
        try:
            self.deploy_job()
        except client.rest.ApiException:
            self.__logger.exception("Cannot run %s", self.job_name)
        finally:
            # Always record the stop time, even on unexpected errors.
            self.stop_time = time.time()

    def clean(self):
        """Delete the pod and the job created by the test case.

        Cleaning is best effort: Kubernetes API errors are only logged at
        debug level and resources which were never created are skipped.
        """
        if self.pod:
            try:
                api_response = self.corev1.delete_namespaced_pod(
                    name=self.pod, namespace=self.namespace)
                self.__logger.debug("delete_namespaced_pod: %s", api_response)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete pod %s", self.pod, exc_info=True)
        if self.job_name:
            try:
                api_response = self.batchv1.delete_namespaced_job(
                    name=self.job_name, namespace=self.namespace)
                self.__logger.debug(
                    "delete_namespaced_job: %s", api_response)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete job %s", self.job_name, exc_info=True)
99
100
class KubeHunter(SecurityTesting):
    """Security test case running kube-hunter as a Kubernetes job.

    kube-hunter hunts for security weaknesses in Kubernetes clusters, see
    https://github.com/aquasecurity/kube-hunter for more details.
    """

    def __init__(self, **kwargs):
        """Initialize the parent test case and select the kube-hunter job."""
        super(KubeHunter, self).__init__(**kwargs)
        # The parent constructor resets job_name to None, so it has to be
        # assigned afterwards.
        self.job_name = "kube-hunter"
110
111
class KubeBench(SecurityTesting):
    """Security test case running kube-bench as a Kubernetes job.

    kube-bench checks whether Kubernetes is deployed securely by running the
    checks documented in the CIS Kubernetes Benchmark, see
    https://github.com/aquasecurity/kube-bench for more details.
    """

    def __init__(self, **kwargs):
        """Initialize the parent test case and select the kube-bench job."""
        super(KubeBench, self).__init__(**kwargs)
        # The parent constructor resets job_name to None, so it has to be
        # assigned afterwards.
        self.job_name = "kube-bench"