Merge "Create Dockerfile to create a yardstick-image of docker"
[yardstick.git] / yardstick / orchestrator / kubernetes.py
1 ##############################################################################
2 # Copyright (c) 2017 Huawei Technologies Co.,Ltd.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import copy
11 import re
12
13 from oslo_serialization import jsonutils
14 import six
15
16 from yardstick.common import constants
17 from yardstick.common import exceptions
18 from yardstick.common import kubernetes_utils as k8s_utils
19 from yardstick.common import utils
20
21
class ContainerObject(object):
    """Render one entry of a pod template's "containers" list.

    The constructor only stores the options it is given;
    ``get_container_item`` turns them into the container dictionary.
    """

    SSH_MOUNT_PATH = '/tmp/.ssh/'
    IMAGE_DEFAULT = 'openretriever/yardstick'
    COMMAND_DEFAULT = ['/bin/bash', '-c']
    RESOURCES = ('requests', 'limits')
    PORT_OPTIONS = ('containerPort', 'hostIP', 'hostPort', 'name', 'protocol')
    IMAGE_PULL_POLICY = ('Always', 'IfNotPresent', 'Never')

    def __init__(self, name, ssh_key, **kwargs):
        """Store the container options.

        :param name: (str) base name, used to build the container name
        :param ssh_key: (str) name of the volume holding the SSH key
        :param kwargs: (dict) optional settings: image, command, args,
                       volumeMounts, securityContext, env, resources, ports,
                       imagePullPolicy, tty, stdin
        """
        option = kwargs.get
        self._name = name
        self._ssh_key = ssh_key
        self._image = option('image', self.IMAGE_DEFAULT)
        command = option('command', self.COMMAND_DEFAULT)
        self._command = self._parse_commands(command)
        arguments = option('args', [])
        self._args = self._parse_commands(arguments)
        self._volume_mounts = option('volumeMounts', [])
        self._security_context = option('securityContext')
        self._env = option('env', [])
        self._resources = option('resources', {})
        self._ports = option('ports', [])
        self._image_pull_policy = option('imagePullPolicy')
        self._tty = option('tty')
        self._stdin = option('stdin')

    @staticmethod
    def _parse_commands(command):
        """Normalize a command definition to a list

        A list is returned untouched and a single string is wrapped in a
        one-element list; any other type is rejected with
        ``KubernetesContainerCommandType``.
        """
        if isinstance(command, list):
            return command
        if isinstance(command, six.string_types):
            return [command]
        raise exceptions.KubernetesContainerCommandType()

    def _create_volume_mounts(self):
        """Build the "volumeMounts" list, ending with the SSH key mount"""
        mounts = list(self._volume_mounts)
        # The SSH key volume is always mounted, after the user defined ones.
        mounts.append({'name': self._ssh_key,
                       'mountPath': self.SSH_MOUNT_PATH})
        return [self._create_volume_mounts_item(mount) for mount in mounts]

    @staticmethod
    def _create_volume_mounts_item(volume_mount):
        """Build one "volumeMounts" entry; "readOnly" defaults to False"""
        read_only = volume_mount.get('readOnly', False)
        return {'name': volume_mount['name'],
                'mountPath': volume_mount['mountPath'],
                'readOnly': read_only}

    def get_container_item(self):
        """Build the "container" dictionary from the stored options

        Optional sections are added only when the matching option was set.

        :raises KubernetesContainerPortNotDefined: a port definition has no
            "containerPort" key
        :raises KubernetesContainerWrongImagePullPolicy: the image pull
            policy is not one of ``IMAGE_PULL_POLICY``
        """
        container = {'args': self._args,
                     'command': self._command,
                     'image': self._image,
                     'name': '{}-container'.format(self._name),
                     'volumeMounts': self._create_volume_mounts()}

        if self._security_context:
            container['securityContext'] = self._security_context

        if self._env:
            # Only "name" and "value" are copied from each definition.
            container['env'] = [
                {'name': variable['name'], 'value': variable['value']}
                for variable in self._env]

        if self._ports:
            rendered_ports = []
            for port in self._ports:
                if 'containerPort' not in port:
                    raise exceptions.KubernetesContainerPortNotDefined(
                        port=port)
                # Keys outside PORT_OPTIONS are dropped.
                rendered_ports.append(
                    {key: port[key] for key in port
                     if key in self.PORT_OPTIONS})
            container['ports'] = rendered_ports

        if self._resources:
            # Keys outside RESOURCES are dropped.
            container['resources'] = {
                kind: self._resources[kind] for kind in self._resources
                if kind in self.RESOURCES}

        pull_policy = self._image_pull_policy
        if pull_policy:
            if pull_policy not in self.IMAGE_PULL_POLICY:
                raise exceptions.KubernetesContainerWrongImagePullPolicy()
            container['imagePullPolicy'] = pull_policy

        # "stdin" and "tty" are emitted even when False; only None skips.
        if self._stdin is not None:
            container['stdin'] = self._stdin
        if self._tty is not None:
            container['tty'] = self._tty
        return container
