Override the right log files
[functest-kubernetes.git] / functest_kubernetes / security / security.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2020 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 """
11 Define the parent for Kubernetes testing.
12 """
13
14 from __future__ import division
15
16 import logging
17 import time
18 import yaml
19
20 from kubernetes import client
21 from kubernetes import config
22 from kubernetes import watch
23 import pkg_resources
24 from xtesting.core import testcase
25
26
class SecurityTesting(testcase.TestCase):
    # pylint: disable=too-many-instance-attributes
    """Run a security scanner as a Kubernetes job and log its output.

    Subclasses are expected to set ``job_name``. It selects the packaged
    manifest ``security/<job_name>.yaml`` and is also the job name matched
    while watching the cluster (presumably the manifest's metadata.name is
    the same value -- confirm against the yaml files).
    """
    # Namespace used for every API call made by this test case.
    namespace = 'default'
    # Maximum time, in seconds, spent watching the job for completion.
    watch_timeout = 1200

    __logger = logging.getLogger(__name__)

    def __init__(self, **kwargs):
        """Load the kube config and create the API clients."""
        super(SecurityTesting, self).__init__(**kwargs)
        config.load_kube_config()
        self.corev1 = client.CoreV1Api()
        self.batchv1 = client.BatchV1Api()
        self.pod = None
        self.job_name = None
        self.output_log_name = 'functest-kubernetes.log'
        self.output_debug_log_name = 'functest-kubernetes.debug.log'

    def deploy_job(self):
        """Run Security job

        It creates the job described by ``security/<job_name>.yaml``, waits
        up to ``watch_timeout`` seconds for it to succeed and then simply
        prints the output of its first pod as is.

        ``result`` is set to 100 only when the job has been seen succeeded;
        it is left untouched if the watch ends first or no pod is found.

        Raises:
            kubernetes.client.rest.ApiException: if an API call fails.
        """
        assert self.job_name
        manifest = pkg_resources.resource_filename(
            "functest_kubernetes",
            "security/{}.yaml".format(self.job_name))
        with open(manifest) as yfile:
            body = yaml.safe_load(yfile)
        # Create the job in the same namespace that is watched and cleaned.
        api_response = self.batchv1.create_namespaced_job(
            body=body, namespace=self.namespace)
        self.__logger.info("Job %s created", api_response.metadata.name)
        self.__logger.debug("create_namespaced_job: %s", api_response)
        succeeded = False
        watch_job = watch.Watch()
        for event in watch_job.stream(
                func=self.batchv1.list_namespaced_job,
                namespace=self.namespace, timeout_seconds=self.watch_timeout):
            if (event["object"].metadata.name == self.job_name and
                    event["object"].status.succeeded == 1):
                self.__logger.info(
                    "%s started in %0.2f sec", event['object'].metadata.name,
                    time.time()-self.start_time)
                succeeded = True
                watch_job.stop()
        pods = self.corev1.list_namespaced_pod(
            self.namespace, label_selector='job-name={}'.format(self.job_name))
        if not pods.items:
            # Nothing to print; leave result untouched so the run fails.
            self.__logger.error("No pod found for job %s", self.job_name)
            return
        # Remember the pod so that clean() can delete it.
        self.pod = pods.items[0].metadata.name
        api_response = self.corev1.read_namespaced_pod_log(
            name=self.pod, namespace=self.namespace)
        self.__logger.warning("\n\n%s", api_response)
        if not succeeded:
            # The watch ended (timeout) without the job being succeeded.
            self.__logger.error(
                "Job %s did not succeed within %s sec",
                self.job_name, self.watch_timeout)
            return
        self.result = 100

    def run(self, **kwargs):
        """Deploy the job and record the start and stop times.

        API errors raised by deploy_job() are logged and not propagated.
        """
        assert self.job_name
        self.start_time = time.time()
        try:
            self.deploy_job()
        except client.rest.ApiException:
            self.__logger.exception("Cannot run %s", self.job_name)
        self.stop_time = time.time()

    def clean(self):
        """Delete the pod and the job created by deploy_job() (best effort).

        API errors are only logged at debug level. A deletion is skipped
        when the corresponding name was never set, e.g. if the deployment
        failed before a pod could be recorded.
        """
        if self.pod:
            try:
                api_response = self.corev1.delete_namespaced_pod(
                    name=self.pod, namespace=self.namespace)
                self.__logger.debug("delete_namespaced_pod: %s", api_response)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete pod %s", self.pod, exc_info=True)
        if self.job_name:
            try:
                api_response = self.batchv1.delete_namespaced_job(
                    name=self.job_name, namespace=self.namespace)
                self.__logger.debug(
                    "delete_namespaced_job: %s", api_response)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete job %s", self.job_name, exc_info=True)
102
103
class KubeHunter(SecurityTesting):
    """Security test case running the kube-hunter job.

    kube-hunter hunts for security weaknesses in Kubernetes clusters.
    See https://github.com/aquasecurity/kube-hunter for more details
    """

    def __init__(self, **kwargs):
        """Forward kwargs to the parent, then select the kube-hunter job."""
        super(KubeHunter, self).__init__(**kwargs)
        # Set after the parent constructor, which resets job_name to None.
        self.job_name = "kube-hunter"
113
114
class KubeBench(SecurityTesting):
    """Security test case running the kube-bench job.

    kube-bench checks whether Kubernetes is deployed securely.
    It runs the checks documented in the CIS Kubernetes Benchmark.

    See https://github.com/aquasecurity/kube-bench for more details
    """

    def __init__(self, **kwargs):
        """Forward kwargs to the parent, then select the kube-bench job."""
        super(KubeBench, self).__init__(**kwargs)
        # Set after the parent constructor, which resets job_name to None.
        self.job_name = "kube-bench"
125         self.job_name = "kube-bench"