1 ##############################################################################
2 # Copyright (c) 2017 Huawei Technologies Co.,Ltd.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
13 from oslo_serialization import jsonutils
16 from yardstick.common import constants
17 from yardstick.common import exceptions
18 from yardstick.common import kubernetes_utils as k8s_utils
19 from yardstick.common import utils
# Describes one container and renders it as a "container" item (see
# get_container_item).
22 class ContainerObject(object):
# Mount path of the SSH key volume that _create_volume_mounts always appends.
24 SSH_MOUNT_PATH = '/tmp/.ssh/'
# Image used when the "image" option is not given.
25 IMAGE_DEFAULT = 'openretriever/yardstick'
# Command used when the "command" option is not given.
26 COMMAND_DEFAULT = ['/bin/bash', '-c']
# Keys copied from the "resources" option; any other key is dropped.
27 RESOURCES = ('requests', 'limits')
# Keys copied from each "ports" entry; any other key is dropped.
28 PORT_OPTIONS = ('containerPort', 'hostIP', 'hostPort', 'name', 'protocol')
# Accepted "imagePullPolicy" values; any other value makes
# get_container_item raise KubernetesContainerWrongImagePullPolicy.
29 IMAGE_PULL_POLICY = ('Always', 'IfNotPresent', 'Never')
# Store the SSH key name and read the container options from kwargs:
# image, command, args, volumeMounts, securityContext, env, resources, ports
# and imagePullPolicy. "image" and "command" fall back to IMAGE_DEFAULT and
# COMMAND_DEFAULT; "command" and "args" are passed through _parse_commands.
# NOTE(review): the embedded listing numbers jump from 31 to 33 and
# get_container_item reads self._name, so the assignment of "name" is
# presumably on the line missing here - confirm against the full file.
31 def __init__(self, name, ssh_key, **kwargs):
33 self._ssh_key = ssh_key
34 self._image = kwargs.get('image', self.IMAGE_DEFAULT)
35 self._command = self._parse_commands(
36 kwargs.get('command', self.COMMAND_DEFAULT))
37 self._args = self._parse_commands(kwargs.get('args', []))
38 self._volume_mounts = kwargs.get('volumeMounts', [])
39 self._security_context = kwargs.get('securityContext')
40 self._env = kwargs.get('env', [])
41 self._resources = kwargs.get('resources', {})
42 self._ports = kwargs.get('ports', [])
43 self._image_pull_policy = kwargs.get('imagePullPolicy')
# Check the type of a "command"/"args" value: a string (six.string_types)
# and a list each have their own branch; the trailing raise of
# KubernetesContainerCommandType appears to be reached for any other type.
# NOTE(review): listing lines 48 and 50 (the bodies of the two branches) are
# not visible here, so what each branch returns cannot be stated - confirm.
# NOTE(review): takes no "self" but is called as self._parse_commands(...);
# presumably a @staticmethod whose decorator line is not visible - confirm.
46 def _parse_commands(command):
47 if isinstance(command, six.string_types):
49 elif isinstance(command, list):
51 raise exceptions.KubernetesContainerCommandType()
53 def _create_volume_mounts(self):
54 """Return all "volumeMounts" items per container"""
55 volume_mounts_items = [self._create_volume_mounts_item(vol)
56 for vol in self._volume_mounts]
57 ssh_vol = {'name': self._ssh_key,
58 'mountPath': self.SSH_MOUNT_PATH}
59 volume_mounts_items.append(self._create_volume_mounts_item(ssh_vol))
60 return volume_mounts_items
63 def _create_volume_mounts_item(volume_mount):
64 """Create a "volumeMounts" item"""
65 return {'name': volume_mount['name'],
66 'mountPath': volume_mount['mountPath'],
67 'readOnly': volume_mount.get('readOnly', False)}
# Build the "container" dict from the options stored by __init__: args,
# command, name ('<name>-container') and volumeMounts are visible in the base
# dict; securityContext, env, ports, resources and imagePullPolicy are added
# by the statements below.
# NOTE(review): listing lines 74, 79-81, 84, 89, 93 and 102 are not visible
# here. They presumably hold the 'image' entry, the guards around the
# env/ports/resources sections, the argument of
# KubernetesContainerPortNotDefined and the final return - confirm against
# the full file before relying on this description.
69 def get_container_item(self):
70 """Create a "container" item"""
71 container_name = '{}-container'.format(self._name)
72 container = {'args': self._args,
73 'command': self._command,
75 'name': container_name,
76 'volumeMounts': self._create_volume_mounts()}
77 if self._security_context:
78 container['securityContext'] = self._security_context
82 container['env'].append({'name': env['name'],
83 'value': env['value']})
85 container['ports'] = []
86 for port in self._ports:
# Every port entry must define 'containerPort'.
87 if 'containerPort' not in port.keys():
88 raise exceptions.KubernetesContainerPortNotDefined(
# Only the keys listed in PORT_OPTIONS are copied from the port entry.
90 _port = {port_option: value for port_option, value
91 in port.items() if port_option in self.PORT_OPTIONS}
92 container['ports'].append(_port)
94 container['resources'] = {}
# Only the keys listed in RESOURCES are copied from the resources option.
95 for res in (res for res in self._resources if
96 res in self.RESOURCES):
97 container['resources'][res] = self._resources[res]
98 if self._image_pull_policy:
# The policy is validated only when one was given.
99 if self._image_pull_policy not in self.IMAGE_PULL_POLICY:
100 raise exceptions.KubernetesContainerWrongImagePullPolicy()
101 container['imagePullPolicy'] = self._image_pull_policy
# Builds a template of kind "ReplicationController" (see __init__).
105 class ReplicationControllerObject(object):
# SSH key name used when the "ssh_key" option is not given.
107 SSHKEY_DEFAULT = 'yardstick_key'
# Accepted "restartPolicy" values; any other value makes __init__ raise
# KubernetesWrongRestartPolicy.
108 RESTART_POLICY = ('Always', 'OnFailure', 'Never')
# Keys copied from each "tolerations" entry; any other key is dropped.
109 TOLERATIONS_KEYS = ('key', 'value', 'effect', 'operator')
# Read the options from a deep copy of kwargs (so popping them does not alter
# the caller's dict): nodeSelector, ssh_key, volumes, securityContext,
# networks, tolerations, restartPolicy and containers. "restartPolicy"
# defaults to 'Always' and must be one of RESTART_POLICY, otherwise
# KubernetesWrongRestartPolicy is raised. ContainerObject instances are built
# either one per entry of "containers" or as a single one from the remaining
# options, and the template is then completed by the _change_value_* /
# _add_* helpers.
# NOTE(review): many listing lines are missing in this method (114, 125,
# 127-128, 131-132, 134-136, 138-144, 146-150, 152-157, 161, 163). Not
# visible: where self.name, self._containers and self.template are assigned,
# the condition choosing between the two ContainerObject forms, and most of
# the template literal (only three of its entries remain). Confirm against
# the full file.
111 def __init__(self, name, **kwargs):
112 super(ReplicationControllerObject, self).__init__()
113 parameters = copy.deepcopy(kwargs)
115 self.node_selector = parameters.pop('nodeSelector', {})
116 self.ssh_key = parameters.pop('ssh_key', self.SSHKEY_DEFAULT)
117 self._volumes = parameters.pop('volumes', [])
118 self._security_context = parameters.pop('securityContext', None)
119 self._networks = parameters.pop('networks', [])
120 self._tolerations = parameters.pop('tolerations', [])
121 self._restart_policy = parameters.pop('restartPolicy', 'Always')
122 if self._restart_policy not in self.RESTART_POLICY:
123 raise exceptions.KubernetesWrongRestartPolicy(
124 rpolicy=self._restart_policy)
126 containers = parameters.pop('containers', None)
129 ContainerObject(self.name, self.ssh_key, **container)
130 for container in containers]
133 ContainerObject(self.name, self.ssh_key, **parameters)]
137 "kind": "ReplicationController",
145 "labels": {"app": name}
151 "restartPolicy": self._restart_policy,
158 self._change_value_according_name(name)
159 self._add_containers()
160 self._add_node_selector()
162 self._add_security_context()
164 self._add_tolerations()
168 return self._networks
170 def get_template(self):
# Write "name" into the template at 'metadata.name' and set the value at
# 'spec.template.metadata.labels.app', both through utils.set_dict_value.
# NOTE(review): listing line 178, the last argument of the second call, is
# not visible here; presumably it is "name" as well - confirm.
173 def _change_value_according_name(self, name):
174 utils.set_dict_value(self.template, 'metadata.name', name)
176 utils.set_dict_value(self.template,
177 'spec.template.metadata.labels.app',
# Render every ContainerObject in self._containers with get_container_item()
# and set the value at 'spec.template.spec.containers' in the template.
# NOTE(review): listing line 185, the last argument of the set_dict_value
# call, is not visible here; presumably it is "containers" - confirm.
180 def _add_containers(self):
181 containers = [container.get_container_item()
182 for container in self._containers]
183 utils.set_dict_value(self.template,
184 'spec.template.spec.containers',
# Set the value at 'spec.template.spec.nodeSelector' in the template.
# NOTE(review): listing line 190, the last argument of the set_dict_value
# call, is not visible here; presumably it is self.node_selector - confirm.
187 def _add_node_selector(self):
188 utils.set_dict_value(self.template,
189 'spec.template.spec.nodeSelector',
# Build one "volume" item per entry of self._volumes, append the SSH key
# volume from _create_ssh_key_volume() and set the value at
# 'spec.template.spec.volumes' in the template.
# NOTE(review): listing line 198, the last argument of the set_dict_value
# call, is not visible here; presumably it is "volume_items" - confirm.
192 def _add_volumes(self):
193 """Add "volume" items to container specs, including the SSH one"""
194 volume_items = [self._create_volume_item(vol) for vol in self._volumes]
195 volume_items.append(self._create_ssh_key_volume())
196 utils.set_dict_value(self.template,
197 'spec.template.spec.volumes',
200 def _create_ssh_key_volume(self):
201 """Create a "volume" item of type "configMap" for the SSH key"""
202 return {'name': self.ssh_key,
203 'configMap': {'name': self.ssh_key}}
# Build one "volume" item of the form {'name': <name>, <type>: <type data>}
# from a volume definition. The definition is deep-copied first, so popping
# 'name' does not alter the caller's dict. The remaining keys are matched
# against k8s_utils.get_volume_types(); KubernetesTemplateInvalidVolumeType
# is raised from this method as well.
# NOTE(review): listing lines 211, 213, 214 and 216 are not visible here, so
# where "type_name" is assigned, how the loop ends and the condition under
# which the exception is raised cannot be stated - confirm.
# NOTE(review): takes no "self" but is called as self._create_volume_item(...);
# presumably a @staticmethod whose decorator line is not visible - confirm.
206 def _create_volume_item(volume):
207 """Create a "volume" item"""
208 volume = copy.deepcopy(volume)
209 name = volume.pop('name')
210 for key in (k for k in volume if k in k8s_utils.get_volume_types()):
212 type_data = volume[key]
215 raise exceptions.KubernetesTemplateInvalidVolumeType(volume=volume)
217 return {'name': name,
218 type_name: type_data}
220 def _add_security_context(self):
221 if self._security_context:
222 utils.set_dict_value(self.template,
223 'spec.template.spec.securityContext',
224 self._security_context)
# Turn every entry of self._networks into {'name': <net>}, serialise the list
# with jsonutils.dumps under the 'networks' key of an annotations dict, and
# set the value at 'spec.template.metadata.annotations' in the template.
# NOTE(review): listing lines 227, 230-233 and 237 are not visible here: the
# initialisation of "networks", whatever sits between the loop and the
# annotation (possibly a guard), and the last argument of the set_dict_value
# call (presumably "annotations") - confirm.
226 def _add_networks(self):
228 for net in self._networks:
229 networks.append({'name': net})
234 annotations = {'networks': jsonutils.dumps(networks)}
235 utils.set_dict_value(self.template,
236 'spec.template.metadata.annotations',
# Copy the TOLERATIONS_KEYS of every entry of self._tolerations and set the
# value at 'spec.template.spec.tolerations' in the template. When the
# resulting list is empty, [{'operator': 'Exists'}] is used instead.
# NOTE(review): listing lines 240, 244, 246 and 249 are not visible here: the
# initialisation of "tolerations", the "else" half of the conditional
# expression and the last argument of the set_dict_value call (presumably
# "tolerations") - confirm.
239 def _add_tolerations(self):
241 for tol in self._tolerations:
242 tolerations.append({k: tol[k] for k in tol
243 if k in self.TOLERATIONS_KEYS})
245 tolerations = ([{'operator': 'Exists'}] if not tolerations
247 utils.set_dict_value(self.template,
248 'spec.template.spec.tolerations',
# Service object whose template collects the ports added through _add_port.
252 class ServiceNodePortObject(object):
# Keys every "node_ports" entry must contain; otherwise __init__ raises
# KubernetesServiceObjectDefinitionError naming the missing ones.
254 MANDATORY_PARAMETERS = {'port', 'name'}
# Pattern a port name must match; otherwise __init__ raises
# KubernetesServiceObjectNameError.
255 NAME_REGEX = re.compile(r'^[a-z0-9]([-a-z0-9]*[a-z0-9])?$')
# Name the service '<name>-service', select pods by {'app': name}, always add
# port 22 ('ssh', TCP) and then one port per entry of the "node_ports"
# option. The option is deep-copied first because 'port' and 'name' are
# popped from each entry; the remaining keys are forwarded to _add_port as
# keyword arguments. An entry lacking a MANDATORY_PARAMETERS key raises
# KubernetesServiceObjectDefinitionError; a name not matching NAME_REGEX
# raises KubernetesServiceObjectNameError.
# NOTE(review): listing lines 259, 262-263, 265, 267-269 and 271-273 are not
# visible here, including the end of the docstring and most of the template
# literal - confirm against the full file.
257 def __init__(self, name, **kwargs):
258 """Service kind "NodePort" object
260 :param name: (string) name of the Service
261 :param kwargs: (dict) node_ports -> (list) port, name, targetPort,
264 self._name = '{}-service'.format(name)
266 'metadata': {'name': '{}-service'.format(name)},
270 'selector': {'app': name}
274 self._add_port(22, 'ssh', protocol='TCP')
275 node_ports = copy.deepcopy(kwargs.get('node_ports', []))
276 for port in node_ports:
277 if not self.MANDATORY_PARAMETERS.issubset(port.keys()):
278 missing_parameters = ', '.join(
279 str(param) for param in
280 (self.MANDATORY_PARAMETERS - set(port.keys())))
281 raise exceptions.KubernetesServiceObjectDefinitionError(
282 missing_parameters=missing_parameters)
283 port_number = port.pop('port')
284 name = port.pop('name')
285 if not self.NAME_REGEX.match(name):
286 raise exceptions.KubernetesServiceObjectNameError(name=name)
287 self._add_port(port_number, name, **port)
# Append one port entry to self.template['spec']['ports']. The entry carries
# 'port' and may also carry 'protocol', 'targetPort' and 'nodePort'.
# NOTE(review): listing lines 290, 292-293, 295 and 297 are not visible here:
# the rest of the signature, the rest of the base dict and the conditions
# guarding the three optional keys - confirm against the full file.
289 def _add_port(self, port, name, protocol=None, targetPort=None,
291 _port = {'port': port,
294 _port['protocol'] = protocol
296 _port['targetPort'] = targetPort
298 _port['nodePort'] = nodePort
299 self.template['spec']['ports'].append(_port)
302 k8s_utils.create_service(self.template)
305 k8s_utils.delete_service(self._name)
# Describes a custom resource definition derived from a context name and a
# resource name (see __init__).
308 class CustomResourceDefinitionObject(object):
# Keys that must be present in kwargs; otherwise __init__ raises
# KubernetesCRDObjectDefinitionError naming the missing ones.
310 MANDATORY_PARAMETERS = {'name'}
# Check kwargs against MANDATORY_PARAMETERS, then derive the naming from
# kwargs['name']: plural is the name plus 's', kind is name.title(), group is
# ctx_name plus '.com', and the metadata name '<plural>.<group>' is also kept
# as self._name. "version" defaults to 'v1' and "scope" to
# constants.SCOPE_NAMESPACED.
# NOTE(review): listing lines 319, 327-329, 331-335 and 338-342 are not
# visible here, so most of the template literal (and where kind, version,
# scope and group are used in it) cannot be described - confirm.
312 def __init__(self, ctx_name, **kwargs):
313 if not self.MANDATORY_PARAMETERS.issubset(kwargs):
314 missing_parameters = ', '.join(
315 str(param) for param in
316 (self.MANDATORY_PARAMETERS - set(kwargs)))
317 raise exceptions.KubernetesCRDObjectDefinitionError(
318 missing_parameters=missing_parameters)
320 singular = kwargs['name']
321 plural = singular + 's'
322 kind = singular.title()
323 version = kwargs.get('version', 'v1')
324 scope = kwargs.get('scope', constants.SCOPE_NAMESPACED)
325 group = ctx_name + '.com'
326 self._name = metadata_name = plural + '.' + group
330 'name': metadata_name
336 'names': {'plural': plural,
337 'singular': singular,
343 k8s_utils.create_custom_resource_definition(self._template)
346 k8s_utils.delete_custom_resource_definition(self._name)
# Describes a network defined by a plugin and its arguments (see __init__).
349 class NetworkObject(object):
# Keys that must be present in kwargs; otherwise __init__ raises
# KubernetesNetworkObjectDefinitionError naming the missing ones.
351 MANDATORY_PARAMETERS = {'plugin', 'args'}
# Check kwargs against MANDATORY_PARAMETERS, then store kwargs['plugin'] and
# kwargs['args']. self._template starts as None.
# NOTE(review): listing lines 361-362 and 365 are not visible here; they
# presumably store "name" (self._name is read elsewhere in the class) and
# further state - confirm against the full file.
354 def __init__(self, name, **kwargs):
355 if not self.MANDATORY_PARAMETERS.issubset(kwargs):
356 missing_parameters = ', '.join(
357 str(param) for param in
358 (self.MANDATORY_PARAMETERS - set(kwargs)))
359 raise exceptions.KubernetesNetworkObjectDefinitionError(
360 missing_parameters=missing_parameters)
363 self._plugin = kwargs['plugin']
364 self._args = kwargs['args']
366 self._template = None
376 crd = k8s_utils.get_custom_resource_definition(self.KIND)
378 raise exceptions.KubernetesNetworkObjectKindMissing()
386 self._group = self.crd.spec.group
393 self._version = self.crd.spec.version
400 self._plural = self.crd.spec.names.plural
407 self._scope = self.crd.spec.scope
412 """"Network" object template
414 This template can be rendered only once the CRD "Network" is created in
415 Kubernetes. This function call must be delayed until the creation of
419 return self._template
422 'apiVersion': '{}/{}'.format(self.group, self.version),
427 'plugin': self._plugin,
430 return self._template
433 k8s_utils.create_network(self.scope, self.group, self.version,
434 self.plural, self.template)
437 k8s_utils.delete_network(self.scope, self.group, self.version,
438 self.plural, self._name)
441 class KubernetesTemplate(object):
# Split a deep copy of context_cfg into its 'servers', 'custom_resources' and
# 'networks' sections and build the objects of the context: self.rcs maps
# each server name (rewritten by _get_rc_name) to its configuration, and from
# it one ReplicationControllerObject and one ServiceNodePortObject are built
# per server; CustomResourceDefinitionObject instances go to self.crd and one
# NetworkObject per entry of 'networks' goes to self.network_objs. The SSH
# key name is '<name>-key'.
# NOTE(review): listing lines 445, 448, 453, 455, 463 and 466-467 are not
# visible here, including the end of the docstring, presumably the assignment
# of self.name (read by _get_rc_name) and the iteration clause of the
# self.crd comprehension - confirm against the full file.
443 def __init__(self, name, context_cfg):
444 """KubernetesTemplate object initialization
446 :param name: (str) name of the Kubernetes context
447 :param context_cfg: (dict) context definition
449 context_cfg = copy.deepcopy(context_cfg)
450 servers_cfg = context_cfg.pop('servers', {})
451 crd_cfg = context_cfg.pop('custom_resources', [])
452 networks_cfg = context_cfg.pop('networks', {})
454 self.ssh_key = '{}-key'.format(name)
456 self.rcs = {self._get_rc_name(rc): cfg
457 for rc, cfg in servers_cfg.items()}
458 self.rc_objs = [ReplicationControllerObject(
459 rc, ssh_key=self.ssh_key, **cfg) for rc, cfg in self.rcs.items()]
460 self.service_objs = [ServiceNodePortObject(rc, **cfg)
461 for rc, cfg in self.rcs.items()]
462 self.crd = [CustomResourceDefinitionObject(self.name, **crd)
464 self.network_objs = [NetworkObject(net_name, **net_data)
465 for net_name, net_data in networks_cfg.items()]
468 def _get_rc_name(self, rc_name):
469 return '{}-{}'.format(rc_name, self.name)
# Store in self.pods the names of the pods returned by
# k8s_utils.get_pod_list() that start with one of the keys of self.rcs.
# Because both loops sit in one comprehension, a pod name matching several
# keys is listed once per matching key.
# NOTE(review): the listing lines after 474 are not visible here, so the
# return value cannot be stated; presumably self.pods is returned - confirm.
471 def get_rc_pods(self):
472 resp = k8s_utils.get_pod_list()
473 self.pods = [p.metadata.name for p in resp.items for s in self.rcs
474 if p.metadata.name.startswith(s)]
478 def get_rc_by_name(self, rc_name):
479 """Returns a ``ReplicationControllerObject``, searching by name"""
480 for rc in (rc for rc in self.rc_objs if rc.name == rc_name):