3e2572fcc7ee3160ece7a91f4fcb0c8eed107355
[yardstick.git] / yardstick / orchestrator / kubernetes.py
1 ##############################################################################
2 # Copyright (c) 2017 Huawei Technologies Co.,Ltd.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import copy
11 import re
12
13 from oslo_serialization import jsonutils
14 import six
15
16 from yardstick.common import constants
17 from yardstick.common import exceptions
18 from yardstick.common import kubernetes_utils as k8s_utils
19 from yardstick.common import utils
20
21
class ContainerObject(object):
    """Builder of one "container" entry of a Kubernetes pod specification."""

    SSH_MOUNT_PATH = '/tmp/.ssh/'
    IMAGE_DEFAULT = 'openretriever/yardstick'
    COMMAND_DEFAULT = ['/bin/bash', '-c']
    RESOURCES = ('requests', 'limits')
    PORT_OPTIONS = ('containerPort', 'hostIP', 'hostPort', 'name', 'protocol')
    IMAGE_PULL_POLICY = ('Always', 'IfNotPresent', 'Never')

    def __init__(self, name, ssh_key, **kwargs):
        """Container object initialization

        :param name: (str) base name; the container is named
                     "<name>-container"
        :param ssh_key: (str) name of the volume holding the SSH key, mounted
                        in SSH_MOUNT_PATH
        :param kwargs: (dict) optional keys: image, command, args,
                       volumeMounts, securityContext, env, resources, ports,
                       imagePullPolicy
        :raises KubernetesContainerCommandType: if "command" or "args" is
                neither a string nor a list
        """
        self._name = name
        self._ssh_key = ssh_key
        self._image = kwargs.get('image', self.IMAGE_DEFAULT)
        self._command = self._parse_commands(
            kwargs.get('command', self.COMMAND_DEFAULT))
        self._args = self._parse_commands(kwargs.get('args', []))
        self._volume_mounts = kwargs.get('volumeMounts', [])
        self._security_context = kwargs.get('securityContext')
        self._env = kwargs.get('env', [])
        self._resources = kwargs.get('resources', {})
        self._ports = kwargs.get('ports', [])
        self._image_pull_policy = kwargs.get('imagePullPolicy')

    @staticmethod
    def _parse_commands(command):
        """Normalize a command definition into a new list

        :param command: (str or list) command or arguments definition
        :returns: (list) a one element list for a string, a copy of the input
                  for a list
        :raises KubernetesContainerCommandType: for any other type
        """
        if isinstance(command, list):
            # Return a copy: the input may be the class-level COMMAND_DEFAULT,
            # which must not be shared with the generated container items.
            return list(command)
        if isinstance(command, six.string_types):
            return [command]
        raise exceptions.KubernetesContainerCommandType()

    def _create_volume_mounts(self):
        """Return all "volumeMounts" items per container

        The configured mounts come first; the SSH key mount is always
        appended as the last item.
        """
        volume_mounts_items = [self._create_volume_mounts_item(vol)
                               for vol in self._volume_mounts]
        ssh_vol = {'name': self._ssh_key,
                   'mountPath': self.SSH_MOUNT_PATH}
        volume_mounts_items.append(self._create_volume_mounts_item(ssh_vol))
        return volume_mounts_items

    @staticmethod
    def _create_volume_mounts_item(volume_mount):
        """Create a "volumeMounts" item

        :param volume_mount: (dict) must contain "name" and "mountPath";
                             "readOnly" defaults to False
        """
        return {'name': volume_mount['name'],
                'mountPath': volume_mount['mountPath'],
                'readOnly': volume_mount.get('readOnly', False)}

    def get_container_item(self):
        """Create a "container" item

        Optional sections ("securityContext", "env", "ports", "resources",
        "imagePullPolicy") are only added when they were configured.

        :returns: (dict) container definition
        :raises KubernetesContainerPortNotDefined: if a port definition has
                no "containerPort"
        :raises KubernetesContainerWrongImagePullPolicy: if the pull policy
                is not one of IMAGE_PULL_POLICY
        """
        container_name = '{}-container'.format(self._name)
        container = {'args': self._args,
                     'command': self._command,
                     'image': self._image,
                     'name': container_name,
                     'volumeMounts': self._create_volume_mounts()}
        if self._security_context:
            container['securityContext'] = self._security_context
        if self._env:
            container['env'] = [{'name': env['name'], 'value': env['value']}
                                for env in self._env]
        if self._ports:
            container['ports'] = []
            for port in self._ports:
                if 'containerPort' not in port:
                    raise exceptions.KubernetesContainerPortNotDefined(
                        port=port)
                # Keys outside PORT_OPTIONS are dropped.
                container['ports'].append(
                    {option: value for option, value in port.items()
                     if option in self.PORT_OPTIONS})
        if self._resources:
            # Only the keys listed in RESOURCES are copied.
            container['resources'] = {
                res: self._resources[res] for res in self._resources
                if res in self.RESOURCES}
        if self._image_pull_policy:
            if self._image_pull_policy not in self.IMAGE_PULL_POLICY:
                raise exceptions.KubernetesContainerWrongImagePullPolicy()
            container['imagePullPolicy'] = self._image_pull_policy
        return container