109
110
class ReplicationControllerObject(object):
    """Build the template of a Kubernetes "ReplicationController".

    The template is fully rendered by the constructor and is available
    through ``get_template``.
    """

    SSHKEY_DEFAULT = 'yardstick_key'
    RESTART_POLICY = ('Always', 'OnFailure', 'Never')
    TOLERATIONS_KEYS = ('key', 'value', 'effect', 'operator')

    def __init__(self, name, **kwargs):
        """Parse the definition and render the template

        :param name: (str) name of the replication controller; also used as
                     the "app" label of the pod template
        :param kwargs: (dict) definition. The keys nodeSelector, ssh_key,
                       volumes, securityContext, networks, tolerations,
                       restartPolicy and containers are consumed here; when
                       "containers" is missing or empty, the remaining keys
                       describe a single container
        :raises KubernetesWrongRestartPolicy: "restartPolicy" is not one of
            ``RESTART_POLICY``
        """
        super(ReplicationControllerObject, self).__init__()
        # Work on a private copy: the options are popped one by one.
        options = copy.deepcopy(kwargs)
        self.name = name
        self.node_selector = options.pop('nodeSelector', {})
        self.ssh_key = options.pop('ssh_key', self.SSHKEY_DEFAULT)
        self._volumes = options.pop('volumes', [])
        self._security_context = options.pop('securityContext', None)
        self._networks = options.pop('networks', [])
        self._tolerations = options.pop('tolerations', [])
        self._restart_policy = options.pop('restartPolicy', 'Always')
        if self._restart_policy not in self.RESTART_POLICY:
            raise exceptions.KubernetesWrongRestartPolicy(
                rpolicy=self._restart_policy)

        container_defs = options.pop('containers', None)
        if not container_defs:
            # No explicit list: the leftover options are the container.
            container_defs = [options]
        self._containers = [
            ContainerObject(self.name, self.ssh_key, **definition)
            for definition in container_defs]

        pod_spec = {
            'containers': [],
            'volumes': [],
            'nodeSelector': {},
            'restartPolicy': self._restart_policy,
            'tolerations': []
        }
        self.template = {
            'apiVersion': 'v1',
            'kind': 'ReplicationController',
            'metadata': {
                'name': ''
            },
            'spec': {
                'replicas': 1,
                'template': {
                    'metadata': {
                        'labels': {'app': name}
                    },
                    'spec': pod_spec
                }
            }
        }

        # Fill in the placeholders of the skeleton above.
        self._change_value_according_name(name)
        self._add_containers()
        self._add_node_selector()
        self._add_volumes()
        self._add_security_context()
        self._add_networks()
        self._add_tolerations()

    @property
    def networks(self):
        """Networks given in the definition (empty list by default)"""
        return self._networks

    def get_template(self):
        """Return the rendered template dictionary"""
        return self.template

    def _change_value_according_name(self, name):
        """Write ``name`` as the object name and as the pod "app" label"""
        for path in ('metadata.name', 'spec.template.metadata.labels.app'):
            utils.set_dict_value(self.template, path, name)

    def _add_containers(self):
        """Render every container and store the list in the pod spec"""
        rendered = []
        for container in self._containers:
            rendered.append(container.get_container_item())
        utils.set_dict_value(
            self.template, 'spec.template.spec.containers', rendered)

    def _add_node_selector(self):
        """Store the node selector in the pod spec"""
        utils.set_dict_value(
            self.template, 'spec.template.spec.nodeSelector',
            self.node_selector)

    def _add_volumes(self):
        """Store the "volumes" list in the pod spec, SSH key volume last"""
        volumes = []
        for volume in self._volumes:
            volumes.append(self._create_volume_item(volume))
        volumes.append(self._create_ssh_key_volume())
        utils.set_dict_value(
            self.template, 'spec.template.spec.volumes', volumes)

    def _create_ssh_key_volume(self):
        """Build the "configMap" volume named after the SSH key"""
        config_map = {'name': self.ssh_key}
        return {'name': self.ssh_key, 'configMap': config_map}

    @staticmethod
    def _create_volume_item(volume):
        """Build one "volumes" entry from a volume definition

        The first key reported by ``k8s_utils.get_volume_types`` is used as
        the volume type; any other key besides "name" is ignored.

        :raises KubernetesTemplateInvalidVolumeType: no key of the
            definition is a known volume type
        """
        definition = copy.deepcopy(volume)
        name = definition.pop('name')
        for key in definition:
            if key in k8s_utils.get_volume_types():
                return {'name': name, key: definition[key]}
        raise exceptions.KubernetesTemplateInvalidVolumeType(
            volume=definition)

    def _add_security_context(self):
        """Store the pod security context, if one was given"""
        if not self._security_context:
            return
        utils.set_dict_value(
            self.template, 'spec.template.spec.securityContext',
            self._security_context)

    def _add_networks(self):
        """Store the networks as a JSON "networks" pod annotation

        Nothing is written when no network was given.
        """
        networks = [{'name': network} for network in self._networks]
        if not networks:
            return

        utils.set_dict_value(
            self.template, 'spec.template.metadata.annotations',
            {'networks': jsonutils.dumps(networks)})

    def _add_tolerations(self):
        """Store the tolerations in the pod spec

        Only the keys in ``TOLERATIONS_KEYS`` are kept. When no toleration
        was given, a single ``{'operator': 'Exists'}`` entry is used.
        """
        tolerations = [
            {key: toleration[key] for key in toleration
             if key in self.TOLERATIONS_KEYS}
            for toleration in self._tolerations]
        if not tolerations:
            tolerations = [{'operator': 'Exists'}]
        utils.set_dict_value(
            self.template, 'spec.template.spec.tolerations', tolerations)
