1 ##############################################################################
2 # Copyright (c) 2017 Huawei Technologies Co.,Ltd.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
12 from oslo_serialization import jsonutils
14 from yardstick.common import constants
15 from yardstick.common import exceptions
16 from yardstick.common import kubernetes_utils as k8s_utils
17 from yardstick.common import utils
20 class ContainerObject(object):
22 SSH_MOUNT_PATH = '/tmp/.ssh/'
23 IMAGE_DEFAULT = 'openretriever/yardstick'
24 COMMAND_DEFAULT = '/bin/bash'
25 RESOURCES = ('requests', 'limits')
26 PORT_OPTIONS = ('containerPort', 'hostIP', 'hostPort', 'name', 'protocol')
28 def __init__(self, name, ssh_key, **kwargs):
30 self._ssh_key = ssh_key
31 self._image = kwargs.get('image', self.IMAGE_DEFAULT)
32 self._command = [kwargs.get('command', self.COMMAND_DEFAULT)]
33 self._args = kwargs.get('args', [])
34 self._volume_mounts = kwargs.get('volumeMounts', [])
35 self._security_context = kwargs.get('securityContext')
36 self._env = kwargs.get('env', [])
37 self._resources = kwargs.get('resources', {})
38 self._ports = kwargs.get('ports', [])
40 def _create_volume_mounts(self):
41 """Return all "volumeMounts" items per container"""
42 volume_mounts_items = [self._create_volume_mounts_item(vol)
43 for vol in self._volume_mounts]
44 ssh_vol = {'name': self._ssh_key,
45 'mountPath': self.SSH_MOUNT_PATH}
46 volume_mounts_items.append(self._create_volume_mounts_item(ssh_vol))
47 return volume_mounts_items
50 def _create_volume_mounts_item(volume_mount):
51 """Create a "volumeMounts" item"""
52 return {'name': volume_mount['name'],
53 'mountPath': volume_mount['mountPath'],
54 'readOnly': volume_mount.get('readOnly', False)}
56 def get_container_item(self):
57 """Create a "container" item"""
58 container_name = '{}-container'.format(self._name)
59 container = {'args': self._args,
60 'command': self._command,
62 'name': container_name,
63 'volumeMounts': self._create_volume_mounts()}
64 if self._security_context:
65 container['securityContext'] = self._security_context
69 container['env'].append({'name': env['name'],
70 'value': env['value']})
72 container['ports'] = []
73 for port in self._ports:
74 if 'containerPort' not in port.keys():
75 raise exceptions.KubernetesContainerPortNotDefined(
77 _port = {port_option: value for port_option, value
78 in port.items() if port_option in self.PORT_OPTIONS}
79 container['ports'].append(_port)
81 container['resources'] = {}
82 for res in (res for res in self._resources if
83 res in self.RESOURCES):
84 container['resources'][res] = self._resources[res]
88 class ReplicationControllerObject(object):
90 SSHKEY_DEFAULT = 'yardstick_key'
91 RESTART_POLICY = ('Always', 'OnFailure', 'Never')
92 TOLERATIONS_KEYS = ('key', 'value', 'effect', 'operator')
94 def __init__(self, name, **kwargs):
95 super(ReplicationControllerObject, self).__init__()
96 parameters = copy.deepcopy(kwargs)
98 self.node_selector = parameters.pop('nodeSelector', {})
99 self.ssh_key = parameters.pop('ssh_key', self.SSHKEY_DEFAULT)
100 self._volumes = parameters.pop('volumes', [])
101 self._security_context = parameters.pop('securityContext', None)
102 self._networks = parameters.pop('networks', [])
103 self._tolerations = parameters.pop('tolerations', [])
104 self._restart_policy = parameters.pop('restartPolicy', 'Always')
105 if self._restart_policy not in self.RESTART_POLICY:
106 raise exceptions.KubernetesWrongRestartPolicy(
107 rpolicy=self._restart_policy)
109 containers = parameters.pop('containers', None)
112 ContainerObject(self.name, self.ssh_key, **container)
113 for container in containers]
116 ContainerObject(self.name, self.ssh_key, **parameters)]
120 "kind": "ReplicationController",
128 "labels": {"app": name}
134 "restartPolicy": self._restart_policy,
141 self._change_value_according_name(name)
142 self._add_containers()
143 self._add_node_selector()
145 self._add_security_context()
147 self._add_tolerations()
149 def get_template(self):
152 def _change_value_according_name(self, name):
153 utils.set_dict_value(self.template, 'metadata.name', name)
155 utils.set_dict_value(self.template,
156 'spec.template.metadata.labels.app',
159 def _add_containers(self):
160 containers = [container.get_container_item()
161 for container in self._containers]
162 utils.set_dict_value(self.template,
163 'spec.template.spec.containers',
166 def _add_node_selector(self):
167 utils.set_dict_value(self.template,
168 'spec.template.spec.nodeSelector',
171 def _add_volumes(self):
172 """Add "volume" items to container specs, including the SSH one"""
173 volume_items = [self._create_volume_item(vol) for vol in self._volumes]
174 volume_items.append(self._create_ssh_key_volume())
175 utils.set_dict_value(self.template,
176 'spec.template.spec.volumes',
179 def _create_ssh_key_volume(self):
180 """Create a "volume" item of type "configMap" for the SSH key"""
181 return {'name': self.ssh_key,
182 'configMap': {'name': self.ssh_key}}
185 def _create_volume_item(volume):
186 """Create a "volume" item"""
187 volume = copy.deepcopy(volume)
188 name = volume.pop('name')
189 for key in (k for k in volume if k in k8s_utils.get_volume_types()):
191 type_data = volume[key]
194 raise exceptions.KubernetesTemplateInvalidVolumeType(volume=volume)
196 return {'name': name,
197 type_name: type_data}
199 def _add_security_context(self):
200 if self._security_context:
201 utils.set_dict_value(self.template,
202 'spec.template.spec.securityContext',
203 self._security_context)
205 def _add_networks(self):
207 for net in self._networks:
208 networks.append({'name': net})
213 annotations = {'networks': jsonutils.dumps(networks)}
214 utils.set_dict_value(self.template,
215 'spec.template.metadata.annotations',
218 def _add_tolerations(self):
220 for tol in self._tolerations:
221 tolerations.append({k: tol[k] for k in tol
222 if k in self.TOLERATIONS_KEYS})
224 tolerations = ([{'operator': 'Exists'}] if not tolerations
226 utils.set_dict_value(self.template,
227 'spec.template.spec.tolerations',
231 class ServiceNodePortObject(object):
233 def __init__(self, name, **kwargs):
234 """Service kind "NodePort" object
236 :param name: (string) name of the Service
237 :param kwargs: (dict) node_ports -> (list) port, name, targetPort,
240 self._name = '{}-service'.format(name)
242 'metadata': {'name': '{}-service'.format(name)},
246 'selector': {'app': name}
250 self._add_port(22, protocol='TCP')
251 node_ports = copy.deepcopy(kwargs.get('node_ports', []))
252 for port in node_ports:
253 port_number = port.pop('port')
254 self._add_port(port_number, **port)
256 def _add_port(self, port, protocol=None, name=None, targetPort=None,
258 _port = {'port': port}
260 _port['protocol'] = protocol
264 _port['targetPort'] = targetPort
266 _port['nodePort'] = nodePort
267 self.template['spec']['ports'].append(_port)
270 k8s_utils.create_service(self.template)
273 k8s_utils.delete_service(self._name)
276 class CustomResourceDefinitionObject(object):
278 MANDATORY_PARAMETERS = {'name'}
280 def __init__(self, ctx_name, **kwargs):
281 if not self.MANDATORY_PARAMETERS.issubset(kwargs):
282 missing_parameters = ', '.join(
283 str(param) for param in
284 (self.MANDATORY_PARAMETERS - set(kwargs)))
285 raise exceptions.KubernetesCRDObjectDefinitionError(
286 missing_parameters=missing_parameters)
288 singular = kwargs['name']
289 plural = singular + 's'
290 kind = singular.title()
291 version = kwargs.get('version', 'v1')
292 scope = kwargs.get('scope', constants.SCOPE_NAMESPACED)
293 group = ctx_name + '.com'
294 self._name = metadata_name = plural + '.' + group
298 'name': metadata_name
304 'names': {'plural': plural,
305 'singular': singular,
311 k8s_utils.create_custom_resource_definition(self._template)
314 k8s_utils.delete_custom_resource_definition(self._name)
317 class NetworkObject(object):
319 MANDATORY_PARAMETERS = {'plugin', 'args'}
322 def __init__(self, name, **kwargs):
323 if not self.MANDATORY_PARAMETERS.issubset(kwargs):
324 missing_parameters = ', '.join(
325 str(param) for param in
326 (self.MANDATORY_PARAMETERS - set(kwargs)))
327 raise exceptions.KubernetesNetworkObjectDefinitionError(
328 missing_parameters=missing_parameters)
331 self._plugin = kwargs['plugin']
332 self._args = kwargs['args']
334 self._template = None
344 crd = k8s_utils.get_custom_resource_definition(self.KIND)
346 raise exceptions.KubernetesNetworkObjectKindMissing()
354 self._group = self.crd.spec.group
361 self._version = self.crd.spec.version
368 self._plural = self.crd.spec.names.plural
375 self._scope = self.crd.spec.scope
380 """"Network" object template
382 This template can be rendered only once the CRD "Network" is created in
383 Kubernetes. This function call must be delayed until the creation of
387 return self._template
390 'apiVersion': '{}/{}'.format(self.group, self.version),
395 'plugin': self._plugin,
398 return self._template
401 k8s_utils.create_network(self.scope, self.group, self.version,
402 self.plural, self.template)
405 k8s_utils.delete_network(self.scope, self.group, self.version,
406 self.plural, self._name)
409 class KubernetesTemplate(object):
411 def __init__(self, name, context_cfg):
412 """KubernetesTemplate object initialization
414 :param name: (str) name of the Kubernetes context
415 :param context_cfg: (dict) context definition
417 context_cfg = copy.deepcopy(context_cfg)
418 servers_cfg = context_cfg.pop('servers', {})
419 crd_cfg = context_cfg.pop('custom_resources', [])
420 networks_cfg = context_cfg.pop('networks', {})
422 self.ssh_key = '{}-key'.format(name)
424 self.rcs = {self._get_rc_name(rc): cfg
425 for rc, cfg in servers_cfg.items()}
426 self.k8s_objs = [ReplicationControllerObject(
427 rc, ssh_key=self.ssh_key, **cfg) for rc, cfg in self.rcs.items()]
428 self.service_objs = [ServiceNodePortObject(rc, **cfg)
429 for rc, cfg in self.rcs.items()]
430 self.crd = [CustomResourceDefinitionObject(self.name, **crd)
432 self.network_objs = [NetworkObject(net_name, **net_data)
433 for net_name, net_data in networks_cfg.items()]
436 def _get_rc_name(self, rc_name):
437 return '{}-{}'.format(rc_name, self.name)
439 def get_rc_pods(self):
440 resp = k8s_utils.get_pod_list()
441 self.pods = [p.metadata.name for p in resp.items for s in self.rcs
442 if p.metadata.name.startswith(s)]