103
104
class ReplicationControllerObject(object):
    """Builder of a Kubernetes "ReplicationController" template."""

    SSHKEY_DEFAULT = 'yardstick_key'
    RESTART_POLICY = ('Always', 'OnFailure', 'Never')
    TOLERATIONS_KEYS = ('key', 'value', 'effect', 'operator')

    def __init__(self, name, **kwargs):
        """Replication controller object initialization

        :param name: (str) name of the replication controller, also used as
                     the "app" label of the pod template
        :param kwargs: (dict) optional keys: nodeSelector, ssh_key, volumes,
                       securityContext, networks, tolerations, restartPolicy,
                       containers. If "containers" is not given, the
                       remaining keys define a single container.
        :raises KubernetesWrongRestartPolicy: if "restartPolicy" is not one
                of RESTART_POLICY
        """
        super(ReplicationControllerObject, self).__init__()
        # Work on a copy: the keys are popped while parsing.
        parameters = copy.deepcopy(kwargs)
        self.name = name
        self.node_selector = parameters.pop('nodeSelector', {})
        self.ssh_key = parameters.pop('ssh_key', self.SSHKEY_DEFAULT)
        self._volumes = parameters.pop('volumes', [])
        self._security_context = parameters.pop('securityContext', None)
        self._networks = parameters.pop('networks', [])
        self._tolerations = parameters.pop('tolerations', [])
        self._restart_policy = parameters.pop('restartPolicy', 'Always')
        if self._restart_policy not in self.RESTART_POLICY:
            raise exceptions.KubernetesWrongRestartPolicy(
                rpolicy=self._restart_policy)

        containers = parameters.pop('containers', None)
        if containers:
            self._containers = [
                ContainerObject(self.name, self.ssh_key, **container)
                for container in containers]
        else:
            # No explicit list: whatever is left in "parameters" describes
            # the single container of this replication controller.
            self._containers = [
                ContainerObject(self.name, self.ssh_key, **parameters)]

        self.template = {
            "apiVersion": "v1",
            "kind": "ReplicationController",
            "metadata": {
                "name": ""
            },
            "spec": {
                "replicas": 1,
                "template": {
                    "metadata": {
                        "labels": {"app": name}
                    },
                    "spec": {
                        "containers": [],
                        "volumes": [],
                        "nodeSelector": {},
                        "restartPolicy": self._restart_policy,
                        "tolerations": []
                    }
                }
            }
        }

        self._change_value_according_name(name)
        self._add_containers()
        self._add_node_selector()
        self._add_volumes()
        self._add_security_context()
        self._add_networks()
        self._add_tolerations()

    @property
    def networks(self):
        """Return the network names given in the definition"""
        return self._networks

    def get_template(self):
        """Return the rendered "ReplicationController" template"""
        return self.template

    def _change_value_according_name(self, name):
        """Set the object name and the "app" label of the pod template"""
        utils.set_dict_value(self.template, 'metadata.name', name)

        utils.set_dict_value(self.template,
                             'spec.template.metadata.labels.app',
                             name)

    def _add_containers(self):
        """Add the "container" items to the pod template"""
        containers = [container.get_container_item()
                      for container in self._containers]
        utils.set_dict_value(self.template,
                             'spec.template.spec.containers',
                             containers)

    def _add_node_selector(self):
        """Add the "nodeSelector" to the pod template"""
        utils.set_dict_value(self.template,
                             'spec.template.spec.nodeSelector',
                             self.node_selector)

    def _add_volumes(self):
        """Add "volume" items to container specs, including the SSH one"""
        volume_items = [self._create_volume_item(vol) for vol in self._volumes]
        volume_items.append(self._create_ssh_key_volume())
        utils.set_dict_value(self.template,
                             'spec.template.spec.volumes',
                             volume_items)

    def _create_ssh_key_volume(self):
        """Create a "volume" item of type "configMap" for the SSH key"""
        return {'name': self.ssh_key,
                'configMap': {'name': self.ssh_key}}

    @staticmethod
    def _create_volume_item(volume):
        """Create a "volume" item

        The first key of the definition found in
        ``k8s_utils.get_volume_types()`` selects the volume type; any other
        key is ignored.

        :param volume: (dict) volume definition; must contain "name" and one
                       volume type key
        :returns: (dict) with the "name" and the volume type key
        :raises KubernetesTemplateInvalidVolumeType: if no key of the
                definition is a known volume type
        """
        volume = copy.deepcopy(volume)
        name = volume.pop('name')
        # The known types do not depend on the key being checked: read them
        # once instead of once per key.
        volume_types = k8s_utils.get_volume_types()
        for type_name, type_data in volume.items():
            if type_name in volume_types:
                return {'name': name,
                        type_name: type_data}
        raise exceptions.KubernetesTemplateInvalidVolumeType(volume=volume)

    def _add_security_context(self):
        """Add the pod "securityContext", only if one was defined"""
        if self._security_context:
            utils.set_dict_value(self.template,
                                 'spec.template.spec.securityContext',
                                 self._security_context)

    def _add_networks(self):
        """Add the "networks" annotation, only if networks were defined

        The annotation value is a JSON encoded list of {"name": <network>}
        items.
        """
        networks = [{'name': net} for net in self._networks]
        if not networks:
            return

        annotations = {'networks': jsonutils.dumps(networks)}
        utils.set_dict_value(self.template,
                             'spec.template.metadata.annotations',
                             annotations)

    def _add_tolerations(self):
        """Add the "tolerations" to the pod template

        Only the keys in TOLERATIONS_KEYS are kept. If no toleration is
        defined, a single {'operator': 'Exists'} item is used.
        """
        tolerations = []
        for tol in self._tolerations:
            tolerations.append({k: tol[k] for k in tol
                                if k in self.TOLERATIONS_KEYS})

        if not tolerations:
            tolerations = [{'operator': 'Exists'}]
        utils.set_dict_value(self.template,
                             'spec.template.spec.tolerations',
                             tolerations)