256
257
class ServiceNodePortObject(object):
    """Template holder for a Kubernetes Service of type "NodePort"."""

    MANDATORY_PARAMETERS = {'port', 'name'}
    NAME_REGEX = re.compile(r'^[a-z0-9]([-a-z0-9]*[a-z0-9])?$')

    def __init__(self, name, **kwargs):
        """Build the Service template

        Port 22 ("ssh", TCP) is always exposed first; the ports listed in
        "node_ports" follow in the given order.

        :param name: (string) name of the application selected by the
                     Service; the Service is called "<name>-service"
        :param kwargs: (dict) node_ports -> (list) of port definitions with
                       the mandatory keys port and name, and the optional
                       keys protocol, targetPort and nodePort
        :raises KubernetesServiceObjectDefinitionError: a port definition
            lacks a mandatory key
        :raises KubernetesServiceObjectNameError: a port name does not match
            ``NAME_REGEX``
        """
        service_name = '{}-service'.format(name)
        self._name = service_name
        self.template = {
            'metadata': {'name': service_name},
            'spec': {
                'type': 'NodePort',
                'ports': [],
                'selector': {'app': name}
            }
        }

        self._add_port(22, 'ssh', protocol='TCP')
        # Deep copy: mandatory keys are popped from each definition below.
        for port_cfg in copy.deepcopy(kwargs.get('node_ports', [])):
            missing = self.MANDATORY_PARAMETERS - set(port_cfg.keys())
            if missing:
                raise exceptions.KubernetesServiceObjectDefinitionError(
                    missing_parameters=', '.join(
                        str(param) for param in missing))
            port_number = port_cfg.pop('port')
            port_name = port_cfg.pop('name')
            if not self.NAME_REGEX.match(port_name):
                raise exceptions.KubernetesServiceObjectNameError(
                    name=port_name)
            # Whatever is left is forwarded as optional port settings.
            self._add_port(port_number, port_name, **port_cfg)

    def _add_port(self, port, name, protocol=None, targetPort=None,
                  nodePort=None):
        """Append one port entry to the template; falsy options are skipped"""
        entry = {'port': port, 'name': name}
        optional = (('protocol', protocol),
                    ('targetPort', targetPort),
                    ('nodePort', nodePort))
        for key, value in optional:
            if value:
                entry[key] = value
        self.template['spec']['ports'].append(entry)

    def create(self):
        """Create the Service from the template"""
        k8s_utils.create_service(self.template)

    def delete(self):
        """Delete the Service, addressed by its name"""
        k8s_utils.delete_service(self._name)
