1 ##############################################################################
2 # Copyright (c) 2017 Huawei Technologies Co.,Ltd.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
13 from oslo_serialization import jsonutils
15 from yardstick.common import constants
16 from yardstick.common import exceptions
17 from yardstick.common import kubernetes_utils as k8s_utils
18 from yardstick.common import utils
21 class ContainerObject(object):
23 SSH_MOUNT_PATH = '/tmp/.ssh/'
24 IMAGE_DEFAULT = 'openretriever/yardstick'
25 COMMAND_DEFAULT = '/bin/bash'
26 RESOURCES = ('requests', 'limits')
27 PORT_OPTIONS = ('containerPort', 'hostIP', 'hostPort', 'name', 'protocol')
28 IMAGE_PULL_POLICY = ('Always', 'IfNotPresent', 'Never')
30 def __init__(self, name, ssh_key, **kwargs):
32 self._ssh_key = ssh_key
33 self._image = kwargs.get('image', self.IMAGE_DEFAULT)
34 self._command = [kwargs.get('command', self.COMMAND_DEFAULT)]
35 self._args = kwargs.get('args', [])
36 self._volume_mounts = kwargs.get('volumeMounts', [])
37 self._security_context = kwargs.get('securityContext')
38 self._env = kwargs.get('env', [])
39 self._resources = kwargs.get('resources', {})
40 self._ports = kwargs.get('ports', [])
41 self._image_pull_policy = kwargs.get('imagePullPolicy')
43 def _create_volume_mounts(self):
44 """Return all "volumeMounts" items per container"""
45 volume_mounts_items = [self._create_volume_mounts_item(vol)
46 for vol in self._volume_mounts]
47 ssh_vol = {'name': self._ssh_key,
48 'mountPath': self.SSH_MOUNT_PATH}
49 volume_mounts_items.append(self._create_volume_mounts_item(ssh_vol))
50 return volume_mounts_items
53 def _create_volume_mounts_item(volume_mount):
54 """Create a "volumeMounts" item"""
55 return {'name': volume_mount['name'],
56 'mountPath': volume_mount['mountPath'],
57 'readOnly': volume_mount.get('readOnly', False)}
59 def get_container_item(self):
60 """Create a "container" item"""
61 container_name = '{}-container'.format(self._name)
62 container = {'args': self._args,
63 'command': self._command,
65 'name': container_name,
66 'volumeMounts': self._create_volume_mounts()}
67 if self._security_context:
68 container['securityContext'] = self._security_context
72 container['env'].append({'name': env['name'],
73 'value': env['value']})
75 container['ports'] = []
76 for port in self._ports:
77 if 'containerPort' not in port.keys():
78 raise exceptions.KubernetesContainerPortNotDefined(
80 _port = {port_option: value for port_option, value
81 in port.items() if port_option in self.PORT_OPTIONS}
82 container['ports'].append(_port)
84 container['resources'] = {}
85 for res in (res for res in self._resources if
86 res in self.RESOURCES):
87 container['resources'][res] = self._resources[res]
88 if self._image_pull_policy:
89 if self._image_pull_policy not in self.IMAGE_PULL_POLICY:
90 raise exceptions.KubernetesContainerWrongImagePullPolicy()
91 container['imagePullPolicy'] = self._image_pull_policy
95 class ReplicationControllerObject(object):
97 SSHKEY_DEFAULT = 'yardstick_key'
98 RESTART_POLICY = ('Always', 'OnFailure', 'Never')
99 TOLERATIONS_KEYS = ('key', 'value', 'effect', 'operator')
101 def __init__(self, name, **kwargs):
102 super(ReplicationControllerObject, self).__init__()
103 parameters = copy.deepcopy(kwargs)
105 self.node_selector = parameters.pop('nodeSelector', {})
106 self.ssh_key = parameters.pop('ssh_key', self.SSHKEY_DEFAULT)
107 self._volumes = parameters.pop('volumes', [])
108 self._security_context = parameters.pop('securityContext', None)
109 self._networks = parameters.pop('networks', [])
110 self._tolerations = parameters.pop('tolerations', [])
111 self._restart_policy = parameters.pop('restartPolicy', 'Always')
112 if self._restart_policy not in self.RESTART_POLICY:
113 raise exceptions.KubernetesWrongRestartPolicy(
114 rpolicy=self._restart_policy)
116 containers = parameters.pop('containers', None)
119 ContainerObject(self.name, self.ssh_key, **container)
120 for container in containers]
123 ContainerObject(self.name, self.ssh_key, **parameters)]
127 "kind": "ReplicationController",
135 "labels": {"app": name}
141 "restartPolicy": self._restart_policy,
148 self._change_value_according_name(name)
149 self._add_containers()
150 self._add_node_selector()
152 self._add_security_context()
154 self._add_tolerations()
158 return self._networks
160 def get_template(self):
163 def _change_value_according_name(self, name):
164 utils.set_dict_value(self.template, 'metadata.name', name)
166 utils.set_dict_value(self.template,
167 'spec.template.metadata.labels.app',
170 def _add_containers(self):
171 containers = [container.get_container_item()
172 for container in self._containers]
173 utils.set_dict_value(self.template,
174 'spec.template.spec.containers',
177 def _add_node_selector(self):
178 utils.set_dict_value(self.template,
179 'spec.template.spec.nodeSelector',
182 def _add_volumes(self):
183 """Add "volume" items to container specs, including the SSH one"""
184 volume_items = [self._create_volume_item(vol) for vol in self._volumes]
185 volume_items.append(self._create_ssh_key_volume())
186 utils.set_dict_value(self.template,
187 'spec.template.spec.volumes',
190 def _create_ssh_key_volume(self):
191 """Create a "volume" item of type "configMap" for the SSH key"""
192 return {'name': self.ssh_key,
193 'configMap': {'name': self.ssh_key}}
196 def _create_volume_item(volume):
197 """Create a "volume" item"""
198 volume = copy.deepcopy(volume)
199 name = volume.pop('name')
200 for key in (k for k in volume if k in k8s_utils.get_volume_types()):
202 type_data = volume[key]
205 raise exceptions.KubernetesTemplateInvalidVolumeType(volume=volume)
207 return {'name': name,
208 type_name: type_data}
210 def _add_security_context(self):
211 if self._security_context:
212 utils.set_dict_value(self.template,
213 'spec.template.spec.securityContext',
214 self._security_context)
216 def _add_networks(self):
218 for net in self._networks:
219 networks.append({'name': net})
224 annotations = {'networks': jsonutils.dumps(networks)}
225 utils.set_dict_value(self.template,
226 'spec.template.metadata.annotations',
229 def _add_tolerations(self):
231 for tol in self._tolerations:
232 tolerations.append({k: tol[k] for k in tol
233 if k in self.TOLERATIONS_KEYS})
235 tolerations = ([{'operator': 'Exists'}] if not tolerations
237 utils.set_dict_value(self.template,
238 'spec.template.spec.tolerations',
242 class ServiceNodePortObject(object):
244 MANDATORY_PARAMETERS = {'port', 'name'}
245 NAME_REGEX = re.compile(r'^[a-z0-9]([-a-z0-9]*[a-z0-9])?$')
247 def __init__(self, name, **kwargs):
248 """Service kind "NodePort" object
250 :param name: (string) name of the Service
251 :param kwargs: (dict) node_ports -> (list) port, name, targetPort,
254 self._name = '{}-service'.format(name)
256 'metadata': {'name': '{}-service'.format(name)},
260 'selector': {'app': name}
264 self._add_port(22, 'ssh', protocol='TCP')
265 node_ports = copy.deepcopy(kwargs.get('node_ports', []))
266 for port in node_ports:
267 if not self.MANDATORY_PARAMETERS.issubset(port.keys()):
268 missing_parameters = ', '.join(
269 str(param) for param in
270 (self.MANDATORY_PARAMETERS - set(port.keys())))
271 raise exceptions.KubernetesServiceObjectDefinitionError(
272 missing_parameters=missing_parameters)
273 port_number = port.pop('port')
274 name = port.pop('name')
275 if not self.NAME_REGEX.match(name):
276 raise exceptions.KubernetesServiceObjectNameError(name=name)
277 self._add_port(port_number, name, **port)
279 def _add_port(self, port, name, protocol=None, targetPort=None,
281 _port = {'port': port,
284 _port['protocol'] = protocol
286 _port['targetPort'] = targetPort
288 _port['nodePort'] = nodePort
289 self.template['spec']['ports'].append(_port)
292 k8s_utils.create_service(self.template)
295 k8s_utils.delete_service(self._name)
298 class CustomResourceDefinitionObject(object):
300 MANDATORY_PARAMETERS = {'name'}
302 def __init__(self, ctx_name, **kwargs):
303 if not self.MANDATORY_PARAMETERS.issubset(kwargs):
304 missing_parameters = ', '.join(
305 str(param) for param in
306 (self.MANDATORY_PARAMETERS - set(kwargs)))
307 raise exceptions.KubernetesCRDObjectDefinitionError(
308 missing_parameters=missing_parameters)
310 singular = kwargs['name']
311 plural = singular + 's'
312 kind = singular.title()
313 version = kwargs.get('version', 'v1')
314 scope = kwargs.get('scope', constants.SCOPE_NAMESPACED)
315 group = ctx_name + '.com'
316 self._name = metadata_name = plural + '.' + group
320 'name': metadata_name
326 'names': {'plural': plural,
327 'singular': singular,
333 k8s_utils.create_custom_resource_definition(self._template)
336 k8s_utils.delete_custom_resource_definition(self._name)
339 class NetworkObject(object):
341 MANDATORY_PARAMETERS = {'plugin', 'args'}
344 def __init__(self, name, **kwargs):
345 if not self.MANDATORY_PARAMETERS.issubset(kwargs):
346 missing_parameters = ', '.join(
347 str(param) for param in
348 (self.MANDATORY_PARAMETERS - set(kwargs)))
349 raise exceptions.KubernetesNetworkObjectDefinitionError(
350 missing_parameters=missing_parameters)
353 self._plugin = kwargs['plugin']
354 self._args = kwargs['args']
356 self._template = None
366 crd = k8s_utils.get_custom_resource_definition(self.KIND)
368 raise exceptions.KubernetesNetworkObjectKindMissing()
376 self._group = self.crd.spec.group
383 self._version = self.crd.spec.version
390 self._plural = self.crd.spec.names.plural
397 self._scope = self.crd.spec.scope
402 """"Network" object template
404 This template can be rendered only once the CRD "Network" is created in
405 Kubernetes. This function call must be delayed until the creation of
409 return self._template
412 'apiVersion': '{}/{}'.format(self.group, self.version),
417 'plugin': self._plugin,
420 return self._template
423 k8s_utils.create_network(self.scope, self.group, self.version,
424 self.plural, self.template)
427 k8s_utils.delete_network(self.scope, self.group, self.version,
428 self.plural, self._name)
431 class KubernetesTemplate(object):
433 def __init__(self, name, context_cfg):
434 """KubernetesTemplate object initialization
436 :param name: (str) name of the Kubernetes context
437 :param context_cfg: (dict) context definition
439 context_cfg = copy.deepcopy(context_cfg)
440 servers_cfg = context_cfg.pop('servers', {})
441 crd_cfg = context_cfg.pop('custom_resources', [])
442 networks_cfg = context_cfg.pop('networks', {})
444 self.ssh_key = '{}-key'.format(name)
446 self.rcs = {self._get_rc_name(rc): cfg
447 for rc, cfg in servers_cfg.items()}
448 self.rc_objs = [ReplicationControllerObject(
449 rc, ssh_key=self.ssh_key, **cfg) for rc, cfg in self.rcs.items()]
450 self.service_objs = [ServiceNodePortObject(rc, **cfg)
451 for rc, cfg in self.rcs.items()]
452 self.crd = [CustomResourceDefinitionObject(self.name, **crd)
454 self.network_objs = [NetworkObject(net_name, **net_data)
455 for net_name, net_data in networks_cfg.items()]
458 def _get_rc_name(self, rc_name):
459 return '{}-{}'.format(rc_name, self.name)
461 def get_rc_pods(self):
462 resp = k8s_utils.get_pod_list()
463 self.pods = [p.metadata.name for p in resp.items for s in self.rcs
464 if p.metadata.name.startswith(s)]
468 def get_rc_by_name(self, rc_name):
469 """Returns a ``ReplicationControllerObject``, searching by name"""
470 for rc in (rc for rc in self.rc_objs if rc.name == rc_name):