1 ##############################################################################
2 # Copyright (c) 2017 Huawei Technologies Co.,Ltd.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
11 from kubernetes import client
12 from kubernetes import config
13 from kubernetes.client.rest import ApiException
15 from yardstick.common import constants as consts
16 from yardstick.common import exceptions
# Module-level logger named after this module; its level is set to DEBUG at
# import time.
# NOTE(review): no `import logging` line is visible in this listing -- confirm
# that the real file imports it before this point.
19 LOG = logging.getLogger(__name__)
20 LOG.setLevel(logging.DEBUG)
def get_core_api():     # pragma: no cover
    """Load the kube configuration and return a core (v1) API client.

    The configuration is read from ``consts.K8S_CONF_FILE``.

    :return: a new ``client.CoreV1Api`` instance
    :raises exceptions.KubernetesConfigFileNotFound: if the configuration
        file cannot be loaded
    """
    try:
        config.load_kube_config(config_file=consts.K8S_CONF_FILE)
    except IOError:
        # NOTE(review): the try/except lines were missing from the listing
        # this block was restored from; IOError is assumed to be the error
        # raised for an unreadable config file -- confirm.
        raise exceptions.KubernetesConfigFileNotFound()
    return client.CoreV1Api()
def get_extensions_v1beta_api():
    """Load the kube configuration and return an API-extensions client.

    The configuration is read from ``consts.K8S_CONF_FILE``.

    :return: a new ``client.ApiextensionsV1beta1Api`` instance
    :raises exceptions.KubernetesConfigFileNotFound: if the configuration
        file cannot be loaded
    """
    try:
        config.load_kube_config(config_file=consts.K8S_CONF_FILE)
    except IOError:
        # NOTE(review): the try/except lines were missing from the listing
        # this block was restored from; IOError is assumed to be the error
        # raised for an unreadable config file -- confirm.
        raise exceptions.KubernetesConfigFileNotFound()
    return client.ApiextensionsV1beta1Api()
def get_custom_objects_api():
    """Load the kube configuration and return a custom-objects API client.

    The configuration is read from ``consts.K8S_CONF_FILE``.

    :return: a new ``client.CustomObjectsApi`` instance
    :raises exceptions.KubernetesConfigFileNotFound: if the configuration
        file cannot be loaded
    """
    try:
        config.load_kube_config(config_file=consts.K8S_CONF_FILE)
    except IOError:
        # NOTE(review): the try/except lines were missing from the listing
        # this block was restored from; IOError is assumed to be the error
        # raised for an unreadable config file -- confirm.
        raise exceptions.KubernetesConfigFileNotFound()
    return client.CustomObjectsApi()
def get_node_list(**kwargs):        # pragma: no cover
    """Return the list of nodes reported by the core API.

    :param kwargs: passed unchanged to ``CoreV1Api.list_node``
    :return: whatever ``list_node`` returns
    :raises ApiException: logged with its traceback, then re-raised
    """
    core_v1_api = get_core_api()
    try:
        return core_v1_api.list_node(**kwargs)
    except ApiException:
        LOG.exception('Get node list failed')
        # NOTE(review): the handler lines were missing from the listing this
        # block was restored from; re-raising after logging is assumed.
        raise
# create_service: build a V1Service from the dict ``template`` and create it
# through the core API client returned by get_core_api().
#   * ``template['metadata']`` (default {}) is expanded into V1ObjectMeta.
#   * each entry of ``template['spec']['ports']`` is expanded into a
#     V1ServicePort; the resulting list is written back into
#     ``template['spec']['ports']``, so the caller's dict is modified, and a
#     template without a 'spec' key fails on that assignment.
#   * the namespace passed to create_namespaced_service is the literal
#     'default'.
# NOTE(review): this listing skips source lines inside this definition (the
# parameter lines between ``template,`` and ``**kwargs`` and the lines around
# the API call / LOG.exception -- presumably try/except scaffolding). The code
# is therefore left untouched; confirm the missing lines against the original.
56 def create_service(template,
59 **kwargs): # pragma: no cover
60 # pylint: disable=unused-argument
61 core_v1_api = get_core_api()
62 metadata = client.V1ObjectMeta(**template.get('metadata', {}))
64 ports = [client.V1ServicePort(**port) for port in
65 template.get('spec', {}).get('ports', [])]
66 template['spec']['ports'] = ports
67 spec = client.V1ServiceSpec(**template.get('spec', {}))
69 service = client.V1Service(metadata=metadata, spec=spec)
72 core_v1_api.create_namespaced_service('default', service)
74 LOG.exception('Create Service failed')
def delete_service(name, namespace='default', skip_codes=None, **kwargs):
    """Delete the Service *name* from *namespace*.

    :param name: name of the Service to delete
    :param namespace: namespace holding the Service
    :param skip_codes: API status codes to tolerate; ``None`` means none
    :param kwargs: passed unchanged to ``delete_namespaced_service``
    :raises exceptions.KubernetesApiException: if the API call fails with a
        status that is not in *skip_codes*
    """
    skip_codes = skip_codes or []
    core_v1_api = get_core_api()
    try:
        body = client.V1DeleteOptions()
        core_v1_api.delete_namespaced_service(name, namespace, body, **kwargs)
    except ApiException as e:
        if e.status in skip_codes:
            # NOTE(review): the statement on this branch was missing from
            # the listing this block was restored from; logging the reason
            # and carrying on is assumed -- confirm.
            LOG.info(e.reason)
        else:
            raise exceptions.KubernetesApiException(
                action='delete', resource='Service')