312
313
class CustomResourceDefinitionObject(object):
    """Template holder for a Kubernetes custom resource definition."""

    MANDATORY_PARAMETERS = {'name'}

    def __init__(self, ctx_name, **kwargs):
        """Build the CRD template

        :param ctx_name: (str) context name; the API group is
                         "<ctx_name>.com"
        :param kwargs: (dict) name (mandatory, singular resource name),
                       version (default "v1") and scope (default
                       ``constants.SCOPE_NAMESPACED``)
        :raises KubernetesCRDObjectDefinitionError: a mandatory parameter
            is missing
        """
        missing = self.MANDATORY_PARAMETERS - set(kwargs)
        if missing:
            raise exceptions.KubernetesCRDObjectDefinitionError(
                missing_parameters=', '.join(
                    str(param) for param in missing))

        singular = kwargs['name']
        # Derived names: naive plural and title-cased kind.
        names = {'plural': singular + 's',
                 'singular': singular,
                 'kind': singular.title()}
        group = ctx_name + '.com'
        spec = {'group': group,
                'version': kwargs.get('version', 'v1'),
                'scope': kwargs.get('scope', constants.SCOPE_NAMESPACED),
                'names': names}

        # The CRD itself is named "<plural>.<group>".
        self._name = names['plural'] + '.' + group
        self._template = {
            'metadata': {
                'name': self._name
            },
            'spec': spec
        }

    def create(self):
        """Create the CRD from the template"""
        k8s_utils.create_custom_resource_definition(self._template)

    def delete(self):
        """Delete the CRD, addressed by its name"""
        k8s_utils.delete_custom_resource_definition(self._name)
353
354
class NetworkObject(object):
    """Custom resource of kind "Network".

    Group, version, plural name and scope are read from the "Network" CRD
    on first access and cached on the instance afterwards.
    """

    MANDATORY_PARAMETERS = {'plugin', 'args'}
    KIND = 'Network'

    def __init__(self, name, **kwargs):
        """Store the network definition

        :param name: (str) name of the network object
        :param kwargs: (dict) must contain "plugin" and "args"
        :raises KubernetesNetworkObjectDefinitionError: a mandatory
            parameter is missing
        """
        missing = self.MANDATORY_PARAMETERS - set(kwargs)
        if missing:
            raise exceptions.KubernetesNetworkObjectDefinitionError(
                missing_parameters=', '.join(
                    str(param) for param in missing))

        self._name = name
        self._plugin = kwargs['plugin']
        self._args = kwargs['args']
        # Lazily populated caches, see the properties below.
        self._crd = None
        self._template = None
        self._group = None
        self._version = None
        self._plural = None
        self._scope = None

    @property
    def crd(self):
        """CRD of kind "Network", fetched once and cached

        :raises KubernetesNetworkObjectKindMissing: no CRD was returned
        """
        if not self._crd:
            definition = k8s_utils.get_custom_resource_definition(self.KIND)
            if not definition:
                raise exceptions.KubernetesNetworkObjectKindMissing()
            self._crd = definition
        return self._crd

    @property
    def group(self):
        """API group taken from the CRD spec"""
        if not self._group:
            self._group = self.crd.spec.group
        return self._group

    @property
    def version(self):
        """API version taken from the CRD spec"""
        if not self._version:
            self._version = self.crd.spec.version
        return self._version

    @property
    def plural(self):
        """Plural resource name taken from the CRD spec"""
        if not self._plural:
            self._plural = self.crd.spec.names.plural
        return self._plural

    @property
    def scope(self):
        """Scope taken from the CRD spec"""
        if not self._scope:
            self._scope = self.crd.spec.scope
        return self._scope

    @property
    def template(self):
        """Template of the "Network" object, rendered once and cached

        Rendering reads the group and the version from the "Network" CRD, so
        this property must not be read before that CRD exists in Kubernetes.
        """
        if not self._template:
            api_version = '{}/{}'.format(self.group, self.version)
            self._template = {
                'apiVersion': api_version,
                'kind': self.KIND,
                'metadata': {
                    'name': self._name
                },
                'plugin': self._plugin,
                'args': self._args
            }
        return self._template

    def create(self):
        """Create the network object from the template"""
        k8s_utils.create_network(self.scope, self.group, self.version,
                                 self.plural, self.template)

    def delete(self):
        """Delete the network object, addressed by its name"""
        k8s_utils.delete_network(self.scope, self.group, self.version,
                                 self.plural, self._name)
