1 ##############################################################################
2 # Copyright (c) 2017 Huawei Technologies Co.,Ltd.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
12 from oslo_serialization import jsonutils
14 from yardstick.common import constants
15 from yardstick.common import exceptions
16 from yardstick.common import kubernetes_utils as k8s_utils
17 from yardstick.common import utils
20 class ContainerObject(object):
22 SSH_MOUNT_PATH = '/tmp/.ssh/'
23 IMAGE_DEFAULT = 'openretriever/yardstick'
24 COMMAND_DEFAULT = '/bin/bash'
25 RESOURCES = ('requests', 'limits')
26 PORT_OPTIONS = ('containerPort', 'hostIP', 'hostPort', 'name', 'protocol')
def __init__(self, name, ssh_key, **kwargs):
    """Store the settings used to render one "container" item.

    :param name: (str) base name; ``get_container_item`` renders it as
                 "<name>-container"
    :param ssh_key: (str) name used for the SSH key volume mount
    :param kwargs: (dict) optional settings; the recognised keys are
                   "image", "command", "args", "volumeMounts",
                   "securityContext", "env", "resources" and "ports"
    """
    # Keep the name: get_container_item() reads self._name.
    self._name = name
    self._ssh_key = ssh_key
    self._image = kwargs.get('image', self.IMAGE_DEFAULT)
    # The command is always wrapped in a single-element list.
    self._command = [kwargs.get('command', self.COMMAND_DEFAULT)]
    self._args = kwargs.get('args', [])
    self._volume_mounts = kwargs.get('volumeMounts', [])
    # No default here: a missing security context stays None.
    self._security_context = kwargs.get('securityContext')
    self._env = kwargs.get('env', [])
    self._resources = kwargs.get('resources', {})
    self._ports = kwargs.get('ports', [])
def _create_volume_mounts(self):
    """Build the "volumeMounts" list for this container.

    The configured mounts are converted first, in their given order; the
    mount for the SSH key (at ``SSH_MOUNT_PATH``) is always appended last.

    :return: (list) "volumeMounts" items
    """
    to_item = self._create_volume_mounts_item
    mounts = list(map(to_item, self._volume_mounts))
    ssh_mount = {'name': self._ssh_key, 'mountPath': self.SSH_MOUNT_PATH}
    mounts.append(to_item(ssh_mount))
    return mounts
def _create_volume_mounts_item(volume_mount):
    """Convert one mount definition into a "volumeMounts" item.

    :param volume_mount: (dict) must hold "name" and "mountPath";
                         "readOnly" is optional
    :return: (dict) item with "name", "mountPath" and "readOnly" keys;
             "readOnly" is False when not given. Any other key of the
             input is left out.
    """
    item = {}
    item['name'] = volume_mount['name']
    item['mountPath'] = volume_mount['mountPath']
    item['readOnly'] = volume_mount.get('readOnly', False)
    return item