250
251
class ServiceNodePortObject(object):
    """Builder of a Kubernetes "Service" template of type "NodePort"."""

    MANDATORY_PARAMETERS = {'port', 'name'}
    NAME_REGEX = re.compile(r'^[a-z0-9]([-a-z0-9]*[a-z0-9])?$')

    def __init__(self, name, **kwargs):
        """Service kind "NodePort" object

        An "ssh" port (22, TCP) is always the first port of the service.

        :param name: (string) name of the Service
        :param kwargs: (dict) node_ports -> (list) port, name, targetPort,
                                                   nodePort
        :raises KubernetesServiceObjectDefinitionError: if a port definition
                lacks any of MANDATORY_PARAMETERS
        :raises KubernetesServiceObjectNameError: if a port name does not
                match NAME_REGEX
        """
        self._name = '{}-service'.format(name)
        self.template = {
            'metadata': {'name': self._name},
            'spec': {
                'type': 'NodePort',
                'ports': [],
                'selector': {'app': name}
            }
        }

        self._add_port(22, 'ssh', protocol='TCP')
        # Copied because "port" and "name" are popped from each definition.
        node_ports = copy.deepcopy(kwargs.get('node_ports', []))
        for port in node_ports:
            missing = self.MANDATORY_PARAMETERS - set(port)
            if missing:
                # Sorted so the message does not depend on set ordering.
                missing_parameters = ', '.join(
                    sorted(str(param) for param in missing))
                raise exceptions.KubernetesServiceObjectDefinitionError(
                    missing_parameters=missing_parameters)
            port_number = port.pop('port')
            # Separate local: do not rebind the "name" argument.
            port_name = port.pop('name')
            if not self.NAME_REGEX.match(port_name):
                raise exceptions.KubernetesServiceObjectNameError(
                    name=port_name)
            self._add_port(port_number, port_name, **port)

    def _add_port(self, port, name, protocol=None, targetPort=None,
                  nodePort=None):
        """Append a port item to the service template

        "protocol", "targetPort" and "nodePort" are only added when they
        have a truthy value.
        """
        _port = {'port': port,
                 'name': name}
        if protocol:
            _port['protocol'] = protocol
        if targetPort:
            _port['targetPort'] = targetPort
        if nodePort:
            _port['nodePort'] = nodePort
        self.template['spec']['ports'].append(_port)

    def create(self):
        """Create the service from the template"""
        k8s_utils.create_service(self.template)

    def delete(self):
        """Delete the service, by name"""
        k8s_utils.delete_service(self._name)
