Fix arm -> arm64
[functest-kubernetes.git] / functest_kubernetes / security / security.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2020 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 """
11 Define the parent for Kubernetes testing.
12 """
13
14 from __future__ import division
15
16 import logging
17 import time
18 import yaml
19
20 from kubernetes import client
21 from kubernetes import config
22 from kubernetes import watch
23 import pkg_resources
24 from xtesting.core import testcase
25
26
27 class SecurityTesting(testcase.TestCase):
28     # pylint: disable=too-many-instance-attributes
29     """Run Security job"""
30     watch_timeout = 1200
31
32     __logger = logging.getLogger(__name__)
33
34     def __init__(self, **kwargs):
35         super(SecurityTesting, self).__init__(**kwargs)
36         config.load_kube_config()
37         self.corev1 = client.CoreV1Api()
38         self.batchv1 = client.BatchV1Api()
39         self.pod = None
40         self.job_name = None
41         self.output_log_name = 'functest-kubernetes.log'
42         self.output_debug_log_name = 'functest-kubernetes.debug.log'
43         self.namespace = ""
44
45     def deploy_job(self):
46         """Run Security job
47
48         It runs a single security job and then simply prints its output asis.
49         """
50
51         assert self.job_name
52         api_response = self.corev1.create_namespace(
53             client.V1Namespace(metadata=client.V1ObjectMeta(
54                 generate_name="ims-")))
55         self.namespace = api_response.metadata.name
56         self.__logger.debug("create_namespace: %s", api_response)
57         with open(pkg_resources.resource_filename(
58                 "functest_kubernetes",
59                 "security/{}.yaml".format(self.job_name))) as yfile:
60             body = yaml.safe_load(yfile)
61             api_response = self.batchv1.create_namespaced_job(
62                 body=body, namespace=self.namespace)
63             self.__logger.info("Job %s created", api_response.metadata.name)
64             self.__logger.debug("create_namespaced_job: %s", api_response)
65         watch_job = watch.Watch()
66         for event in watch_job.stream(
67                 func=self.batchv1.list_namespaced_job,
68                 namespace=self.namespace, timeout_seconds=self.watch_timeout):
69             if (event["object"].metadata.name == self.job_name and
70                     event["object"].status.succeeded == 1):
71                 self.__logger.info(
72                     "%s started in %0.2f sec", event['object'].metadata.name,
73                     time.time()-self.start_time)
74                 watch_job.stop()
75         pods = self.corev1.list_namespaced_pod(
76             self.namespace, label_selector='job-name={}'.format(self.job_name))
77         self.pod = pods.items[0].metadata.name
78         api_response = self.corev1.read_namespaced_pod_log(
79             name=self.pod, namespace=self.namespace)
80         self.__logger.warning("\n\n%s", api_response)
81         self.result = 100
82
83     def run(self, **kwargs):
84         assert self.job_name
85         self.start_time = time.time()
86         try:
87             self.deploy_job()
88         except client.rest.ApiException:
89             self.__logger.exception("Cannot run %s", self.job_name)
90         self.stop_time = time.time()
91
92     def clean(self):
93         if self.pod:
94             try:
95                 api_response = self.corev1.delete_namespaced_pod(
96                     name=self.pod, namespace=self.namespace)
97                 self.__logger.debug("delete_namespaced_pod: %s", api_response)
98             except client.rest.ApiException:
99                 pass
100         if self.job_name:
101             try:
102                 api_response = self.batchv1.delete_namespaced_job(
103                     name=self.job_name, namespace=self.namespace)
104                 self.__logger.debug(
105                     "delete_namespaced_deployment: %s", api_response)
106             except client.rest.ApiException:
107                 pass
108         if self.namespace:
109             try:
110                 api_response = self.corev1.delete_namespace(self.namespace)
111                 self.__logger.debug("delete_namespace: %s", self.namespace)
112             except client.rest.ApiException:
113                 pass
114
115
116 class KubeHunter(SecurityTesting):
117     """kube-hunter hunts for security weaknesses in Kubernetes clusters.
118
119     See https://github.com/aquasecurity/kube-hunter for more details
120     """
121
122     def __init__(self, **kwargs):
123         super(KubeHunter, self).__init__(**kwargs)
124         self.job_name = "kube-hunter"
125
126
127 class KubeBench(SecurityTesting):
128     """kube-bench checks whether Kubernetes is deployed securelyself.
129
130     It runs the checks documented in the CIS Kubernetes Benchmark.
131
132     See https://github.com/aquasecurity/kube-bench for more details
133     """
134
135     def __init__(self, **kwargs):
136         super(KubeBench, self).__init__(**kwargs)
137         self.job_name = "kube-bench"