56 def get_container_item(self):
57 """Create a "container" item"""
58 container_name = '{}-container'.format(self._name)
59 container = {'args': self._args,
60 'command': self._command,
62 'name': container_name,
63 'volumeMounts': self._create_volume_mounts()}
64 if self._security_context:
65 container['securityContext'] = self._security_context
69 container['env'].append({'name': env['name'],
70 'value': env['value']})
72 container['ports'] = []
73 for port in self._ports:
74 if 'containerPort' not in port.keys():
75 raise exceptions.KubernetesContainerPortNotDefined(
77 _port = {port_option: value for port_option, value
78 in port.items() if port_option in self.PORT_OPTIONS}
79 container['ports'].append(_port)
81 container['resources'] = {}
82 for res in (res for res in self._resources if
83 res in self.RESOURCES):
84 container['resources'][res] = self._resources[res]
88 class ReplicationControllerObject(object):
90 SSHKEY_DEFAULT = 'yardstick_key'
91 RESTART_POLICY = ('Always', 'OnFailure', 'Never')
92 TOLERATIONS_KEYS = ('key', 'value', 'effect', 'operator')
94 def __init__(self, name, **kwargs):
95 super(ReplicationControllerObject, self).__init__()
96 parameters = copy.deepcopy(kwargs)
98 self.node_selector = parameters.pop('nodeSelector', {})
99 self.ssh_key = parameters.pop('ssh_key', self.SSHKEY_DEFAULT)
100 self._volumes = parameters.pop('volumes', [])
101 self._security_context = parameters.pop('securityContext', None)
102 self._networks = parameters.pop('networks', [])
103 self._tolerations = parameters.pop('tolerations', [])
104 self._restart_policy = parameters.pop('restartPolicy', 'Always')
105 if self._restart_policy not in self.RESTART_POLICY:
106 raise exceptions.KubernetesWrongRestartPolicy(
107 rpolicy=self._restart_policy)
109 containers = parameters.pop('containers', None)
112 ContainerObject(self.name, self.ssh_key, **container)
113 for container in containers]
116 ContainerObject(self.name, self.ssh_key, **parameters)]
120 "kind": "ReplicationController",
128 "labels": {"app": name}
134 "restartPolicy": self._restart_policy,
141 self._change_value_according_name(name)
142 self._add_containers()
143 self._add_node_selector()
145 self._add_security_context()
147 self._add_tolerations()
151 return self._networks
153 def get_template(self):
156 def _change_value_according_name(self, name):
157 utils.set_dict_value(self.template, 'metadata.name', name)
159 utils.set_dict_value(self.template,
160 'spec.template.metadata.labels.app',
163 def _add_containers(self):
164 containers = [container.get_container_item()
165 for container in self._containers]
166 utils.set_dict_value(self.template,
167 'spec.template.spec.containers',
170 def _add_node_selector(self):
171 utils.set_dict_value(self.template,
172 'spec.template.spec.nodeSelector',
175 def _add_volumes(self):
176 """Add "volume" items to container specs, including the SSH one"""
177 volume_items = [self._create_volume_item(vol) for vol in self._volumes]
178 volume_items.append(self._create_ssh_key_volume())
179 utils.set_dict_value(self.template,
180 'spec.template.spec.volumes',
def _create_ssh_key_volume(self):
    """Build the "volume" item that exposes the SSH key.

    The volume is of type "configMap"; both the volume and the configMap
    it refers to are named after ``self.ssh_key``.

    :return: (dict) "volume" item
    """
    key_name = self.ssh_key
    volume = {'name': key_name}
    volume['configMap'] = {'name': key_name}
    return volume
189 def _create_volume_item(volume):
190 """Create a "volume" item"""
191 volume = copy.deepcopy(volume)
192 name = volume.pop('name')
193 for key in (k for k in volume if k in k8s_utils.get_volume_types()):
195 type_data = volume[key]
198 raise exceptions.KubernetesTemplateInvalidVolumeType(volume=volume)
200 return {'name': name,
201 type_name: type_data}
def _add_security_context(self):
    """Write the security context into the template, if one is set.

    When ``self._security_context`` is empty or None the template is left
    untouched; otherwise the value is stored under
    "spec.template.spec.securityContext".
    """
    context = self._security_context
    if not context:
        return
    utils.set_dict_value(
        self.template, 'spec.template.spec.securityContext', context)
209 def _add_networks(self):
211 for net in self._networks:
212 networks.append({'name': net})
217 annotations = {'networks': jsonutils.dumps(networks)}
218 utils.set_dict_value(self.template,
219 'spec.template.metadata.annotations',
222 def _add_tolerations(self):
224 for tol in self._tolerations:
225 tolerations.append({k: tol[k] for k in tol
226 if k in self.TOLERATIONS_KEYS})
228 tolerations = ([{'operator': 'Exists'}] if not tolerations
230 utils.set_dict_value(self.template,
231 'spec.template.spec.tolerations',
235 class ServiceNodePortObject(object):
237 def __init__(self, name, **kwargs):
238 """Service kind "NodePort" object
240 :param name: (string) name of the Service
241 :param kwargs: (dict) node_ports -> (list) port, name, targetPort,
244 self._name = '{}-service'.format(name)
246 'metadata': {'name': '{}-service'.format(name)},
250 'selector': {'app': name}
254 self._add_port(22, protocol='TCP')
255 node_ports = copy.deepcopy(kwargs.get('node_ports', []))
256 for port in node_ports:
257 port_number = port.pop('port')
258 self._add_port(port_number, **port)
260 def _add_port(self, port, protocol=None, name=None, targetPort=None,
262 _port = {'port': port}
264 _port['protocol'] = protocol
268 _port['targetPort'] = targetPort
270 _port['nodePort'] = nodePort
271 self.template['spec']['ports'].append(_port)
274 k8s_utils.create_service(self.template)
277 k8s_utils.delete_service(self._name)
280 class CustomResourceDefinitionObject(object):
282 MANDATORY_PARAMETERS = {'name'}
284 def __init__(self, ctx_name, **kwargs):
285 if not self.MANDATORY_PARAMETERS.issubset(kwargs):
286 missing_parameters = ', '.join(
287 str(param) for param in
288 (self.MANDATORY_PARAMETERS - set(kwargs)))
289 raise exceptions.KubernetesCRDObjectDefinitionError(
290 missing_parameters=missing_parameters)
292 singular = kwargs['name']
293 plural = singular + 's'
294 kind = singular.title()
295 version = kwargs.get('version', 'v1')
296 scope = kwargs.get('scope', constants.SCOPE_NAMESPACED)
297 group = ctx_name + '.com'
298 self._name = metadata_name = plural + '.' + group
302 'name': metadata_name
308 'names': {'plural': plural,
309 'singular': singular,
315 k8s_utils.create_custom_resource_definition(self._template)
318 k8s_utils.delete_custom_resource_definition(self._name, skip_codes=[404])
321 class NetworkObject(object):
323 MANDATORY_PARAMETERS = {'plugin', 'args'}
326 def __init__(self, name, **kwargs):
327 if not self.MANDATORY_PARAMETERS.issubset(kwargs):
328 missing_parameters = ', '.join(
329 str(param) for param in
330 (self.MANDATORY_PARAMETERS - set(kwargs)))
331 raise exceptions.KubernetesNetworkObjectDefinitionError(
332 missing_parameters=missing_parameters)
335 self._plugin = kwargs['plugin']
336 self._args = kwargs['args']
338 self._template = None
348 crd = k8s_utils.get_custom_resource_definition(self.KIND)
350 raise exceptions.KubernetesNetworkObjectKindMissing()
358 self._group = self.crd.spec.group
365 self._version = self.crd.spec.version
372 self._plural = self.crd.spec.names.plural
379 self._scope = self.crd.spec.scope
384 """"Network" object template
386 This template can be rendered only once the CRD "Network" is created in
387 Kubernetes. This function call must be delayed until the creation of
391 return self._template
394 'apiVersion': '{}/{}'.format(self.group, self.version),
399 'plugin': self._plugin,
402 return self._template
405 k8s_utils.create_network(self.scope, self.group, self.version,
406 self.plural, self.template)
409 k8s_utils.delete_network(self.scope, self.group, self.version,
410 self.plural, self._name)
413 class KubernetesTemplate(object):
415 def __init__(self, name, context_cfg):
416 """KubernetesTemplate object initialization
418 :param name: (str) name of the Kubernetes context
419 :param context_cfg: (dict) context definition
421 context_cfg = copy.deepcopy(context_cfg)
422 servers_cfg = context_cfg.pop('servers', {})
423 crd_cfg = context_cfg.pop('custom_resources', [])
424 networks_cfg = context_cfg.pop('networks', {})
426 self.ssh_key = '{}-key'.format(name)
428 self.rcs = {self._get_rc_name(rc): cfg
429 for rc, cfg in servers_cfg.items()}
430 self.rc_objs = [ReplicationControllerObject(
431 rc, ssh_key=self.ssh_key, **cfg) for rc, cfg in self.rcs.items()]
432 self.service_objs = [ServiceNodePortObject(rc, **cfg)
433 for rc, cfg in self.rcs.items()]
434 self.crd = [CustomResourceDefinitionObject(self.name, **crd)
436 self.network_objs = [NetworkObject(net_name, **net_data)
437 for net_name, net_data in networks_cfg.items()]
def _get_rc_name(self, rc_name):
    """Return ``rc_name`` suffixed with "-" and ``self.name``.

    :param rc_name: name to be suffixed
    :return: (str) "<rc_name>-<self.name>"
    """
    suffix = self.name
    return '{}-{}'.format(rc_name, suffix)
443 def get_rc_pods(self):
444 resp = k8s_utils.get_pod_list()
445 self.pods = [p.metadata.name for p in resp.items for s in self.rcs
446 if p.metadata.name.startswith(s)]
450 def get_rc_by_name(self, rc_name):
451 """Returns a ``ReplicationControllerObject``, searching by name"""
452 for rc in (rc for rc in self.rc_objs if rc.name == rc_name):