1 ##############################################################################
2 # Copyright (c) 2017 Huawei Technologies Co.,Ltd.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
13 from oslo_serialization import jsonutils
16 from yardstick.common import constants
17 from yardstick.common import exceptions
18 from yardstick.common import kubernetes_utils as k8s_utils
19 from yardstick.common import utils
# Describes one container of a pod; get_container_item() renders it as the
# dictionary used for a "container" entry.
22 class ContainerObject(object):
# Mount path given to the SSH key volume in _create_volume_mounts().
24 SSH_MOUNT_PATH = '/tmp/.ssh/'
# Fallbacks used by __init__ when 'image' / 'command' are not supplied.
25 IMAGE_DEFAULT = 'openretriever/yardstick'
26 COMMAND_DEFAULT = ['/bin/bash', '-c']
# Only these keys of the 'resources' option are copied to the container item.
27 RESOURCES = ('requests', 'limits')
# Only these keys of each 'ports' entry are copied to the container item.
28 PORT_OPTIONS = ('containerPort', 'hostIP', 'hostPort', 'name', 'protocol')
# Accepted 'imagePullPolicy' values; get_container_item() raises on any other.
29 IMAGE_PULL_POLICY = ('Always', 'IfNotPresent', 'Never')
# Store the container options. `ssh_key` is the name later used for the SSH
# key volume mount; all remaining options are read from kwargs with keys such
# as 'image', 'volumeMounts' or 'imagePullPolicy'.
# NOTE(review): no assignment of `name` is visible in this listing, although
# get_container_item() reads self._name -- confirm against the full file.
31 def __init__(self, name, ssh_key, **kwargs):
33 self._ssh_key = ssh_key
34 self._image = kwargs.get('image', self.IMAGE_DEFAULT)
# 'command' and 'args' both go through _parse_commands().
35 self._command = self._parse_commands(
36 kwargs.get('command', self.COMMAND_DEFAULT))
37 self._args = self._parse_commands(kwargs.get('args', []))
38 self._volume_mounts = kwargs.get('volumeMounts', [])
# No default here: a missing 'securityContext' is stored as None and is then
# left out of the container item.
39 self._security_context = kwargs.get('securityContext')
40 self._env = kwargs.get('env', [])
41 self._resources = kwargs.get('resources', {})
42 self._ports = kwargs.get('ports', [])
# Validated against IMAGE_PULL_POLICY only when the item is built.
43 self._image_pull_policy = kwargs.get('imagePullPolicy')
# None means "not specified"; get_container_item() tests these with
# `is not None`, so an explicit False is still written to the item.
44 self._tty = kwargs.get('tty')
45 self._stdin = kwargs.get('stdin')
# Check the type of a 'command' / 'args' value: a string and a list are the
# two recognized types; the last line raises KubernetesContainerCommandType.
# NOTE(review): the statements under the two isinstance branches are not part
# of this listing, so what is returned for each accepted type is not shown
# here -- confirm against the full file.
48 def _parse_commands(command):
49 if isinstance(command, six.string_types):
51 elif isinstance(command, list):
53 raise exceptions.KubernetesContainerCommandType()
55 def _create_volume_mounts(self):
56 """Return all "volumeMounts" items per container"""
57 volume_mounts_items = [self._create_volume_mounts_item(vol)
58 for vol in self._volume_mounts]
59 ssh_vol = {'name': self._ssh_key,
60 'mountPath': self.SSH_MOUNT_PATH}
61 volume_mounts_items.append(self._create_volume_mounts_item(ssh_vol))
62 return volume_mounts_items
65 def _create_volume_mounts_item(volume_mount):
66 """Create a "volumeMounts" item"""
67 return {'name': volume_mount['name'],
68 'mountPath': volume_mount['mountPath'],
69 'readOnly': volume_mount.get('readOnly', False)}
# Render this container as a dictionary. 'args', 'command', 'name' and
# 'volumeMounts' are always present; the remaining keys handled below are
# optional.
# NOTE(review): several guard lines of this method (and its return statement)
# are not part of this listing; _add_containers() uses the value returned by
# this call, so presumably `container` is returned -- confirm.
71 def get_container_item(self):
72 """Create a "container" item"""
# The container is named after the owner name plus a fixed suffix.
73 container_name = '{}-container'.format(self._name)
74 container = {'args': self._args,
75 'command': self._command,
77 'name': container_name,
78 'volumeMounts': self._create_volume_mounts()}
# Only added when a (truthy) security context was configured.
79 if self._security_context:
80 container['securityContext'] = self._security_context
# Each env entry contributes only its 'name' and 'value'; both keys are
# required (plain indexing). NOTE(review): the lines creating
# container['env'] and opening the loop over the entries are not shown.
84 container['env'].append({'name': env['name'],
85 'value': env['value']})
# Every port must define 'containerPort'; only PORT_OPTIONS keys are copied.
87 container['ports'] = []
88 for port in self._ports:
89 if 'containerPort' not in port.keys():
90 raise exceptions.KubernetesContainerPortNotDefined(
92 _port = {port_option: value for port_option, value
93 in port.items() if port_option in self.PORT_OPTIONS}
94 container['ports'].append(_port)
# Resource sections other than those listed in RESOURCES are ignored.
96 container['resources'] = {}
97 for res in (res for res in self._resources if
98 res in self.RESOURCES):
99 container['resources'][res] = self._resources[res]
# The pull policy is validated here, at render time, not in __init__.
100 if self._image_pull_policy:
101 if self._image_pull_policy not in self.IMAGE_PULL_POLICY:
102 raise exceptions.KubernetesContainerWrongImagePullPolicy()
103 container['imagePullPolicy'] = self._image_pull_policy
# Compared with None (not truthiness) so an explicit False is still emitted.
104 if self._stdin is not None:
105 container['stdin'] = self._stdin
106 if self._tty is not None:
107 container['tty'] = self._tty
# Holds the options of one replication controller and fills self.template
# through the _add_*() helpers called from __init__.
111 class ReplicationControllerObject(object):
# Fallback for the 'ssh_key' option.
113 SSHKEY_DEFAULT = 'yardstick_key'
# Accepted 'restartPolicy' values; __init__ raises
# KubernetesWrongRestartPolicy for anything else.
114 RESTART_POLICY = ('Always', 'OnFailure', 'Never')
# Only these keys of each toleration entry are kept by _add_tolerations().
115 TOLERATIONS_KEYS = ('key', 'value', 'effect', 'operator')
# Read the replication controller options from kwargs, create the
# ContainerObject instances and fill the template.
# NOTE(review): large parts of this method are missing from this listing (the
# assignment of self.name, the branch selecting between the two
# ContainerObject lists, most of the template literal, and some helper calls
# between the visible ones) -- the comments below cover only what is visible.
117 def __init__(self, name, **kwargs):
118 super(ReplicationControllerObject, self).__init__()
# Work on a deep copy: the pop() calls below must not alter the caller's
# kwargs.
119 parameters = copy.deepcopy(kwargs)
121 self.node_selector = parameters.pop('nodeSelector', {})
122 self.ssh_key = parameters.pop('ssh_key', self.SSHKEY_DEFAULT)
123 self._volumes = parameters.pop('volumes', [])
124 self._security_context = parameters.pop('securityContext', None)
125 self._networks = parameters.pop('networks', [])
126 self._tolerations = parameters.pop('tolerations', [])
# 'Always' is used when no restart policy is given; the value is validated
# immediately.
127 self._restart_policy = parameters.pop('restartPolicy', 'Always')
128 if self._restart_policy not in self.RESTART_POLICY:
129 raise exceptions.KubernetesWrongRestartPolicy(
130 rpolicy=self._restart_policy)
# Two ways to define containers are visible: one ContainerObject per entry of
# 'containers', or a single ContainerObject built from the options left in
# `parameters` (presumably when 'containers' is absent -- confirm).
132 containers = parameters.pop('containers', None)
135 ContainerObject(self.name, self.ssh_key, **container)
136 for container in containers]
139 ContainerObject(self.name, self.ssh_key, **parameters)]
143 "kind": "ReplicationController",
151 "labels": {"app": name}
157 "restartPolicy": self._restart_policy,
# Fill the name-dependent fields and the optional sections of the template.
164 self._change_value_according_name(name)
165 self._add_containers()
166 self._add_node_selector()
168 self._add_security_context()
170 self._add_tolerations()
174 return self._networks
176 def get_template(self):
# Write `name` into the template at 'metadata.name' and update the dotted
# path 'spec.template.metadata.labels.app'.
# NOTE(review): the closing argument of the second set_dict_value() call is
# not part of this listing; presumably it is `name` as well -- confirm.
179 def _change_value_according_name(self, name):
180 utils.set_dict_value(self.template, 'metadata.name', name)
182 utils.set_dict_value(self.template,
183 'spec.template.metadata.labels.app',
# Render every ContainerObject in self._containers with get_container_item()
# and update the template at 'spec.template.spec.containers'.
# NOTE(review): the closing argument of set_dict_value() is not part of this
# listing; presumably it is the `containers` list built here -- confirm.
186 def _add_containers(self):
187 containers = [container.get_container_item()
188 for container in self._containers]
189 utils.set_dict_value(self.template,
190 'spec.template.spec.containers',
# Update the template at 'spec.template.spec.nodeSelector'.
# NOTE(review): the value argument of set_dict_value() is not part of this
# listing; presumably it is self.node_selector (set in __init__) -- confirm.
193 def _add_node_selector(self):
194 utils.set_dict_value(self.template,
195 'spec.template.spec.nodeSelector',
# Convert every configured volume, append the SSH key volume as the last
# element, and update the template at 'spec.template.spec.volumes'.
# NOTE(review): the closing argument of set_dict_value() is not part of this
# listing; presumably it is `volume_items` -- confirm.
198 def _add_volumes(self):
199 """Add "volume" items to container specs, including the SSH one"""
200 volume_items = [self._create_volume_item(vol) for vol in self._volumes]
201 volume_items.append(self._create_ssh_key_volume())
202 utils.set_dict_value(self.template,
203 'spec.template.spec.volumes',
206 def _create_ssh_key_volume(self):
207 """Create a "volume" item of type "configMap" for the SSH key"""
208 return {'name': self.ssh_key,
209 'configMap': {'name': self.ssh_key}}
# Build a two-key "volume" entry: the volume name plus one key holding the
# type-specific data. The type is searched among the keys of `volume` that
# are listed by k8s_utils.get_volume_types().
# NOTE(review): parts of the loop body (the assignment of `type_name` and
# whatever decides when the exception is raised) are not part of this
# listing -- confirm against the full file.
212 def _create_volume_item(volume):
213 """Create a "volume" item"""
# Deep copy so that popping 'name' does not alter the caller's dictionary.
214 volume = copy.deepcopy(volume)
215 name = volume.pop('name')
# get_volume_types() sits in the generator's filter, so it is evaluated for
# every key that is examined.
216 for key in (k for k in volume if k in k8s_utils.get_volume_types()):
218 type_data = volume[key]
221 raise exceptions.KubernetesTemplateInvalidVolumeType(volume=volume)
223 return {'name': name,
224 type_name: type_data}
226 def _add_security_context(self):
227 if self._security_context:
228 utils.set_dict_value(self.template,
229 'spec.template.spec.securityContext',
230 self._security_context)
# Turn every entry of self._networks into {'name': <entry>}, serialize the
# resulting list as a JSON string under the 'networks' key, and update the
# template at 'spec.template.metadata.annotations'.
# NOTE(review): the initialization of `networks`, any lines between the loop
# and the annotation, and the closing argument of set_dict_value() are not
# part of this listing -- confirm against the full file.
232 def _add_networks(self):
234 for net in self._networks:
235 networks.append({'name': net})
# The annotation value is a JSON string, not a nested list.
240 annotations = {'networks': jsonutils.dumps(networks)}
241 utils.set_dict_value(self.template,
242 'spec.template.metadata.annotations',
# Keep only the TOLERATIONS_KEYS of every configured toleration and update
# the template at 'spec.template.spec.tolerations'. When the resulting list
# is empty, a single {'operator': 'Exists'} entry is used instead.
# NOTE(review): the initialization of `tolerations`, the `else` part of the
# conditional expression and the closing argument of set_dict_value() are not
# part of this listing -- confirm against the full file.
245 def _add_tolerations(self):
247 for tol in self._tolerations:
248 tolerations.append({k: tol[k] for k in tol
249 if k in self.TOLERATIONS_KEYS})
251 tolerations = ([{'operator': 'Exists'}] if not tolerations
253 utils.set_dict_value(self.template,
254 'spec.template.spec.tolerations',
# Holds the template of a Service; ports are collected in
# template['spec']['ports'] by _add_port().
258 class ServiceNodePortObject(object):
# Keys every 'node_ports' entry must contain.
260 MANDATORY_PARAMETERS = {'port', 'name'}
# Port names: lower-case letters, digits and '-', starting and ending with a
# letter or digit.
261 NAME_REGEX = re.compile(r'^[a-z0-9]([-a-z0-9]*[a-z0-9])?$')
# NOTE(review): the end of the docstring and most of the template literal are
# not part of this listing; the comments below cover only the visible lines.
263 def __init__(self, name, **kwargs):
264 """Service kind "NodePort" object
266 :param name: (string) name of the Service
267 :param kwargs: (dict) node_ports -> (list) port, name, targetPort,
# The Service is named "<name>-service" and selects pods labelled app=<name>.
270 self._name = '{}-service'.format(name)
272 'metadata': {'name': '{}-service'.format(name)},
276 'selector': {'app': name}
# An SSH port (22/TCP) is always registered before the configured ports.
280 self._add_port(22, 'ssh', protocol='TCP')
# Deep copy: the pop() calls below must not alter the caller's definitions.
281 node_ports = copy.deepcopy(kwargs.get('node_ports', []))
282 for port in node_ports:
# Report every missing mandatory key in a single error.
283 if not self.MANDATORY_PARAMETERS.issubset(port.keys()):
284 missing_parameters = ', '.join(
285 str(param) for param in
286 (self.MANDATORY_PARAMETERS - set(port.keys())))
287 raise exceptions.KubernetesServiceObjectDefinitionError(
288 missing_parameters=missing_parameters)
289 port_number = port.pop('port')
# This rebinds the `name` parameter; the Service name and selector above were
# already built from the original value.
290 name = port.pop('name')
291 if not self.NAME_REGEX.match(name):
292 raise exceptions.KubernetesServiceObjectNameError(name=name)
# Whatever is left in `port` is forwarded as keyword arguments.
293 self._add_port(port_number, name, **port)
# Build one port entry and append it to template['spec']['ports'].
# 'protocol', 'targetPort' and 'nodePort' are optional fields of the entry.
# NOTE(review): the end of the signature, the rest of the `_port` literal and
# the conditions guarding the three optional assignments are not part of
# this listing -- confirm against the full file.
295 def _add_port(self, port, name, protocol=None, targetPort=None,
297 _port = {'port': port,
300 _port['protocol'] = protocol
302 _port['targetPort'] = targetPort
304 _port['nodePort'] = nodePort
305 self.template['spec']['ports'].append(_port)
308 k8s_utils.create_service(self.template)
311 k8s_utils.delete_service(self._name, skip_codes=[404])
# Holds the name and template of one custom resource definition.
314 class CustomResourceDefinitionObject(object):
# Keyword arguments that __init__ requires.
316 MANDATORY_PARAMETERS = {'name'}
# Derive the CRD naming fields from kwargs['name'] and the context name.
# NOTE(review): most of the template literal is not part of this listing, so
# how `kind`, `version` and `scope` are used is not shown here.
318 def __init__(self, ctx_name, **kwargs):
# Report every missing mandatory key in a single error.
319 if not self.MANDATORY_PARAMETERS.issubset(kwargs):
320 missing_parameters = ', '.join(
321 str(param) for param in
322 (self.MANDATORY_PARAMETERS - set(kwargs)))
323 raise exceptions.KubernetesCRDObjectDefinitionError(
324 missing_parameters=missing_parameters)
# The plural is formed by appending 's'; the kind is the title-cased name.
326 singular = kwargs['name']
327 plural = singular + 's'
328 kind = singular.title()
329 version = kwargs.get('version', 'v1')
330 scope = kwargs.get('scope', constants.SCOPE_NAMESPACED)
# The group is "<ctx_name>.com" and the CRD name "<plural>.<group>".
331 group = ctx_name + '.com'
332 self._name = metadata_name = plural + '.' + group
336 'name': metadata_name
342 'names': {'plural': plural,
343 'singular': singular,
349 k8s_utils.create_custom_resource_definition(self._template)
352 k8s_utils.delete_custom_resource_definition(self._name, skip_codes=[404])
# Holds the plugin and arguments of one network definition.
355 class NetworkObject(object):
# Keyword arguments that __init__ requires.
357 MANDATORY_PARAMETERS = {'plugin', 'args'}
# Validate the mandatory options and store the plugin and its arguments.
# NOTE(review): some assignments of this method are not part of this listing
# (for example nothing visible stores `name`, although self._name is used
# later in the class) -- confirm against the full file.
360 def __init__(self, name, **kwargs):
# Report every missing mandatory key in a single error.
361 if not self.MANDATORY_PARAMETERS.issubset(kwargs):
362 missing_parameters = ', '.join(
363 str(param) for param in
364 (self.MANDATORY_PARAMETERS - set(kwargs)))
365 raise exceptions.KubernetesNetworkObjectDefinitionError(
366 missing_parameters=missing_parameters)
369 self._plugin = kwargs['plugin']
370 self._args = kwargs['args']
# The template is not rendered here; it starts as None.
372 self._template = None
382 crd = k8s_utils.get_custom_resource_definition(self.KIND)
384 raise exceptions.KubernetesNetworkObjectKindMissing()
392 self._group = self.crd.spec.group
399 self._version = self.crd.spec.version
406 self._plural = self.crd.spec.names.plural
413 self._scope = self.crd.spec.scope
418 """"Network" object template
420 This template can be rendered only once the CRD "Network" is created in
421 Kubernetes. This function call must be delayed until the creation of
425 return self._template
428 'apiVersion': '{}/{}'.format(self.group, self.version),
433 'plugin': self._plugin,
436 return self._template
439 k8s_utils.create_network(self.scope, self.group, self.version,
440 self.plural, self.template, self._name)
443 k8s_utils.delete_network(self.scope, self.group, self.version,
444 self.plural, self._name, skip_codes=[404])
447 class KubernetesTemplate(object):
# NOTE(review): the end of the docstring and a few statements are not part of
# this listing (nothing visible assigns self.name, although it is read below
# and in _get_rc_name(); the iteration clause of the self.crd comprehension
# is also missing) -- confirm against the full file.
449 def __init__(self, name, context_cfg):
450 """KubernetesTemplate object initialization
452 :param name: (str) name of the Kubernetes context
453 :param context_cfg: (dict) context definition
# Deep copy: the pop() calls below must not alter the caller's configuration.
455 context_cfg = copy.deepcopy(context_cfg)
456 servers_cfg = context_cfg.pop('servers', {})
457 crd_cfg = context_cfg.pop('custom_resources', [])
458 networks_cfg = context_cfg.pop('networks', {})
# Name handed to every replication controller as its 'ssh_key'.
460 self.ssh_key = '{}-key'.format(name)
# Server names are qualified by _get_rc_name() before being used as keys.
462 self.rcs = {self._get_rc_name(rc): cfg
463 for rc, cfg in servers_cfg.items()}
# One replication controller and one service per server, both built from the
# same qualified name and configuration.
464 self.rc_objs = [ReplicationControllerObject(
465 rc, ssh_key=self.ssh_key, **cfg) for rc, cfg in self.rcs.items()]
466 self.service_objs = [ServiceNodePortObject(rc, **cfg)
467 for rc, cfg in self.rcs.items()]
468 self.crd = [CustomResourceDefinitionObject(self.name, **crd)
470 self.network_objs = [NetworkObject(net_name, **net_data)
471 for net_name, net_data in networks_cfg.items()]
474 def _get_rc_name(self, rc_name):
475 return '{}-{}'.format(rc_name, self.name)
# Query the pod list and store in self.pods the names of the pods whose name
# starts with one of the keys of self.rcs.
# The comprehension iterates pods x rc names, so a pod name is added once per
# matching prefix: if two rc names are both prefixes of the same pod name,
# that pod name appears twice.
# NOTE(review): the lines following this statement are not part of this
# listing, so the return value is not shown here.
477 def get_rc_pods(self):
478 resp = k8s_utils.get_pod_list()
479 self.pods = [p.metadata.name for p in resp.items for s in self.rcs
480 if p.metadata.name.startswith(s)]
484 def get_rc_by_name(self, rc_name):
485 """Returns a ``ReplicationControllerObject``, searching by name"""
486 for rc in (rc for rc in self.rc_objs if rc.name == rc_name):