Override the right log files
[functest-kubernetes.git] / functest_kubernetes / security / security.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2020 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 """
11 Define the parent for Kubernetes testing.
12 """
13
14 from __future__ import division
15
16 import logging
17 import time
18 import yaml
19
20 from kubernetes import client
21 from kubernetes import config
22 from kubernetes import watch
23 import pkg_resources
24 from xtesting.core import testcase
25
26
class SecurityTesting(testcase.TestCase):
    # pylint: disable=too-many-instance-attributes
    """Run a security job in Kubernetes and print its pod output.

    The job is created from the ``security/<job_name>.yaml`` manifest
    shipped in the ``functest_kubernetes`` package. ``job_name`` is None
    here, so a subclass has to set it before ``run()`` is called.
    """
    # Namespace used for every API call made by this class.
    namespace = 'default'
    # Upper bound, in seconds, passed to the job watch stream.
    watch_timeout = 1200

    __logger = logging.getLogger(__name__)

    def __init__(self, **kwargs):
        """Load the kube config and create the API clients.

        Args:
            **kwargs: forwarded untouched to the parent constructor.
        """
        super(SecurityTesting, self).__init__(**kwargs)
        config.load_kube_config()
        self.corev1 = client.CoreV1Api()
        self.batchv1 = client.BatchV1Api()
        # Name of the pod spawned by the job; filled in by deploy_job().
        self.pod = None
        # Name of the job and of its manifest; to be set by subclasses.
        self.job_name = None
        self.output_log_name = 'functest-kubernetes.log'
        self.output_debug_log_name = 'functest-kubernetes.debug.log'

    def deploy_job(self):
        """Run Security job

        It runs a single security job and then simply prints its output
        as is. ``self.result`` is set to 100 only if the job has been
        seen as succeeded before the watch timeout expired.

        Raises:
            client.rest.ApiException: if any Kubernetes API call fails
                (presumably; the client library decides what is raised).
        """

        assert self.job_name
        manifest = pkg_resources.resource_filename(
            "functest_kubernetes",
            "security/{}.yaml".format(self.job_name))
        # Only the parsing needs the file: close it before calling the API.
        with open(manifest) as yfile:
            body = yaml.safe_load(yfile)
        # Create the job in the namespace which is watched and cleaned
        # afterwards rather than in a hard-coded one.
        api_response = self.batchv1.create_namespaced_job(
            body=body, namespace=self.namespace)
        self.__logger.info("Job %s created", api_response.metadata.name)
        self.__logger.debug("create_namespaced_job: %s", api_response)
        # NOTE(review): the watch matches on self.job_name, so the manifest
        # is assumed to declare a job with that very name - confirm.
        succeeded = False
        watch_job = watch.Watch()
        for event in watch_job.stream(
                func=self.batchv1.list_namespaced_job,
                namespace=self.namespace, timeout_seconds=self.watch_timeout):
            if (event["object"].metadata.name == self.job_name and
                    event["object"].status.succeeded == 1):
                self.__logger.info(
                    "%s started in %0.2f sec", event['object'].metadata.name,
                    time.time()-self.start_time)
                succeeded = True
                watch_job.stop()
        pods = self.corev1.list_namespaced_pod(
            self.namespace, label_selector='job-name={}'.format(self.job_name))
        if not pods.items:
            # Nothing to print and nothing to clean: leave result untouched.
            self.__logger.error("No pod found for job %s", self.job_name)
            return
        self.pod = pods.items[0].metadata.name
        # The output is printed even on failure as it helps debugging.
        api_response = self.corev1.read_namespaced_pod_log(
            name=self.pod, namespace=self.namespace)
        self.__logger.warning("\n\n%s", api_response)
        if not succeeded:
            # The watch ended (timeout) without seeing the job succeed:
            # do not report the test case as passed.
            self.__logger.error(
                "Job %s did not succeed within %s sec",
                self.job_name, self.watch_timeout)
            return
        self.result = 100

    def run(self, **kwargs):
        """Deploy the job and record the start and stop times.

        API errors raised by deploy_job() are logged and swallowed; in
        that case ``self.result`` is not modified by this method.

        Args:
            **kwargs: unused here.
        """
        assert self.job_name
        self.start_time = time.time()
        try:
            self.deploy_job()
        except client.rest.ApiException:
            self.__logger.exception("Cannot run %s", self.job_name)
        self.stop_time = time.time()

    def clean(self):
        """Delete the pod and the job created by deploy_job().

        It's a best-effort cleanup: API errors are only logged at debug
        level and a resource which was never recorded is skipped.
        """
        # self.pod is still None when the job failed before any pod could
        # be found; passing name=None to the client is not a valid call.
        if self.pod:
            try:
                api_response = self.corev1.delete_namespaced_pod(
                    name=self.pod, namespace=self.namespace)
                self.__logger.debug("delete_namespaced_pod: %s", api_response)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete pod %s", self.pod, exc_info=True)
        if self.job_name:
            try:
                api_response = self.batchv1.delete_namespaced_job(
                    name=self.job_name, namespace=self.namespace)
                self.__logger.debug(
                    "delete_namespaced_job: %s", api_response)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete job %s", self.job_name, exc_info=True)
101
102
class KubeHunter(SecurityTesting):
    """Security test case running the kube-hunter job.

    kube-hunter hunts for security weaknesses in Kubernetes clusters.
    See https://github.com/aquasecurity/kube-hunter for more details
    """

    def __init__(self, **kwargs):
        """Initialize the parent, then select the kube-hunter job.

        The name is assigned after the parent constructor because the
        latter resets ``job_name`` to None.
        """
        super(KubeHunter, self).__init__(**kwargs)
        self.job_name = 'kube-hunter'
112
113
class KubeBench(SecurityTesting):
    """Security test case running the kube-bench job.

    kube-bench checks whether Kubernetes is deployed securely by running
    the checks documented in the CIS Kubernetes Benchmark.

    See https://github.com/aquasecurity/kube-bench for more details
    """

    def __init__(self, **kwargs):
        """Initialize the parent, then select the kube-bench job.

        The name is assigned after the parent constructor because the
        latter resets ``job_name`` to None.
        """
        super(KubeBench, self).__init__(**kwargs)
        self.job_name = 'kube-bench'