Make K8s security tests namespace aware
[functest-kubernetes.git] / functest_kubernetes / security / security.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2020 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 """
11 Define the classes running the Kubernetes security test cases.
12 """
13
14 from __future__ import division
15
16 import logging
17 import time
18 import yaml
19
20 from kubernetes import client
21 from kubernetes import config
22 from kubernetes import watch
23 import pkg_resources
24 from xtesting.core import testcase
25
26
class SecurityTesting(testcase.TestCase):
    # pylint: disable=too-many-instance-attributes
    """Run a Kubernetes security job and print its output.

    Subclasses set ``job_name``; the job manifest is then loaded from
    ``security/<job_name>.yaml`` in the ``functest_kubernetes`` package.
    """
    # Value passed as timeout_seconds to the watch on the job status.
    watch_timeout = 1200

    __logger = logging.getLogger(__name__)

    def __init__(self, **kwargs):
        """Load the kube config and create the Kubernetes API clients."""
        super(SecurityTesting, self).__init__(**kwargs)
        config.load_kube_config()
        self.corev1 = client.CoreV1Api()
        self.batchv1 = client.BatchV1Api()
        # Name of the pod spawned by the job (set by deploy_job)
        self.pod = None
        # Name of the job to run (set by the subclasses)
        self.job_name = None
        self.output_log_name = 'functest-kubernetes.log'
        self.output_debug_log_name = 'functest-kubernetes.debug.log'
        # Name of the namespace generated by deploy_job ("" until created)
        self.namespace = ""

    def deploy_job(self):
        """Run Security job

        It creates a dedicated namespace (name generated from the "ims-"
        prefix), runs a single security job in it, waits for the job to
        succeed and then simply prints the output of its pod as is.

        The result is set to 100 only if the job has been reported as
        succeeded before the watch ended; otherwise an error is logged and
        the result is left untouched.

        Raises:
            kubernetes.client.rest.ApiException: on any Kubernetes API error
        """

        assert self.job_name
        api_response = self.corev1.create_namespace(
            client.V1Namespace(metadata=client.V1ObjectMeta(
                generate_name="ims-")))
        self.namespace = api_response.metadata.name
        self.__logger.debug("create_namespace: %s", api_response)
        # pylint: disable=bad-continuation
        with open(pkg_resources.resource_filename(
                "functest_kubernetes",
                "security/{}.yaml".format(self.job_name))) as yfile:
            body = yaml.safe_load(yfile)
        api_response = self.batchv1.create_namespaced_job(
            body=body, namespace=self.namespace)
        self.__logger.info("Job %s created", api_response.metadata.name)
        self.__logger.debug("create_namespaced_job: %s", api_response)
        # The watch also ends silently once watch_timeout is reached, so
        # remember whether the success condition has really been observed.
        job_succeeded = False
        watch_job = watch.Watch()
        for event in watch_job.stream(
                func=self.batchv1.list_namespaced_job,
                namespace=self.namespace, timeout_seconds=self.watch_timeout):
            if (event["object"].metadata.name == self.job_name and
                    event["object"].status.succeeded == 1):
                self.__logger.info(
                    "%s started in %0.2f sec", event['object'].metadata.name,
                    time.time()-self.start_time)
                job_succeeded = True
                watch_job.stop()
        pods = self.corev1.list_namespaced_pod(
            self.namespace, label_selector='job-name={}'.format(self.job_name))
        if not pods.items:
            # Nothing to print: avoid an IndexError on the empty list
            self.__logger.error("No pod found for job %s", self.job_name)
            return
        self.pod = pods.items[0].metadata.name
        api_response = self.corev1.read_namespaced_pod_log(
            name=self.pod, namespace=self.namespace)
        self.__logger.warning("\n\n%s", api_response)
        if not job_succeeded:
            self.__logger.error(
                "Job %s did not succeed within %s sec",
                self.job_name, self.watch_timeout)
            return
        self.result = 100

    def run(self, **kwargs):
        """Deploy the security job and record the start and stop times.

        Kubernetes API errors are logged and not propagated; in that case
        the result is left untouched. The stop time is recorded even when
        another exception propagates.
        """
        assert self.job_name
        self.start_time = time.time()
        try:
            self.deploy_job()
        except client.rest.ApiException:
            self.__logger.exception("Cannot run %s", self.job_name)
        finally:
            self.stop_time = time.time()

    def clean(self):
        """Delete the pod, the job and the namespace created by the test.

        Deletions are best effort: Kubernetes API errors are logged at debug
        level and otherwise ignored. Nothing is done if no namespace has
        been created.
        """
        if not self.namespace:
            # The pod and the job only live in the generated namespace
            return
        if self.pod:
            try:
                api_response = self.corev1.delete_namespaced_pod(
                    name=self.pod, namespace=self.namespace)
                self.__logger.debug("delete_namespaced_pod: %s", api_response)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete pod %s", self.pod, exc_info=True)
        if self.job_name:
            try:
                api_response = self.batchv1.delete_namespaced_job(
                    name=self.job_name, namespace=self.namespace)
                self.__logger.debug(
                    "delete_namespaced_job: %s", api_response)
            except client.rest.ApiException:
                self.__logger.debug(
                    "Cannot delete job %s", self.job_name, exc_info=True)
        try:
            self.corev1.delete_namespace(self.namespace)
            self.__logger.debug("delete_namespace: %s", self.namespace)
        except client.rest.ApiException:
            self.__logger.debug(
                "Cannot delete namespace %s", self.namespace, exc_info=True)
115
116
class KubeHunter(SecurityTesting):
    """Run kube-hunter to look for security weaknesses in the cluster.

    More details are available at
    https://github.com/aquasecurity/kube-hunter
    """

    def __init__(self, **kwargs):
        """Initialize the parent test case and select the kube-hunter job."""
        super(KubeHunter, self).__init__(**kwargs)
        self.job_name = "kube-hunter"
126
127
class KubeBench(SecurityTesting):
    """Run kube-bench to check whether Kubernetes is deployed securely.

    kube-bench runs the checks documented in the CIS Kubernetes Benchmark.

    More details are available at
    https://github.com/aquasecurity/kube-bench
    """

    def __init__(self, **kwargs):
        """Initialize the parent test case and select the kube-bench job."""
        super(KubeBench, self).__init__(**kwargs)
        self.job_name = "kube-bench"