Merge "Add "imagePullPolicy" parameter in Kubernetes container"
[yardstick.git] / yardstick / orchestrator / kubernetes.py
1 ##############################################################################
2 # Copyright (c) 2017 Huawei Technologies Co.,Ltd.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import copy
11 import re
12
13 from oslo_serialization import jsonutils
14
15 from yardstick.common import constants
16 from yardstick.common import exceptions
17 from yardstick.common import kubernetes_utils as k8s_utils
18 from yardstick.common import utils
19
20
21 class ContainerObject(object):
22
23     SSH_MOUNT_PATH = '/tmp/.ssh/'
24     IMAGE_DEFAULT = 'openretriever/yardstick'
25     COMMAND_DEFAULT = '/bin/bash'
26     RESOURCES = ('requests', 'limits')
27     PORT_OPTIONS = ('containerPort', 'hostIP', 'hostPort', 'name', 'protocol')
28     IMAGE_PULL_POLICY = ('Always', 'IfNotPresent', 'Never')
29
30     def __init__(self, name, ssh_key, **kwargs):
31         self._name = name
32         self._ssh_key = ssh_key
33         self._image = kwargs.get('image', self.IMAGE_DEFAULT)
34         self._command = [kwargs.get('command', self.COMMAND_DEFAULT)]
35         self._args = kwargs.get('args', [])
36         self._volume_mounts = kwargs.get('volumeMounts', [])
37         self._security_context = kwargs.get('securityContext')
38         self._env = kwargs.get('env', [])
39         self._resources = kwargs.get('resources', {})
40         self._ports = kwargs.get('ports', [])
41         self._image_pull_policy = kwargs.get('imagePullPolicy')
42
43     def _create_volume_mounts(self):
44         """Return all "volumeMounts" items per container"""
45         volume_mounts_items = [self._create_volume_mounts_item(vol)
46                                for vol in self._volume_mounts]
47         ssh_vol = {'name': self._ssh_key,
48                    'mountPath': self.SSH_MOUNT_PATH}
49         volume_mounts_items.append(self._create_volume_mounts_item(ssh_vol))
50         return volume_mounts_items
51
52     @staticmethod
53     def _create_volume_mounts_item(volume_mount):
54         """Create a "volumeMounts" item"""
55         return {'name': volume_mount['name'],
56                 'mountPath': volume_mount['mountPath'],
57                 'readOnly': volume_mount.get('readOnly', False)}
58
59     def get_container_item(self):
60         """Create a "container" item"""
61         container_name = '{}-container'.format(self._name)
62         container = {'args': self._args,
63                      'command': self._command,
64                      'image': self._image,
65                      'name': container_name,
66                      'volumeMounts': self._create_volume_mounts()}
67         if self._security_context:
68             container['securityContext'] = self._security_context
69         if self._env:
70             container['env'] = []
71             for env in self._env:
72                 container['env'].append({'name': env['name'],
73                                          'value': env['value']})
74         if self._ports:
75             container['ports'] = []
76             for port in self._ports:
77                 if 'containerPort' not in port.keys():
78                     raise exceptions.KubernetesContainerPortNotDefined(
79                         port=port)
80                 _port = {port_option: value for port_option, value
81                          in port.items() if port_option in self.PORT_OPTIONS}
82                 container['ports'].append(_port)
83         if self._resources:
84             container['resources'] = {}
85             for res in (res for res in self._resources if
86                         res in self.RESOURCES):
87                 container['resources'][res] = self._resources[res]
88         if self._image_pull_policy:
89             if self._image_pull_policy not in self.IMAGE_PULL_POLICY:
90                 raise exceptions.KubernetesContainerWrongImagePullPolicy()
91             container['imagePullPolicy'] = self._image_pull_policy
92         return container
93
94
95 class ReplicationControllerObject(object):
96
97     SSHKEY_DEFAULT = 'yardstick_key'
98     RESTART_POLICY = ('Always', 'OnFailure', 'Never')
99     TOLERATIONS_KEYS = ('key', 'value', 'effect', 'operator')
100
101     def __init__(self, name, **kwargs):
102         super(ReplicationControllerObject, self).__init__()
103         parameters = copy.deepcopy(kwargs)
104         self.name = name
105         self.node_selector = parameters.pop('nodeSelector', {})
106         self.ssh_key = parameters.pop('ssh_key', self.SSHKEY_DEFAULT)
107         self._volumes = parameters.pop('volumes', [])
108         self._security_context = parameters.pop('securityContext', None)
109         self._networks = parameters.pop('networks', [])
110         self._tolerations = parameters.pop('tolerations', [])
111         self._restart_policy = parameters.pop('restartPolicy', 'Always')
112         if self._restart_policy not in self.RESTART_POLICY:
113             raise exceptions.KubernetesWrongRestartPolicy(
114                 rpolicy=self._restart_policy)
115
116         containers = parameters.pop('containers', None)
117         if containers:
118             self._containers = [
119                 ContainerObject(self.name, self.ssh_key, **container)
120                 for container in containers]
121         else:
122             self._containers = [
123                 ContainerObject(self.name, self.ssh_key, **parameters)]
124
125         self.template = {
126             "apiVersion": "v1",
127             "kind": "ReplicationController",
128             "metadata": {
129                 "name": ""
130             },
131             "spec": {
132                 "replicas": 1,
133                 "template": {
134                     "metadata": {
135                         "labels": {"app": name}
136                     },
137                     "spec": {
138                         "containers": [],
139                         "volumes": [],
140                         "nodeSelector": {},
141                         "restartPolicy": self._restart_policy,
142                         "tolerations": []
143                     }
144                 }
145             }
146         }
147
148         self._change_value_according_name(name)
149         self._add_containers()
150         self._add_node_selector()
151         self._add_volumes()
152         self._add_security_context()
153         self._add_networks()
154         self._add_tolerations()
155
156     @property
157     def networks(self):
158         return self._networks
159
160     def get_template(self):
161         return self.template
162
163     def _change_value_according_name(self, name):
164         utils.set_dict_value(self.template, 'metadata.name', name)
165
166         utils.set_dict_value(self.template,
167                              'spec.template.metadata.labels.app',
168                              name)
169
170     def _add_containers(self):
171         containers = [container.get_container_item()
172                       for container in self._containers]
173         utils.set_dict_value(self.template,
174                              'spec.template.spec.containers',
175                              containers)
176
177     def _add_node_selector(self):
178         utils.set_dict_value(self.template,
179                              'spec.template.spec.nodeSelector',
180                              self.node_selector)
181
182     def _add_volumes(self):
183         """Add "volume" items to container specs, including the SSH one"""
184         volume_items = [self._create_volume_item(vol) for vol in self._volumes]
185         volume_items.append(self._create_ssh_key_volume())
186         utils.set_dict_value(self.template,
187                              'spec.template.spec.volumes',
188                              volume_items)
189
190     def _create_ssh_key_volume(self):
191         """Create a "volume" item of type "configMap" for the SSH key"""
192         return {'name': self.ssh_key,
193                 'configMap': {'name': self.ssh_key}}
194
195     @staticmethod
196     def _create_volume_item(volume):
197         """Create a "volume" item"""
198         volume = copy.deepcopy(volume)
199         name = volume.pop('name')
200         for key in (k for k in volume if k in k8s_utils.get_volume_types()):
201             type_name = key
202             type_data = volume[key]
203             break
204         else:
205             raise exceptions.KubernetesTemplateInvalidVolumeType(volume=volume)
206
207         return {'name': name,
208                 type_name: type_data}
209
210     def _add_security_context(self):
211         if self._security_context:
212             utils.set_dict_value(self.template,
213                                  'spec.template.spec.securityContext',
214                                  self._security_context)
215
216     def _add_networks(self):
217         networks = []
218         for net in self._networks:
219             networks.append({'name': net})
220
221         if not networks:
222             return
223
224         annotations = {'networks': jsonutils.dumps(networks)}
225         utils.set_dict_value(self.template,
226                              'spec.template.metadata.annotations',
227                              annotations)
228
229     def _add_tolerations(self):
230         tolerations = []
231         for tol in self._tolerations:
232             tolerations.append({k: tol[k] for k in tol
233                                 if k in self.TOLERATIONS_KEYS})
234
235         tolerations = ([{'operator': 'Exists'}] if not tolerations
236                        else tolerations)
237         utils.set_dict_value(self.template,
238                              'spec.template.spec.tolerations',
239                              tolerations)
240
241
242 class ServiceNodePortObject(object):
243
244     MANDATORY_PARAMETERS = {'port', 'name'}
245     NAME_REGEX = re.compile(r'^[a-z0-9]([-a-z0-9]*[a-z0-9])?$')
246
247     def __init__(self, name, **kwargs):
248         """Service kind "NodePort" object
249
250         :param name: (string) name of the Service
251         :param kwargs: (dict) node_ports -> (list) port, name, targetPort,
252                                                    nodePort
253         """
254         self._name = '{}-service'.format(name)
255         self.template = {
256             'metadata': {'name': '{}-service'.format(name)},
257             'spec': {
258                 'type': 'NodePort',
259                 'ports': [],
260                 'selector': {'app': name}
261             }
262         }
263
264         self._add_port(22, 'ssh', protocol='TCP')
265         node_ports = copy.deepcopy(kwargs.get('node_ports', []))
266         for port in node_ports:
267             if not self.MANDATORY_PARAMETERS.issubset(port.keys()):
268                 missing_parameters = ', '.join(
269                     str(param) for param in
270                     (self.MANDATORY_PARAMETERS - set(port.keys())))
271                 raise exceptions.KubernetesServiceObjectDefinitionError(
272                     missing_parameters=missing_parameters)
273             port_number = port.pop('port')
274             name = port.pop('name')
275             if not self.NAME_REGEX.match(name):
276                 raise exceptions.KubernetesServiceObjectNameError(name=name)
277             self._add_port(port_number, name, **port)
278
279     def _add_port(self, port, name, protocol=None, targetPort=None,
280                   nodePort=None):
281         _port = {'port': port,
282                  'name': name}
283         if protocol:
284             _port['protocol'] = protocol
285         if targetPort:
286             _port['targetPort'] = targetPort
287         if nodePort:
288             _port['nodePort'] = nodePort
289         self.template['spec']['ports'].append(_port)
290
291     def create(self):
292         k8s_utils.create_service(self.template)
293
294     def delete(self):
295         k8s_utils.delete_service(self._name)
296
297
298 class CustomResourceDefinitionObject(object):
299
300     MANDATORY_PARAMETERS = {'name'}
301
302     def __init__(self, ctx_name, **kwargs):
303         if not self.MANDATORY_PARAMETERS.issubset(kwargs):
304             missing_parameters = ', '.join(
305                 str(param) for param in
306                 (self.MANDATORY_PARAMETERS - set(kwargs)))
307             raise exceptions.KubernetesCRDObjectDefinitionError(
308                 missing_parameters=missing_parameters)
309
310         singular = kwargs['name']
311         plural = singular + 's'
312         kind = singular.title()
313         version = kwargs.get('version', 'v1')
314         scope = kwargs.get('scope', constants.SCOPE_NAMESPACED)
315         group = ctx_name + '.com'
316         self._name = metadata_name = plural + '.' + group
317
318         self._template = {
319             'metadata': {
320                 'name': metadata_name
321             },
322             'spec': {
323                 'group': group,
324                 'version': version,
325                 'scope': scope,
326                 'names': {'plural': plural,
327                           'singular': singular,
328                           'kind': kind}
329             }
330         }
331
332     def create(self):
333         k8s_utils.create_custom_resource_definition(self._template)
334
335     def delete(self):
336         k8s_utils.delete_custom_resource_definition(self._name)
337
338
339 class NetworkObject(object):
340
341     MANDATORY_PARAMETERS = {'plugin', 'args'}
342     KIND = 'Network'
343
344     def __init__(self, name, **kwargs):
345         if not self.MANDATORY_PARAMETERS.issubset(kwargs):
346             missing_parameters = ', '.join(
347                 str(param) for param in
348                 (self.MANDATORY_PARAMETERS - set(kwargs)))
349             raise exceptions.KubernetesNetworkObjectDefinitionError(
350                 missing_parameters=missing_parameters)
351
352         self._name = name
353         self._plugin = kwargs['plugin']
354         self._args = kwargs['args']
355         self._crd = None
356         self._template = None
357         self._group = None
358         self._version = None
359         self._plural = None
360         self._scope = None
361
362     @property
363     def crd(self):
364         if self._crd:
365             return self._crd
366         crd = k8s_utils.get_custom_resource_definition(self.KIND)
367         if not crd:
368             raise exceptions.KubernetesNetworkObjectKindMissing()
369         self._crd = crd
370         return self._crd
371
372     @property
373     def group(self):
374         if self._group:
375             return self._group
376         self._group = self.crd.spec.group
377         return self._group
378
379     @property
380     def version(self):
381         if self._version:
382             return self._version
383         self._version = self.crd.spec.version
384         return self._version
385
386     @property
387     def plural(self):
388         if self._plural:
389             return self._plural
390         self._plural = self.crd.spec.names.plural
391         return self._plural
392
393     @property
394     def scope(self):
395         if self._scope:
396             return self._scope
397         self._scope = self.crd.spec.scope
398         return self._scope
399
400     @property
401     def template(self):
402         """"Network" object template
403
404         This template can be rendered only once the CRD "Network" is created in
405         Kubernetes. This function call must be delayed until the creation of
406         the CRD "Network".
407         """
408         if self._template:
409             return self._template
410
411         self._template = {
412             'apiVersion': '{}/{}'.format(self.group, self.version),
413             'kind': self.KIND,
414             'metadata': {
415                 'name': self._name
416             },
417             'plugin': self._plugin,
418             'args': self._args
419         }
420         return self._template
421
422     def create(self):
423         k8s_utils.create_network(self.scope, self.group, self.version,
424                                  self.plural, self.template)
425
426     def delete(self):
427         k8s_utils.delete_network(self.scope, self.group, self.version,
428                                  self.plural, self._name)
429
430
431 class KubernetesTemplate(object):
432
433     def __init__(self, name, context_cfg):
434         """KubernetesTemplate object initialization
435
436         :param name: (str) name of the Kubernetes context
437         :param context_cfg: (dict) context definition
438         """
439         context_cfg = copy.deepcopy(context_cfg)
440         servers_cfg = context_cfg.pop('servers', {})
441         crd_cfg = context_cfg.pop('custom_resources', [])
442         networks_cfg = context_cfg.pop('networks', {})
443         self.name = name
444         self.ssh_key = '{}-key'.format(name)
445
446         self.rcs = {self._get_rc_name(rc): cfg
447                     for rc, cfg in servers_cfg.items()}
448         self.rc_objs = [ReplicationControllerObject(
449             rc, ssh_key=self.ssh_key, **cfg) for rc, cfg in self.rcs.items()]
450         self.service_objs = [ServiceNodePortObject(rc, **cfg)
451                              for rc, cfg in self.rcs.items()]
452         self.crd = [CustomResourceDefinitionObject(self.name, **crd)
453                     for crd in crd_cfg]
454         self.network_objs = [NetworkObject(net_name, **net_data)
455                              for net_name, net_data in networks_cfg.items()]
456         self.pods = []
457
458     def _get_rc_name(self, rc_name):
459         return '{}-{}'.format(rc_name, self.name)
460
461     def get_rc_pods(self):
462         resp = k8s_utils.get_pod_list()
463         self.pods = [p.metadata.name for p in resp.items for s in self.rcs
464                      if p.metadata.name.startswith(s)]
465
466         return self.pods
467
468     def get_rc_by_name(self, rc_name):
469         """Returns a ``ReplicationControllerObject``, searching by name"""
470         for rc in (rc for rc in self.rc_objs if rc.name == rc_name):
471             return rc