def get_service_list(namespace='default', **kwargs):
    """Return the list of Services in *namespace*.

    :param namespace: namespace to list
    :param kwargs: passed unchanged to ``list_namespaced_service``
    :return: whatever ``list_namespaced_service`` returns
    :raises ApiException: logged with its traceback, then re-raised
    """
    core_v1_api = get_core_api()
    try:
        return core_v1_api.list_namespaced_service(namespace, **kwargs)
    except ApiException:
        LOG.exception('Get Service list failed')
        # NOTE(review): the handler lines were missing from the listing this
        # block was restored from; re-raising after logging is assumed.
        raise
def get_service_by_name(name):      # pragma: no cover
    """Return the spec of the first listed Service whose name equals *name*.

    The Services are taken from ``get_service_list()`` called without
    arguments.

    :param name: exact Service name to look for
    :return: the matching Service's ``spec``, or ``None`` if none matches
    """
    for service in get_service_list().items:
        if service.metadata.name == name:
            return service.spec
    return None
# create_replication_controller: call
# create_namespaced_replication_controller on the core API client returned by
# get_core_api(); ``namespace`` is the first argument of that call.
# NOTE(review): this listing skips source lines inside this definition (the
# parameter lines between ``template,`` and ``**kwargs``, the remaining call
# arguments, and the lines around LOG.exception -- presumably try/except
# scaffolding). The code is therefore left untouched; confirm the missing
# lines against the original.
106 def create_replication_controller(template,
109 **kwargs): # pragma: no cover
110 # pylint: disable=unused-argument
111 core_v1_api = get_core_api()
113 core_v1_api.create_namespaced_replication_controller(namespace,
117 LOG.exception('Create replication controller failed')
# delete_replication_controller: call
# delete_namespaced_replication_controller on the core API client returned by
# get_core_api(), with ``name`` as the first argument.
# The delete options are taken from ``kwargs['body']`` when present, otherwise
# a fresh client.V1DeleteOptions() is used; 'body' is then removed from kwargs.
# NOTE(review): this listing skips source lines inside this definition (the
# parameter lines between ``name,`` and ``**kwargs``, the remaining call
# arguments, and the lines around LOG.exception -- presumably try/except
# scaffolding). The code is therefore left untouched; confirm the missing
# lines against the original.
121 def delete_replication_controller(name,
124 **kwargs): # pragma: no cover
125 # pylint: disable=unused-argument
126 core_v1_api = get_core_api()
127 body = kwargs.get('body', client.V1DeleteOptions())
128 kwargs.pop('body', None)
130 core_v1_api.delete_namespaced_replication_controller(name,
135 LOG.exception('Delete replication controller failed')
# NOTE(review): the ``def`` line and leading parameters of this function are
# not visible in this listing; it is presumably ``delete_pod``, judging by the
# delete_namespaced_pod call and the 'Pod' resource below -- confirm.
# Visible behaviour: ``skip_codes`` falls back to an empty list when falsy;
# the delete options come from ``kwargs['body']`` when present, otherwise a
# fresh client.V1DeleteOptions(), and 'body' is removed from kwargs;
# delete_namespaced_pod is called with ``name`` first; on ApiException the
# status is checked against ``skip_codes`` and a KubernetesApiException
# (action='delete', resource='Pod') is raised.
# The remaining call arguments and the if/else bodies are also missing from
# the listing, so the code is left untouched.
143 **kwargs): # pragma: no cover
144 # pylint: disable=unused-argument
145 skip_codes = [] if not skip_codes else skip_codes
146 core_v1_api = get_core_api()
147 body = kwargs.get('body', client.V1DeleteOptions())
148 kwargs.pop('body', None)
150 core_v1_api.delete_namespaced_pod(name,
154 except ApiException as e:
155 if e.status in skip_codes:
158 raise exceptions.KubernetesApiException(
159 action='delete', resource='Pod')
# NOTE(review): the ``def`` line and leading parameters of this function are
# not visible in this listing; it is presumably ``read_pod`` (the name called
# by read_pod_status) -- confirm.
# Visible behaviour: read_namespaced_pod(name, namespace, **kwargs) is called
# on the core API client returned by get_core_api() and its result is bound to
# ``resp``; 'Read pod failed' is logged with the exception traceback.
# The lines around these statements (presumably try/except scaffolding and the
# return of ``resp``) are missing from the listing, so the code is left
# untouched.
164 **kwargs): # pragma: no cover
165 core_v1_api = get_core_api()
167 resp = core_v1_api.read_namespaced_pod(name, namespace, **kwargs)
169 LOG.exception('Read pod failed')
def read_pod_status(name, namespace='default', **kwargs):   # pragma: no cover
    """Return the ``status.phase`` of the pod *name*.

    *namespace* and *kwargs* are forwarded to ``read_pod`` so that a pod
    outside the default namespace can be queried; previously they were
    accepted but ignored.

    :param name: name of the pod to read
    :param namespace: namespace holding the pod
    :param kwargs: passed unchanged to ``read_pod``
    :return: the ``status.phase`` attribute of the pod object
    """
    return read_pod(name, namespace=namespace, **kwargs).status.phase
# create_config_map: build a V1ConfigMap whose metadata carries ``name`` and
# whose payload is ``data``, then call
# create_namespaced_config_map(namespace, body, **kwargs) on the core API
# client returned by get_core_api().
# NOTE(review): this listing skips source lines inside this definition (the
# parameter lines between ``name,`` and ``**kwargs`` and the lines around the
# API call / LOG.exception -- presumably try/except scaffolding). The code is
# therefore left untouched; confirm the missing lines against the original.
180 def create_config_map(name,
184 **kwargs): # pragma: no cover
185 # pylint: disable=unused-argument
186 core_v1_api = get_core_api()
187 metadata = client.V1ObjectMeta(name=name)
188 body = client.V1ConfigMap(data=data, metadata=metadata)
190 core_v1_api.create_namespaced_config_map(namespace, body, **kwargs)
192 LOG.exception('Create config map failed')
# delete_config_map: call delete_namespaced_config_map on the core API client
# returned by get_core_api(), with ``name`` as the first argument.
# The delete options are taken from ``kwargs['body']`` when present, otherwise
# a fresh client.V1DeleteOptions() is used; 'body' is then removed from kwargs.
# NOTE(review): this listing skips source lines inside this definition (the
# parameter lines between ``name,`` and ``**kwargs``, the remaining call
# arguments, and the lines around LOG.exception -- presumably try/except
# scaffolding). The code is therefore left untouched; confirm the missing
# lines against the original.
196 def delete_config_map(name,
199 **kwargs): # pragma: no cover
200 # pylint: disable=unused-argument
201 core_v1_api = get_core_api()
202 body = kwargs.get('body', client.V1DeleteOptions())
203 kwargs.pop('body', None)
205 core_v1_api.delete_namespaced_config_map(name,
210 LOG.exception('Delete config map failed')
def create_custom_resource_definition(body):
    """Create a CustomResourceDefinition from the dict *body*.

    :param body: dict providing the 'spec' and 'metadata' entries of the
        definition
    :raises exceptions.KubernetesApiException: if the API call fails
    """
    api = get_extensions_v1beta_api()
    body_obj = client.V1beta1CustomResourceDefinition(
        spec=body['spec'], metadata=body['metadata'])
    try:
        api.create_custom_resource_definition(body_obj)
    except ValueError:
        # NOTE(ralonsoh): bug in kubernetes-client/python 6.0.0
        # https://github.com/kubernetes-client/python/issues/491
        # NOTE(review): the handler lines were missing from the listing this
        # block was restored from; ignoring the ValueError described in the
        # issue above is assumed -- confirm.
        pass
    except ApiException:
        raise exceptions.KubernetesApiException(
            action='create', resource='CustomResourceDefinition')
def delete_custom_resource_definition(name, skip_codes=None):
    """Delete the CustomResourceDefinition called *name*.

    :param name: name of the CustomResourceDefinition to delete
    :param skip_codes: API status codes to tolerate; ``None`` means none
    :raises exceptions.KubernetesApiException: if the API call fails with a
        status that is not in *skip_codes*
    """
    skip_codes = skip_codes or []
    api = get_extensions_v1beta_api()
    body_obj = client.V1DeleteOptions()
    try:
        api.delete_custom_resource_definition(name, body_obj)
    except ApiException as e:
        if e.status in skip_codes:
            # NOTE(review): the statement on this branch was missing from
            # the listing this block was restored from; logging the reason
            # and carrying on is assumed -- confirm.
            LOG.info(e.reason)
        else:
            raise exceptions.KubernetesApiException(
                action='delete', resource='CustomResourceDefinition')
def get_custom_resource_definition(kind):
    """Return the first CustomResourceDefinition whose kind equals *kind*.

    :param kind: value compared with ``spec.names.kind`` of each definition
    :return: the matching definition object, or ``None`` if none matches
    :raises exceptions.KubernetesApiException: if listing the definitions
        fails
    """
    api = get_extensions_v1beta_api()
    try:
        crd_list = api.list_custom_resource_definition()
    except ApiException:
        # The action used to be reported as 'delete', copied from the delete
        # helper; this function only reads.
        raise exceptions.KubernetesApiException(
            action='get', resource='CustomResourceDefinition')
    # NOTE(review): the loop body was missing from the listing this block was
    # restored from; returning the first match, else None, is assumed.
    return next((crd_obj for crd_obj in crd_list.items
                 if crd_obj.spec.names.kind == kind), None)
def create_network(scope, group, version, plural, body, namespace='default'):
    """Create a "Network" custom object.

    :param scope: ``consts.SCOPE_CLUSTER`` creates a cluster-wide object;
        any other value creates the object inside *namespace*
    :param group: API group of the custom resource
    :param version: API version of the custom resource
    :param plural: plural name of the custom resource
    :param body: object definition handed to the API
    :param namespace: namespace used when the scope is not cluster-wide
    :raises exceptions.KubernetesApiException: if the API call fails
    """
    api = get_custom_objects_api()
    try:
        if scope == consts.SCOPE_CLUSTER:
            api.create_cluster_custom_object(group, version, plural, body)
        else:
            api.create_namespaced_custom_object(
                group, version, namespace, plural, body)
    except ApiException:
        # NOTE(review): the try/else/except lines were missing from the
        # listing this block was restored from; ApiException is assumed.
        raise exceptions.KubernetesApiException(
            action='create', resource='Custom Object: Network')
def delete_network(scope, group, version, plural, name, namespace='default'):
    """Delete a "Network" custom object.

    :param scope: ``consts.SCOPE_CLUSTER`` deletes a cluster-wide object;
        any other value deletes the object from *namespace*
    :param group: API group of the custom resource
    :param version: API version of the custom resource
    :param plural: plural name of the custom resource
    :param name: name of the object to delete
    :param namespace: namespace used when the scope is not cluster-wide
    :raises exceptions.KubernetesApiException: if the API call fails
    """
    api = get_custom_objects_api()
    try:
        # The trailing empty dict is the delete-options body.
        if scope == consts.SCOPE_CLUSTER:
            api.delete_cluster_custom_object(group, version, plural, name, {})
        else:
            api.delete_namespaced_custom_object(
                group, version, namespace, plural, name, {})
    except ApiException:
        # NOTE(review): the try/else/except lines were missing from the
        # listing this block was restored from; ApiException is assumed.
        raise exceptions.KubernetesApiException(
            action='delete', resource='Custom Object: Network')
def get_pod_list(namespace='default'):      # pragma: no cover
    """Return the list of pods in *namespace*.

    :param namespace: namespace to list
    :return: whatever ``list_namespaced_pod`` returns
    :raises ApiException: logged with its traceback, then re-raised
    """
    core_v1_api = get_core_api()
    try:
        return core_v1_api.list_namespaced_pod(namespace=namespace)
    except ApiException:
        LOG.exception('Get pod list failed')
        # NOTE(review): the handler lines were missing from the listing this
        # block was restored from; re-raising after logging is assumed.
        raise
def get_pod_by_name(name):      # pragma: no cover
    """Return the first listed pod whose name starts with *name*.

    The pods are taken from ``get_pod_list()`` called without arguments.
    Matching is by prefix, not by equality.

    :param name: prefix of the pod name to look for
    :return: the first matching pod object, or ``None`` if none matches
    """
    for pod in get_pod_list().items:
        if pod.metadata.name.startswith(name):
            return pod
    return None
296 def get_volume_types():
297 """Return the "volume" types supported by the current API"""
298 return [vtype for vtype in client.V1Volume.attribute_map.values()