1 ##############################################################################
2 # Copyright (c) 2017 Huawei Technologies Co.,Ltd.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
11 from kubernetes import client
12 from kubernetes import config
13 from kubernetes.client.rest import ApiException
15 from yardstick.common import constants as consts
# Module-level logger named after this module; its level is set to DEBUG here.
17 LOG = logging.getLogger(__name__)
18 LOG.setLevel(logging.DEBUG)
# Return a Kubernetes CoreV1Api client after loading the kube config from
# consts.K8S_CONF_FILE.
# NOTE(review): the control flow around the config load is not visible in this
# view; presumably the 'config file not found' message is logged from an
# exception handler -- confirm against the full file.
21 def get_core_api(): # pragma: no cover
23 config.load_kube_config(config_file=consts.K8S_CONF_FILE)
25 LOG.exception('config file not found')
28 return client.CoreV1Api()
# Create a replication controller through the CoreV1Api client returned by
# get_core_api(), calling create_namespaced_replication_controller().
# NOTE(review): the parameters between `template` and `**kwargs`, the remaining
# call arguments and the error handling around the call are not visible in this
# view -- confirm against the full file before relying on this description.
31 def create_replication_controller(template,
34 **kwargs): # pragma: no cover
36 core_v1_api = get_core_api()
38 core_v1_api.create_namespaced_replication_controller(namespace,
# Logged when the create call fails (presumably from an except handler).
42 LOG.exception('Create replication controller failed')
# Delete the replication controller called `name` through the CoreV1Api client
# returned by get_core_api().
# NOTE(review): the parameters after `name`, the remaining call arguments and
# the error handling are not visible in this view -- confirm against the full
# file.
46 def delete_replication_controller(name,
49 **kwargs): # pragma: no cover
51 core_v1_api = get_core_api()
# Use the caller-supplied 'body' if present, otherwise a default
# client.V1DeleteOptions(); then remove 'body' from kwargs (presumably so it is
# not passed twice when kwargs is forwarded to the API call).
52 body = kwargs.get('body', client.V1DeleteOptions())
53 kwargs.pop('body', None)
55 core_v1_api.delete_namespaced_replication_controller(name,
# Logged when the delete call fails (presumably from an except handler).
60 LOG.exception('Delete replication controller failed')
# Tail of a pod-deletion helper (presumably `delete_pod`; its `def` line is not
# visible in this view). Deletes the pod called `name` via
# delete_namespaced_pod() on the CoreV1Api client from get_core_api().
# NOTE(review): the signature, remaining call arguments and error handling are
# elided here -- confirm against the full file.
67 **kwargs): # pragma: no cover
69 core_v1_api = get_core_api()
# Use the caller-supplied 'body' if present, otherwise a default
# client.V1DeleteOptions(); then remove 'body' from kwargs (presumably so it is
# not passed twice when kwargs is forwarded to the API call).
70 body = kwargs.get('body', client.V1DeleteOptions())
71 kwargs.pop('body', None)
73 core_v1_api.delete_namespaced_pod(name,
# Logged when the delete call fails (presumably from an except handler).
78 LOG.exception('Delete pod failed')
# Tail of a pod-lookup helper (presumably `read_pod`, which read_pod_status()
# calls; its `def` line is not visible in this view). Reads the pod `name` in
# `namespace` via read_namespaced_pod(), forwarding **kwargs, and stores the
# response in `resp`.
# NOTE(review): how `resp` is returned and how failures propagate is elided
# here -- confirm against the full file.
84 **kwargs): # pragma: no cover
85 core_v1_api = get_core_api()
87 resp = core_v1_api.read_namespaced_pod(name, namespace, **kwargs)
# Logged when the read call fails (presumably from an except handler).
89 LOG.exception('Read pod failed')
def read_pod_status(name, namespace='default', **kwargs):   # pragma: no cover
    """Return the phase of a pod.

    :param name: name of the pod to look up
    :param namespace: namespace the pod is looked up in
    :param kwargs: extra keyword arguments forwarded to read_pod()
    :return: the ``status.phase`` attribute of the object returned by
             read_pod()
    """
    # Forward namespace and kwargs to read_pod(). They used to be accepted but
    # dropped, so a caller asking for another namespace silently got a lookup
    # with read_pod()'s own defaults instead.
    return read_pod(name, namespace=namespace, **kwargs).status.phase
# Create a config map called `name` in `namespace` through the CoreV1Api client
# returned by get_core_api().
# NOTE(review): the parameter lines between `name` and `**kwargs` (presumably
# declaring `data` and `namespace`) and the error handling around the call are
# not visible in this view -- confirm against the full file.
99 def create_config_map(name,
103 **kwargs): # pragma: no cover
104 core_v1_api = get_core_api()
# Wrap the name in object metadata and build the V1ConfigMap request body
# carrying `data`.
105 metadata = client.V1ObjectMeta(name=name)
106 body = client.V1ConfigMap(data=data, metadata=metadata)
108 core_v1_api.create_namespaced_config_map(namespace, body, **kwargs)
# Logged when the create call fails (presumably from an except handler).
110 LOG.exception('Create config map failed')
# Delete the config map called `name` through the CoreV1Api client returned by
# get_core_api().
# NOTE(review): the parameters after `name`, the remaining call arguments and
# the error handling are not visible in this view -- confirm against the full
# file.
114 def delete_config_map(name,
117 **kwargs): # pragma: no cover
118 core_v1_api = get_core_api()
# Use the caller-supplied 'body' if present, otherwise a default
# client.V1DeleteOptions(); then remove 'body' from kwargs (presumably so it is
# not passed twice when kwargs is forwarded to the API call).
119 body = kwargs.get('body', client.V1DeleteOptions())
120 kwargs.pop('body', None)
122 core_v1_api.delete_namespaced_config_map(name,
# Logged when the delete call fails (presumably from an except handler).
127 LOG.exception('Delete config map failed')
# Return the result of list_namespaced_pod() for `namespace` (default
# 'default'), using the CoreV1Api client returned by get_core_api().
# NOTE(review): the error-handling lines around the call are not visible in
# this view, and the block may continue past the last visible line -- confirm
# against the full file.
131 def get_pod_list(namespace='default'): # pragma: no cover
132 core_v1_api = get_core_api()
134 return core_v1_api.list_namespaced_pod(namespace=namespace)
# Logged when the list call fails (presumably from an except handler).
136 LOG.exception('Get pod list failed')