445
446
class KubernetesTemplate(object):
    """Collection of the Kubernetes objects defined by a context."""

    def __init__(self, name, context_cfg):
        """KubernetesTemplate object initialization

        One replication controller and one NodePort service are built per
        entry in "servers", one CRD per entry in "custom_resources" and one
        network object per entry in "networks".

        :param name: (str) name of the Kubernetes context
        :param context_cfg: (dict) context definition
        """
        # Work on a private copy: the sections are popped below.
        context_cfg = copy.deepcopy(context_cfg)
        servers_cfg = context_cfg.pop('servers', {})
        crd_cfg = context_cfg.pop('custom_resources', [])
        networks_cfg = context_cfg.pop('networks', {})
        self.name = name
        self.ssh_key = '{}-key'.format(name)

        # Replication controller names are suffixed with the context name.
        self.rcs = {self._get_rc_name(rc): cfg
                    for rc, cfg in servers_cfg.items()}
        self.rc_objs = [ReplicationControllerObject(
            rc, ssh_key=self.ssh_key, **cfg) for rc, cfg in self.rcs.items()]
        self.service_objs = [ServiceNodePortObject(rc, **cfg)
                             for rc, cfg in self.rcs.items()]
        self.crd = [CustomResourceDefinitionObject(self.name, **crd)
                    for crd in crd_cfg]
        self.network_objs = [NetworkObject(net_name, **net_data)
                             for net_name, net_data in networks_cfg.items()]
        self.pods = []

    def _get_rc_name(self, rc_name):
        """Return the server name suffixed with the context name"""
        return '{}-{}'.format(rc_name, self.name)

    def _select_rc_pod_names(self, pods):
        """Return the names of the pods matching a replication controller

        A pod matches when its name starts with one of the names in
        ``self.rcs``. Each pod is listed once, even if several replication
        controller names are a prefix of its name.

        :param pods: (iterable) objects exposing ``metadata.name``
        :return: (list) pod names, in the order of ``pods``
        """
        # ``str.startswith`` accepts a tuple of candidates; an empty tuple
        # never matches, so no replication controller means no pod.
        prefixes = tuple(self.rcs)
        return [pod.metadata.name for pod in pods
                if pod.metadata.name.startswith(prefixes)]

    def get_rc_pods(self):
        """Refresh and return the pods of the replication controllers

        The pod list is read with ``k8s_utils.get_pod_list`` and filtered by
        name prefix; the result is also stored in ``self.pods``.
        """
        resp = k8s_utils.get_pod_list()
        self.pods = self._select_rc_pod_names(resp.items)

        return self.pods

    def get_rc_by_name(self, rc_name):
        """Returns a ``ReplicationControllerObject``, searching by name

        :param rc_name: (str) full replication controller name
        :return: the first matching object, or None if there is none
        """
        return next((rc for rc in self.rc_objs if rc.name == rc_name), None)