306
307
class CustomResourceDefinitionObject(object):
    """Builder of a Kubernetes "CustomResourceDefinition" template."""

    MANDATORY_PARAMETERS = {'name'}

    def __init__(self, ctx_name, **kwargs):
        """Custom resource definition object initialization

        :param ctx_name: (str) context name; the CRD group is
                         "<ctx_name>.com"
        :param kwargs: (dict) name (mandatory, singular name of the
                       resource), version (default "v1"), scope (default
                       constants.SCOPE_NAMESPACED)
        :raises KubernetesCRDObjectDefinitionError: if a mandatory parameter
                is missing
        """
        absent = self.MANDATORY_PARAMETERS - set(kwargs)
        if absent:
            raise exceptions.KubernetesCRDObjectDefinitionError(
                missing_parameters=', '.join(str(item) for item in absent))

        singular = kwargs['name']
        # Plural and kind are derived from the singular name.
        names = {'plural': singular + 's',
                 'singular': singular,
                 'kind': singular.title()}
        spec = {'group': ctx_name + '.com',
                'version': kwargs.get('version', 'v1'),
                'scope': kwargs.get('scope', constants.SCOPE_NAMESPACED),
                'names': names}

        # The CRD name is "<plural>.<group>".
        self._name = names['plural'] + '.' + spec['group']
        self._template = {'metadata': {'name': self._name},
                          'spec': spec}

    def create(self):
        """Create the custom resource definition from the template"""
        k8s_utils.create_custom_resource_definition(self._template)

    def delete(self):
        """Delete the custom resource definition, by name"""
        k8s_utils.delete_custom_resource_definition(self._name)
