X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Forchestrator%2Fkubernetes.py;h=b0b93a3c2af3c42b9b8e8cac3faa7616d259fe73;hb=06d2c38dfd22956a7f60535eb5b59d6fb94098ff;hp=ac3a09ed1d49cc0643a60cafef2ec2e1de4dfe04;hpb=b4bba7a8d9d1e3379209d13a3272412e55f1dc74;p=yardstick.git diff --git a/yardstick/orchestrator/kubernetes.py b/yardstick/orchestrator/kubernetes.py index ac3a09ed1..b0b93a3c2 100644 --- a/yardstick/orchestrator/kubernetes.py +++ b/yardstick/orchestrator/kubernetes.py @@ -7,25 +7,136 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from __future__ import absolute_import -from __future__ import print_function +import copy +import re -from yardstick.common import utils +from oslo_serialization import jsonutils +import six + +from yardstick.common import constants +from yardstick.common import exceptions from yardstick.common import kubernetes_utils as k8s_utils +from yardstick.common import utils + + +class ContainerObject(object): + + SSH_MOUNT_PATH = '/tmp/.ssh/' + IMAGE_DEFAULT = 'openretriever/yardstick' + COMMAND_DEFAULT = ['/bin/bash', '-c'] + RESOURCES = ('requests', 'limits') + PORT_OPTIONS = ('containerPort', 'hostIP', 'hostPort', 'name', 'protocol') + IMAGE_PULL_POLICY = ('Always', 'IfNotPresent', 'Never') + + def __init__(self, name, ssh_key, **kwargs): + self._name = name + self._ssh_key = ssh_key + self._image = kwargs.get('image', self.IMAGE_DEFAULT) + self._command = self._parse_commands( + kwargs.get('command', self.COMMAND_DEFAULT)) + self._args = self._parse_commands(kwargs.get('args', [])) + self._volume_mounts = kwargs.get('volumeMounts', []) + self._security_context = kwargs.get('securityContext') + self._env = kwargs.get('env', []) + self._resources = kwargs.get('resources', {}) + self._ports = kwargs.get('ports', []) + self._image_pull_policy = kwargs.get('imagePullPolicy') + self._tty = kwargs.get('tty') + self._stdin = kwargs.get('stdin') + + @staticmethod + def _parse_commands(command): + if isinstance(command, six.string_types): + return [command] + elif isinstance(command, list): + return command + raise exceptions.KubernetesContainerCommandType() + + def _create_volume_mounts(self): + """Return all "volumeMounts" items per container""" + volume_mounts_items = [self._create_volume_mounts_item(vol) + for vol in self._volume_mounts] + ssh_vol = {'name': self._ssh_key, + 'mountPath': self.SSH_MOUNT_PATH} + volume_mounts_items.append(self._create_volume_mounts_item(ssh_vol)) + return volume_mounts_items + + @staticmethod + def _create_volume_mounts_item(volume_mount): + """Create a "volumeMounts" item""" + return {'name': volume_mount['name'], + 'mountPath': volume_mount['mountPath'], + 'readOnly': volume_mount.get('readOnly', False)} + + def get_container_item(self): + """Create a "container" item""" + container_name = '{}-container'.format(self._name) + container = {'args': self._args, + 'command': self._command, + 'image': self._image, + 'name': container_name, + 'volumeMounts': self._create_volume_mounts()} + if self._security_context: + container['securityContext'] = self._security_context + if self._env: + container['env'] = [] + for env in self._env: + container['env'].append({'name': env['name'], + 'value': env['value']}) + if self._ports: + container['ports'] = [] + for port in self._ports: + if 'containerPort' not in port.keys(): + raise exceptions.KubernetesContainerPortNotDefined( + port=port) + _port = {port_option: value for port_option, value + in port.items() if port_option in self.PORT_OPTIONS} + container['ports'].append(_port) + if self._resources: + container['resources'] = {} + for res in (res for res in self._resources if + res in self.RESOURCES): + container['resources'][res] = self._resources[res] + if self._image_pull_policy: + if self._image_pull_policy not in self.IMAGE_PULL_POLICY: + raise exceptions.KubernetesContainerWrongImagePullPolicy() + container['imagePullPolicy'] = self._image_pull_policy + if self._stdin is not None: + container['stdin'] = self._stdin + if self._tty is not None: + container['tty'] = self._tty + return container + +class ReplicationControllerObject(object): -class KubernetesObject(object): + SSHKEY_DEFAULT = 'yardstick_key' + RESTART_POLICY = ('Always', 'OnFailure', 'Never') + TOLERATIONS_KEYS = ('key', 'value', 'effect', 'operator') def __init__(self, name, **kwargs): - super(KubernetesObject, self).__init__() + super(ReplicationControllerObject, self).__init__() + parameters = copy.deepcopy(kwargs) self.name = name - self.image = kwargs.get('image', 'openretriever/yardstick') - self.command = [kwargs.get('command', '/bin/bash')] - self.args = kwargs.get('args', []) - self.ssh_key = kwargs.get('ssh_key', 'yardstick_key') - self.node_selector = kwargs.get('nodeSelector', {}) - - self.volumes = [] + self.node_selector = parameters.pop('nodeSelector', {}) + self.ssh_key = parameters.pop('ssh_key', self.SSHKEY_DEFAULT) + self._volumes = parameters.pop('volumes', []) + self._security_context = parameters.pop('securityContext', None) + self._networks = parameters.pop('networks', []) + self._tolerations = parameters.pop('tolerations', []) + self._restart_policy = parameters.pop('restartPolicy', 'Always') + if self._restart_policy not in self.RESTART_POLICY: + raise exceptions.KubernetesWrongRestartPolicy( + rpolicy=self._restart_policy) + + containers = parameters.pop('containers', None) + if containers: + self._containers = [ + ContainerObject(self.name, self.ssh_key, **container) + for container in containers] + else: + self._containers = [ + ContainerObject(self.name, self.ssh_key, **parameters)] self.template = { "apiVersion": "v1", @@ -37,14 +148,14 @@ class KubernetesObject(object): "replicas": 1, "template": { "metadata": { - "labels": { - "app": name - } + "labels": {"app": name} }, "spec": { "containers": [], "volumes": [], - "nodeSelector": {} + "nodeSelector": {}, + "restartPolicy": self._restart_policy, + "tolerations": [] } } } @@ -53,8 +164,14 @@ class KubernetesObject(object): self._change_value_according_name(name) self._add_containers() self._add_node_selector() - self._add_ssh_key_volume() self._add_volumes() + self._add_security_context() + self._add_networks() + self._add_tolerations() + + @property + def networks(self): + return self._networks def get_template(self): return self.template @@ -67,95 +184,291 @@ class KubernetesObject(object): name) def _add_containers(self): - containers = [self._add_container()] + containers = [container.get_container_item() + for container in self._containers] utils.set_dict_value(self.template, 'spec.template.spec.containers', containers) - def _add_container(self): - container_name = '{}-container'.format(self.name) - ssh_key_mount_path = '/tmp/.ssh/' - - container = { - "args": self.args, - "command": self.command, - "image": self.image, - "name": container_name, - "volumeMounts": [ - { - "mountPath": ssh_key_mount_path, - "name": self.ssh_key - } - ] - } - - return container - def _add_node_selector(self): utils.set_dict_value(self.template, 'spec.template.spec.nodeSelector', self.node_selector) def _add_volumes(self): + """Add "volume" items to container specs, including the SSH one""" + volume_items = [self._create_volume_item(vol) for vol in self._volumes] + volume_items.append(self._create_ssh_key_volume()) utils.set_dict_value(self.template, 'spec.template.spec.volumes', - self.volumes) + volume_items) + + def _create_ssh_key_volume(self): + """Create a "volume" item of type "configMap" for the SSH key""" + return {'name': self.ssh_key, + 'configMap': {'name': self.ssh_key}} + + @staticmethod + def _create_volume_item(volume): + """Create a "volume" item""" + volume = copy.deepcopy(volume) + name = volume.pop('name') + for key in (k for k in volume if k in k8s_utils.get_volume_types()): + type_name = key + type_data = volume[key] + break + else: + raise exceptions.KubernetesTemplateInvalidVolumeType(volume=volume) + + return {'name': name, + type_name: type_data} + + def _add_security_context(self): + if self._security_context: + utils.set_dict_value(self.template, + 'spec.template.spec.securityContext', + self._security_context) + + def _add_networks(self): + networks = [] + for net in self._networks: + networks.append({'name': net}) + + if not networks: + return + + annotations = {'networks': jsonutils.dumps(networks)} + utils.set_dict_value(self.template, + 'spec.template.metadata.annotations', + annotations) - def _add_volume(self, volume): - self.volumes.append(volume) + def _add_tolerations(self): + tolerations = [] + for tol in self._tolerations: + tolerations.append({k: tol[k] for k in tol + if k in self.TOLERATIONS_KEYS}) - def _add_ssh_key_volume(self): - key_volume = { - "configMap": { - "name": self.ssh_key - }, - "name": self.ssh_key - } - self._add_volume(key_volume) + tolerations = ([{'operator': 'Exists'}] if not tolerations + else tolerations) + utils.set_dict_value(self.template, + 'spec.template.spec.tolerations', + tolerations) -class ServiceObject(object): +class ServiceNodePortObject(object): + + MANDATORY_PARAMETERS = {'port', 'name'} + NAME_REGEX = re.compile(r'^[a-z0-9]([-a-z0-9]*[a-z0-9])?$') + + def __init__(self, name, **kwargs): + """Service kind "NodePort" object - def __init__(self, name): - self.name = '{}-service'.format(name) + :param name: (string) name of the Service + :param kwargs: (dict) node_ports -> (list) port, name, targetPort, + nodePort + """ + self._name = '{}-service'.format(name) self.template = { - 'metadata': { - 'name': '{}-service'.format(name) - }, + 'metadata': {'name': '{}-service'.format(name)}, 'spec': { 'type': 'NodePort', - 'ports': [ - { - 'port': 22, - 'protocol': 'TCP' - } - ], - 'selector': { - 'app': name - } + 'ports': [], + 'selector': {'app': name} } } + self._add_port(22, 'ssh', protocol='TCP') + node_ports = copy.deepcopy(kwargs.get('node_ports', [])) + for port in node_ports: + if not self.MANDATORY_PARAMETERS.issubset(port.keys()): + missing_parameters = ', '.join( + str(param) for param in + (self.MANDATORY_PARAMETERS - set(port.keys()))) + raise exceptions.KubernetesServiceObjectDefinitionError( + missing_parameters=missing_parameters) + port_number = port.pop('port') + name = port.pop('name') + if not self.NAME_REGEX.match(name): + raise exceptions.KubernetesServiceObjectNameError(name=name) + self._add_port(port_number, name, **port) + + def _add_port(self, port, name, protocol=None, targetPort=None, + nodePort=None): + _port = {'port': port, + 'name': name} + if protocol: + _port['protocol'] = protocol + if targetPort: + _port['targetPort'] = targetPort + if nodePort: + _port['nodePort'] = nodePort + self.template['spec']['ports'].append(_port) + def create(self): k8s_utils.create_service(self.template) def delete(self): - k8s_utils.delete_service(self.name) + k8s_utils.delete_service(self._name, skip_codes=[404]) + + +class CustomResourceDefinitionObject(object): + + MANDATORY_PARAMETERS = {'name'} + + def __init__(self, ctx_name, **kwargs): + if not self.MANDATORY_PARAMETERS.issubset(kwargs): + missing_parameters = ', '.join( + str(param) for param in + (self.MANDATORY_PARAMETERS - set(kwargs))) + raise exceptions.KubernetesCRDObjectDefinitionError( + missing_parameters=missing_parameters) + + singular = kwargs['name'] + plural = singular + 's' + kind = singular.title() + version = kwargs.get('version', 'v1') + scope = kwargs.get('scope', constants.SCOPE_NAMESPACED) + group = ctx_name + '.com' + self._name = metadata_name = plural + '.' + group + + self._template = { + 'metadata': { + 'name': metadata_name + }, + 'spec': { + 'group': group, + 'version': version, + 'scope': scope, + 'names': {'plural': plural, + 'singular': singular, + 'kind': kind} + } + } + + def create(self): + k8s_utils.create_custom_resource_definition(self._template) + + def delete(self): + k8s_utils.delete_custom_resource_definition(self._name, skip_codes=[404]) + + +class NetworkObject(object): + + MANDATORY_PARAMETERS = {'plugin', 'args'} + KIND = 'Network' + + def __init__(self, name, **kwargs): + if not self.MANDATORY_PARAMETERS.issubset(kwargs): + missing_parameters = ', '.join( + str(param) for param in + (self.MANDATORY_PARAMETERS - set(kwargs))) + raise exceptions.KubernetesNetworkObjectDefinitionError( + missing_parameters=missing_parameters) + + self._name = name + self._plugin = kwargs['plugin'] + self._args = kwargs['args'] + self._crd = None + self._template = None + self._group = None + self._version = None + self._plural = None + self._scope = None + + @property + def crd(self): + if self._crd: + return self._crd + crd = k8s_utils.get_custom_resource_definition(self.KIND) + if not crd: + raise exceptions.KubernetesNetworkObjectKindMissing() + self._crd = crd + return self._crd + + @property + def group(self): + if self._group: + return self._group + self._group = self.crd.spec.group + return self._group + + @property + def version(self): + if self._version: + return self._version + self._version = self.crd.spec.version + return self._version + + @property + def plural(self): + if self._plural: + return self._plural + self._plural = self.crd.spec.names.plural + return self._plural + + @property + def scope(self): + if self._scope: + return self._scope + self._scope = self.crd.spec.scope + return self._scope + + @property + def template(self): + """"Network" object template + + This template can be rendered only once the CRD "Network" is created in + Kubernetes. This function call must be delayed until the creation of + the CRD "Network". + """ + if self._template: + return self._template + + self._template = { + 'apiVersion': '{}/{}'.format(self.group, self.version), + 'kind': self.KIND, + 'metadata': { + 'name': self._name + }, + 'plugin': self._plugin, + 'args': self._args + } + return self._template + + def create(self): + k8s_utils.create_network(self.scope, self.group, self.version, + self.plural, self.template, self._name) + + def delete(self): + k8s_utils.delete_network(self.scope, self.group, self.version, + self.plural, self._name, skip_codes=[404]) class KubernetesTemplate(object): - def __init__(self, name, template_cfg): + def __init__(self, name, context_cfg): + """KubernetesTemplate object initialization + + :param name: (str) name of the Kubernetes context + :param context_cfg: (dict) context definition + """ + context_cfg = copy.deepcopy(context_cfg) + servers_cfg = context_cfg.pop('servers', {}) + crd_cfg = context_cfg.pop('custom_resources', []) + networks_cfg = context_cfg.pop('networks', {}) self.name = name self.ssh_key = '{}-key'.format(name) - self.rcs = [self._get_rc_name(rc) for rc in template_cfg] - self.k8s_objs = [KubernetesObject(self._get_rc_name(rc), - ssh_key=self.ssh_key, - **cfg) - for rc, cfg in template_cfg.items()] - self.service_objs = [ServiceObject(s) for s in self.rcs] - + self.rcs = {self._get_rc_name(rc): cfg + for rc, cfg in servers_cfg.items()} + self.rc_objs = [ReplicationControllerObject( + rc, ssh_key=self.ssh_key, **cfg) for rc, cfg in self.rcs.items()] + self.service_objs = [ServiceNodePortObject(rc, **cfg) + for rc, cfg in self.rcs.items()] + self.crd = [CustomResourceDefinitionObject(self.name, **crd) + for crd in crd_cfg] + self.network_objs = [NetworkObject(net_name, **net_data) + for net_name, net_data in networks_cfg.items()] self.pods = [] def _get_rc_name(self, rc_name): @@ -167,3 +480,8 @@ class KubernetesTemplate(object): if p.metadata.name.startswith(s)] return self.pods + + def get_rc_by_name(self, rc_name): + """Returns a ``ReplicationControllerObject``, searching by name""" + for rc in (rc for rc in self.rc_objs if rc.name == rc_name): + return rc