1 ##############################################################################
2 # Copyright (c) 2017 Huawei Technologies Co.,Ltd.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
13 from oslo_serialization import jsonutils
15 from yardstick.common import constants
16 from yardstick.common import exceptions
17 from yardstick.common import kubernetes_utils as k8s_utils
18 from yardstick.common import utils
21 class ContainerObject(object):
23 SSH_MOUNT_PATH = '/tmp/.ssh/'
24 IMAGE_DEFAULT = 'openretriever/yardstick'
25 COMMAND_DEFAULT = '/bin/bash'
26 RESOURCES = ('requests', 'limits')
27 PORT_OPTIONS = ('containerPort', 'hostIP', 'hostPort', 'name', 'protocol')
29 def __init__(self, name, ssh_key, **kwargs):
31 self._ssh_key = ssh_key
32 self._image = kwargs.get('image', self.IMAGE_DEFAULT)
33 self._command = [kwargs.get('command', self.COMMAND_DEFAULT)]
34 self._args = kwargs.get('args', [])
35 self._volume_mounts = kwargs.get('volumeMounts', [])
36 self._security_context = kwargs.get('securityContext')
37 self._env = kwargs.get('env', [])
38 self._resources = kwargs.get('resources', {})
39 self._ports = kwargs.get('ports', [])
41 def _create_volume_mounts(self):
42 """Return all "volumeMounts" items per container"""
43 volume_mounts_items = [self._create_volume_mounts_item(vol)
44 for vol in self._volume_mounts]
45 ssh_vol = {'name': self._ssh_key,
46 'mountPath': self.SSH_MOUNT_PATH}
47 volume_mounts_items.append(self._create_volume_mounts_item(ssh_vol))
48 return volume_mounts_items
51 def _create_volume_mounts_item(volume_mount):
52 """Create a "volumeMounts" item"""
53 return {'name': volume_mount['name'],
54 'mountPath': volume_mount['mountPath'],
55 'readOnly': volume_mount.get('readOnly', False)}
57 def get_container_item(self):
58 """Create a "container" item"""
59 container_name = '{}-container'.format(self._name)
60 container = {'args': self._args,
61 'command': self._command,
63 'name': container_name,
64 'volumeMounts': self._create_volume_mounts()}
65 if self._security_context:
66 container['securityContext'] = self._security_context
70 container['env'].append({'name': env['name'],
71 'value': env['value']})
73 container['ports'] = []
74 for port in self._ports:
75 if 'containerPort' not in port.keys():
76 raise exceptions.KubernetesContainerPortNotDefined(
78 _port = {port_option: value for port_option, value
79 in port.items() if port_option in self.PORT_OPTIONS}
80 container['ports'].append(_port)
82 container['resources'] = {}
83 for res in (res for res in self._resources if
84 res in self.RESOURCES):
85 container['resources'][res] = self._resources[res]
89 class ReplicationControllerObject(object):
91 SSHKEY_DEFAULT = 'yardstick_key'
92 RESTART_POLICY = ('Always', 'OnFailure', 'Never')
93 TOLERATIONS_KEYS = ('key', 'value', 'effect', 'operator')
95 def __init__(self, name, **kwargs):
96 super(ReplicationControllerObject, self).__init__()
97 parameters = copy.deepcopy(kwargs)
99 self.node_selector = parameters.pop('nodeSelector', {})
100 self.ssh_key = parameters.pop('ssh_key', self.SSHKEY_DEFAULT)
101 self._volumes = parameters.pop('volumes', [])
102 self._security_context = parameters.pop('securityContext', None)
103 self._networks = parameters.pop('networks', [])
104 self._tolerations = parameters.pop('tolerations', [])
105 self._restart_policy = parameters.pop('restartPolicy', 'Always')
106 if self._restart_policy not in self.RESTART_POLICY:
107 raise exceptions.KubernetesWrongRestartPolicy(
108 rpolicy=self._restart_policy)
110 containers = parameters.pop('containers', None)
113 ContainerObject(self.name, self.ssh_key, **container)
114 for container in containers]
117 ContainerObject(self.name, self.ssh_key, **parameters)]
121 "kind": "ReplicationController",
129 "labels": {"app": name}
135 "restartPolicy": self._restart_policy,
142 self._change_value_according_name(name)
143 self._add_containers()
144 self._add_node_selector()
146 self._add_security_context()
148 self._add_tolerations()
152 return self._networks
154 def get_template(self):
157 def _change_value_according_name(self, name):
158 utils.set_dict_value(self.template, 'metadata.name', name)
160 utils.set_dict_value(self.template,
161 'spec.template.metadata.labels.app',
164 def _add_containers(self):
165 containers = [container.get_container_item()
166 for container in self._containers]
167 utils.set_dict_value(self.template,
168 'spec.template.spec.containers',
171 def _add_node_selector(self):
172 utils.set_dict_value(self.template,
173 'spec.template.spec.nodeSelector',
176 def _add_volumes(self):
177 """Add "volume" items to container specs, including the SSH one"""
178 volume_items = [self._create_volume_item(vol) for vol in self._volumes]
179 volume_items.append(self._create_ssh_key_volume())
180 utils.set_dict_value(self.template,
181 'spec.template.spec.volumes',
184 def _create_ssh_key_volume(self):
185 """Create a "volume" item of type "configMap" for the SSH key"""
186 return {'name': self.ssh_key,
187 'configMap': {'name': self.ssh_key}}
190 def _create_volume_item(volume):
191 """Create a "volume" item"""
192 volume = copy.deepcopy(volume)
193 name = volume.pop('name')
194 for key in (k for k in volume if k in k8s_utils.get_volume_types()):
196 type_data = volume[key]
199 raise exceptions.KubernetesTemplateInvalidVolumeType(volume=volume)
201 return {'name': name,
202 type_name: type_data}
204 def _add_security_context(self):
205 if self._security_context:
206 utils.set_dict_value(self.template,
207 'spec.template.spec.securityContext',
208 self._security_context)
210 def _add_networks(self):
212 for net in self._networks:
213 networks.append({'name': net})
218 annotations = {'networks': jsonutils.dumps(networks)}
219 utils.set_dict_value(self.template,
220 'spec.template.metadata.annotations',
223 def _add_tolerations(self):
225 for tol in self._tolerations:
226 tolerations.append({k: tol[k] for k in tol
227 if k in self.TOLERATIONS_KEYS})
229 tolerations = ([{'operator': 'Exists'}] if not tolerations
231 utils.set_dict_value(self.template,
232 'spec.template.spec.tolerations',
236 class ServiceNodePortObject(object):
238 MANDATORY_PARAMETERS = {'port', 'name'}
239 NAME_REGEX = re.compile(r'^[a-z0-9]([-a-z0-9]*[a-z0-9])?$')
241 def __init__(self, name, **kwargs):
242 """Service kind "NodePort" object
244 :param name: (string) name of the Service
245 :param kwargs: (dict) node_ports -> (list) port, name, targetPort,
248 self._name = '{}-service'.format(name)
250 'metadata': {'name': '{}-service'.format(name)},
254 'selector': {'app': name}
258 self._add_port(22, 'ssh', protocol='TCP')
259 node_ports = copy.deepcopy(kwargs.get('node_ports', []))
260 for port in node_ports:
261 if not self.MANDATORY_PARAMETERS.issubset(port.keys()):
262 missing_parameters = ', '.join(
263 str(param) for param in
264 (self.MANDATORY_PARAMETERS - set(port.keys())))
265 raise exceptions.KubernetesServiceObjectDefinitionError(
266 missing_parameters=missing_parameters)
267 port_number = port.pop('port')
268 name = port.pop('name')
269 if not self.NAME_REGEX.match(name):
270 raise exceptions.KubernetesServiceObjectNameError(name=name)
271 self._add_port(port_number, name, **port)
273 def _add_port(self, port, name, protocol=None, targetPort=None,
275 _port = {'port': port,
278 _port['protocol'] = protocol
280 _port['targetPort'] = targetPort
282 _port['nodePort'] = nodePort
283 self.template['spec']['ports'].append(_port)
286 k8s_utils.create_service(self.template)
289 k8s_utils.delete_service(self._name)
292 class CustomResourceDefinitionObject(object):
294 MANDATORY_PARAMETERS = {'name'}
296 def __init__(self, ctx_name, **kwargs):
297 if not self.MANDATORY_PARAMETERS.issubset(kwargs):
298 missing_parameters = ', '.join(
299 str(param) for param in
300 (self.MANDATORY_PARAMETERS - set(kwargs)))
301 raise exceptions.KubernetesCRDObjectDefinitionError(
302 missing_parameters=missing_parameters)
304 singular = kwargs['name']
305 plural = singular + 's'
306 kind = singular.title()
307 version = kwargs.get('version', 'v1')
308 scope = kwargs.get('scope', constants.SCOPE_NAMESPACED)
309 group = ctx_name + '.com'
310 self._name = metadata_name = plural + '.' + group
314 'name': metadata_name
320 'names': {'plural': plural,
321 'singular': singular,
327 k8s_utils.create_custom_resource_definition(self._template)
330 k8s_utils.delete_custom_resource_definition(self._name)
333 class NetworkObject(object):
335 MANDATORY_PARAMETERS = {'plugin', 'args'}
338 def __init__(self, name, **kwargs):
339 if not self.MANDATORY_PARAMETERS.issubset(kwargs):
340 missing_parameters = ', '.join(
341 str(param) for param in
342 (self.MANDATORY_PARAMETERS - set(kwargs)))
343 raise exceptions.KubernetesNetworkObjectDefinitionError(
344 missing_parameters=missing_parameters)
347 self._plugin = kwargs['plugin']
348 self._args = kwargs['args']
350 self._template = None
360 crd = k8s_utils.get_custom_resource_definition(self.KIND)
362 raise exceptions.KubernetesNetworkObjectKindMissing()
370 self._group = self.crd.spec.group
377 self._version = self.crd.spec.version
384 self._plural = self.crd.spec.names.plural
391 self._scope = self.crd.spec.scope
396 """"Network" object template
398 This template can be rendered only once the CRD "Network" is created in
399 Kubernetes. This function call must be delayed until the creation of
403 return self._template
406 'apiVersion': '{}/{}'.format(self.group, self.version),
411 'plugin': self._plugin,
414 return self._template
417 k8s_utils.create_network(self.scope, self.group, self.version,
418 self.plural, self.template)
421 k8s_utils.delete_network(self.scope, self.group, self.version,
422 self.plural, self._name)
425 class KubernetesTemplate(object):
427 def __init__(self, name, context_cfg):
428 """KubernetesTemplate object initialization
430 :param name: (str) name of the Kubernetes context
431 :param context_cfg: (dict) context definition
433 context_cfg = copy.deepcopy(context_cfg)
434 servers_cfg = context_cfg.pop('servers', {})
435 crd_cfg = context_cfg.pop('custom_resources', [])
436 networks_cfg = context_cfg.pop('networks', {})
438 self.ssh_key = '{}-key'.format(name)
440 self.rcs = {self._get_rc_name(rc): cfg
441 for rc, cfg in servers_cfg.items()}
442 self.rc_objs = [ReplicationControllerObject(
443 rc, ssh_key=self.ssh_key, **cfg) for rc, cfg in self.rcs.items()]
444 self.service_objs = [ServiceNodePortObject(rc, **cfg)
445 for rc, cfg in self.rcs.items()]
446 self.crd = [CustomResourceDefinitionObject(self.name, **crd)
448 self.network_objs = [NetworkObject(net_name, **net_data)
449 for net_name, net_data in networks_cfg.items()]
452 def _get_rc_name(self, rc_name):
453 return '{}-{}'.format(rc_name, self.name)
455 def get_rc_pods(self):
456 resp = k8s_utils.get_pod_list()
457 self.pods = [p.metadata.name for p in resp.items for s in self.rcs
458 if p.metadata.name.startswith(s)]
462 def get_rc_by_name(self, rc_name):
463 """Returns a ``ReplicationControllerObject``, searching by name"""
464 for rc in (rc for rc in self.rc_objs if rc.name == rc_name):