347
348
class NetworkObject(object):
    """Kubernetes custom resource of kind "Network".

    Group, version, plural and scope are read from the "Network" custom
    resource definition the first time they are needed.
    """

    MANDATORY_PARAMETERS = {'plugin', 'args'}
    KIND = 'Network'

    def __init__(self, name, **kwargs):
        """Network object initialization

        :param name: (str) name of the network
        :param kwargs: (dict) plugin and args, both mandatory
        :raises KubernetesNetworkObjectDefinitionError: if a mandatory
                parameter is missing
        """
        missing = self.MANDATORY_PARAMETERS - set(kwargs)
        if missing:
            # Sorted so the message does not depend on set ordering.
            missing_parameters = ', '.join(
                sorted(str(param) for param in missing))
            raise exceptions.KubernetesNetworkObjectDefinitionError(
                missing_parameters=missing_parameters)

        self._name = name
        self._plugin = kwargs['plugin']
        self._args = kwargs['args']
        # Filled on first access by the properties below.
        self._crd = None
        self._template = None
        self._group = None
        self._version = None
        self._plural = None
        self._scope = None

    @property
    def crd(self):
        """Return the "Network" custom resource definition

        :raises KubernetesNetworkObjectKindMissing: if no definition is
                returned for KIND
        """
        if not self._crd:
            crd = k8s_utils.get_custom_resource_definition(self.KIND)
            if not crd:
                raise exceptions.KubernetesNetworkObjectKindMissing()
            self._crd = crd
        return self._crd

    @property
    def group(self):
        """Return the group of the CRD"""
        if not self._group:
            self._group = self.crd.spec.group
        return self._group

    @property
    def version(self):
        """Return the version of the CRD"""
        if not self._version:
            self._version = self.crd.spec.version
        return self._version

    @property
    def plural(self):
        """Return the plural name of the CRD"""
        if not self._plural:
            self._plural = self.crd.spec.names.plural
        return self._plural

    @property
    def scope(self):
        """Return the scope of the CRD"""
        if not self._scope:
            self._scope = self.crd.spec.scope
        return self._scope

    @property
    def template(self):
        """Return the "Network" object template

        This template can be rendered only once the CRD "Network" is created in
        Kubernetes. This function call must be delayed until the creation of
        the CRD "Network".
        """
        if not self._template:
            self._template = {
                'apiVersion': '{}/{}'.format(self.group, self.version),
                'kind': self.KIND,
                'metadata': {
                    'name': self._name
                },
                'plugin': self._plugin,
                'args': self._args
            }
        return self._template

    def create(self):
        """Create the network object from the template"""
        k8s_utils.create_network(self.scope, self.group, self.version,
                                 self.plural, self.template)

    def delete(self):
        """Delete the network object, by name"""
        k8s_utils.delete_network(self.scope, self.group, self.version,
                                 self.plural, self._name)
439
440
class KubernetesTemplate(object):
    """Set of Kubernetes objects defined by a context."""

    def __init__(self, name, context_cfg):
        """KubernetesTemplate object initialization

        :param name: (str) name of the Kubernetes context
        :param context_cfg: (dict) context definition
        """
        # Work on a copy: the sections are popped from the definition.
        context_cfg = copy.deepcopy(context_cfg)
        servers_cfg = context_cfg.pop('servers', {})
        crd_cfg = context_cfg.pop('custom_resources', [])
        networks_cfg = context_cfg.pop('networks', {})
        self.name = name
        self.ssh_key = '{}-key'.format(name)

        self.rcs = {self._get_rc_name(rc): cfg
                    for rc, cfg in servers_cfg.items()}
        self.rc_objs = [ReplicationControllerObject(
            rc, ssh_key=self.ssh_key, **cfg) for rc, cfg in self.rcs.items()]
        self.service_objs = [ServiceNodePortObject(rc, **cfg)
                             for rc, cfg in self.rcs.items()]
        self.crd = [CustomResourceDefinitionObject(self.name, **crd)
                    for crd in crd_cfg]
        self.network_objs = [NetworkObject(net_name, **net_data)
                             for net_name, net_data in networks_cfg.items()]
        self.pods = []

    def _get_rc_name(self, rc_name):
        """Return the replication controller name, "<rc_name>-<context>\""""
        return '{}-{}'.format(rc_name, self.name)

    def get_rc_pods(self):
        """Return the names of the pods of the replication controllers

        A pod is selected when its name starts with the name of any
        replication controller of this template. Each pod is listed once,
        even if its name matches several replication controllers. The result
        is also stored in ``self.pods``.
        """
        resp = k8s_utils.get_pod_list()
        # str.startswith() accepts a tuple of prefixes; an empty tuple never
        # matches, so no replication controllers means no pods.
        prefixes = tuple(self.rcs)
        self.pods = [pod.metadata.name for pod in resp.items
                     if pod.metadata.name.startswith(prefixes)]

        return self.pods

    def get_rc_by_name(self, rc_name):
        """Returns a ``ReplicationControllerObject``, searching by name

        :returns: the first object whose name is ``rc_name`` or None if
                  there is no such object
        """
        return next((rc for rc in self.rc_objs if rc.name == rc_name), None)