X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Forchestrator%2Fkubernetes.py;h=b0b93a3c2af3c42b9b8e8cac3faa7616d259fe73;hb=06d2c38dfd22956a7f60535eb5b59d6fb94098ff;hp=25adff7d4bdce9580c90f0d01e8eba4c84950983;hpb=1b8b08285fdf58e08ede154bc751cd7f66d76afd;p=yardstick.git diff --git a/yardstick/orchestrator/kubernetes.py b/yardstick/orchestrator/kubernetes.py index 25adff7d4..b0b93a3c2 100644 --- a/yardstick/orchestrator/kubernetes.py +++ b/yardstick/orchestrator/kubernetes.py @@ -8,26 +8,49 @@ ############################################################################## import copy +import re +from oslo_serialization import jsonutils +import six + +from yardstick.common import constants from yardstick.common import exceptions -from yardstick.common import utils from yardstick.common import kubernetes_utils as k8s_utils +from yardstick.common import utils class ContainerObject(object): SSH_MOUNT_PATH = '/tmp/.ssh/' IMAGE_DEFAULT = 'openretriever/yardstick' - COMMAND_DEFAULT = '/bin/bash' + COMMAND_DEFAULT = ['/bin/bash', '-c'] + RESOURCES = ('requests', 'limits') + PORT_OPTIONS = ('containerPort', 'hostIP', 'hostPort', 'name', 'protocol') + IMAGE_PULL_POLICY = ('Always', 'IfNotPresent', 'Never') def __init__(self, name, ssh_key, **kwargs): self._name = name self._ssh_key = ssh_key self._image = kwargs.get('image', self.IMAGE_DEFAULT) - self._command = [kwargs.get('command', self.COMMAND_DEFAULT)] - self._args = kwargs.get('args', []) + self._command = self._parse_commands( + kwargs.get('command', self.COMMAND_DEFAULT)) + self._args = self._parse_commands(kwargs.get('args', [])) self._volume_mounts = kwargs.get('volumeMounts', []) self._security_context = kwargs.get('securityContext') + self._env = kwargs.get('env', []) + self._resources = kwargs.get('resources', {}) + self._ports = kwargs.get('ports', []) + self._image_pull_policy = kwargs.get('imagePullPolicy') + self._tty = kwargs.get('tty') + self._stdin = kwargs.get('stdin') + + @staticmethod + def _parse_commands(command): + if isinstance(command, six.string_types): + return [command] + elif isinstance(command, list): + return command + raise exceptions.KubernetesContainerCommandType() def _create_volume_mounts(self): """Return all "volumeMounts" items per container""" @@ -55,21 +78,56 @@ class ContainerObject(object): 'volumeMounts': self._create_volume_mounts()} if self._security_context: container['securityContext'] = self._security_context + if self._env: + container['env'] = [] + for env in self._env: + container['env'].append({'name': env['name'], + 'value': env['value']}) + if self._ports: + container['ports'] = [] + for port in self._ports: + if 'containerPort' not in port.keys(): + raise exceptions.KubernetesContainerPortNotDefined( + port=port) + _port = {port_option: value for port_option, value + in port.items() if port_option in self.PORT_OPTIONS} + container['ports'].append(_port) + if self._resources: + container['resources'] = {} + for res in (res for res in self._resources if + res in self.RESOURCES): + container['resources'][res] = self._resources[res] + if self._image_pull_policy: + if self._image_pull_policy not in self.IMAGE_PULL_POLICY: + raise exceptions.KubernetesContainerWrongImagePullPolicy() + container['imagePullPolicy'] = self._image_pull_policy + if self._stdin is not None: + container['stdin'] = self._stdin + if self._tty is not None: + container['tty'] = self._tty return container -class KubernetesObject(object): +class ReplicationControllerObject(object): SSHKEY_DEFAULT = 'yardstick_key' + RESTART_POLICY = ('Always', 'OnFailure', 'Never') + TOLERATIONS_KEYS = ('key', 'value', 'effect', 'operator') def __init__(self, name, **kwargs): - super(KubernetesObject, self).__init__() + super(ReplicationControllerObject, self).__init__() parameters = copy.deepcopy(kwargs) self.name = name self.node_selector = parameters.pop('nodeSelector', {}) self.ssh_key = parameters.pop('ssh_key', self.SSHKEY_DEFAULT) self._volumes = parameters.pop('volumes', []) self._security_context = parameters.pop('securityContext', None) + self._networks = parameters.pop('networks', []) + self._tolerations = parameters.pop('tolerations', []) + self._restart_policy = parameters.pop('restartPolicy', 'Always') + if self._restart_policy not in self.RESTART_POLICY: + raise exceptions.KubernetesWrongRestartPolicy( + rpolicy=self._restart_policy) containers = parameters.pop('containers', None) if containers: @@ -90,14 +148,14 @@ class KubernetesObject(object): "replicas": 1, "template": { "metadata": { - "labels": { - "app": name - } + "labels": {"app": name} }, "spec": { "containers": [], "volumes": [], - "nodeSelector": {} + "nodeSelector": {}, + "restartPolicy": self._restart_policy, + "tolerations": [] } } } @@ -108,6 +166,12 @@ class KubernetesObject(object): self._add_node_selector() self._add_volumes() self._add_security_context() + self._add_networks() + self._add_tolerations() + + @property + def networks(self): + return self._networks def get_template(self): return self.template @@ -165,34 +229,219 @@ class KubernetesObject(object): 'spec.template.spec.securityContext', self._security_context) + def _add_networks(self): + networks = [] + for net in self._networks: + networks.append({'name': net}) + + if not networks: + return + + annotations = {'networks': jsonutils.dumps(networks)} + utils.set_dict_value(self.template, + 'spec.template.metadata.annotations', + annotations) + + def _add_tolerations(self): + tolerations = [] + for tol in self._tolerations: + tolerations.append({k: tol[k] for k in tol + if k in self.TOLERATIONS_KEYS}) + + tolerations = ([{'operator': 'Exists'}] if not tolerations + else tolerations) + utils.set_dict_value(self.template, + 'spec.template.spec.tolerations', + tolerations) + + +class ServiceNodePortObject(object): -class ServiceObject(object): + MANDATORY_PARAMETERS = {'port', 'name'} + NAME_REGEX = re.compile(r'^[a-z0-9]([-a-z0-9]*[a-z0-9])?$') - def __init__(self, name): - self.name = '{}-service'.format(name) + def __init__(self, name, **kwargs): + """Service kind "NodePort" object + + :param name: (string) name of the Service + :param kwargs: (dict) node_ports -> (list) port, name, targetPort, + nodePort + """ + self._name = '{}-service'.format(name) self.template = { - 'metadata': { - 'name': '{}-service'.format(name) - }, + 'metadata': {'name': '{}-service'.format(name)}, 'spec': { 'type': 'NodePort', - 'ports': [ - { - 'port': 22, - 'protocol': 'TCP' - } - ], - 'selector': { - 'app': name - } + 'ports': [], + 'selector': {'app': name} } } + self._add_port(22, 'ssh', protocol='TCP') + node_ports = copy.deepcopy(kwargs.get('node_ports', [])) + for port in node_ports: + if not self.MANDATORY_PARAMETERS.issubset(port.keys()): + missing_parameters = ', '.join( + str(param) for param in + (self.MANDATORY_PARAMETERS - set(port.keys()))) + raise exceptions.KubernetesServiceObjectDefinitionError( + missing_parameters=missing_parameters) + port_number = port.pop('port') + name = port.pop('name') + if not self.NAME_REGEX.match(name): + raise exceptions.KubernetesServiceObjectNameError(name=name) + self._add_port(port_number, name, **port) + + def _add_port(self, port, name, protocol=None, targetPort=None, + nodePort=None): + _port = {'port': port, + 'name': name} + if protocol: + _port['protocol'] = protocol + if targetPort: + _port['targetPort'] = targetPort + if nodePort: + _port['nodePort'] = nodePort + self.template['spec']['ports'].append(_port) + def create(self): k8s_utils.create_service(self.template) def delete(self): - k8s_utils.delete_service(self.name) + k8s_utils.delete_service(self._name, skip_codes=[404]) + + +class CustomResourceDefinitionObject(object): + + MANDATORY_PARAMETERS = {'name'} + + def __init__(self, ctx_name, **kwargs): + if not self.MANDATORY_PARAMETERS.issubset(kwargs): + missing_parameters = ', '.join( + str(param) for param in + (self.MANDATORY_PARAMETERS - set(kwargs))) + raise exceptions.KubernetesCRDObjectDefinitionError( + missing_parameters=missing_parameters) + + singular = kwargs['name'] + plural = singular + 's' + kind = singular.title() + version = kwargs.get('version', 'v1') + scope = kwargs.get('scope', constants.SCOPE_NAMESPACED) + group = ctx_name + '.com' + self._name = metadata_name = plural + '.' + group + + self._template = { + 'metadata': { + 'name': metadata_name + }, + 'spec': { + 'group': group, + 'version': version, + 'scope': scope, + 'names': {'plural': plural, + 'singular': singular, + 'kind': kind} + } + } + + def create(self): + k8s_utils.create_custom_resource_definition(self._template) + + def delete(self): + k8s_utils.delete_custom_resource_definition(self._name, skip_codes=[404]) + + +class NetworkObject(object): + + MANDATORY_PARAMETERS = {'plugin', 'args'} + KIND = 'Network' + + def __init__(self, name, **kwargs): + if not self.MANDATORY_PARAMETERS.issubset(kwargs): + missing_parameters = ', '.join( + str(param) for param in + (self.MANDATORY_PARAMETERS - set(kwargs))) + raise exceptions.KubernetesNetworkObjectDefinitionError( + missing_parameters=missing_parameters) + + self._name = name + self._plugin = kwargs['plugin'] + self._args = kwargs['args'] + self._crd = None + self._template = None + self._group = None + self._version = None + self._plural = None + self._scope = None + + @property + def crd(self): + if self._crd: + return self._crd + crd = k8s_utils.get_custom_resource_definition(self.KIND) + if not crd: + raise exceptions.KubernetesNetworkObjectKindMissing() + self._crd = crd + return self._crd + + @property + def group(self): + if self._group: + return self._group + self._group = self.crd.spec.group + return self._group + + @property + def version(self): + if self._version: + return self._version + self._version = self.crd.spec.version + return self._version + + @property + def plural(self): + if self._plural: + return self._plural + self._plural = self.crd.spec.names.plural + return self._plural + + @property + def scope(self): + if self._scope: + return self._scope + self._scope = self.crd.spec.scope + return self._scope + + @property + def template(self): + """"Network" object template + + This template can be rendered only once the CRD "Network" is created in + Kubernetes. This function call must be delayed until the creation of + the CRD "Network". + """ + if self._template: + return self._template + + self._template = { + 'apiVersion': '{}/{}'.format(self.group, self.version), + 'kind': self.KIND, + 'metadata': { + 'name': self._name + }, + 'plugin': self._plugin, + 'args': self._args + } + return self._template + + def create(self): + k8s_utils.create_network(self.scope, self.group, self.version, + self.plural, self.template, self._name) + + def delete(self): + k8s_utils.delete_network(self.scope, self.group, self.version, + self.plural, self._name, skip_codes=[404]) class KubernetesTemplate(object): @@ -205,16 +454,21 @@ class KubernetesTemplate(object): """ context_cfg = copy.deepcopy(context_cfg) servers_cfg = context_cfg.pop('servers', {}) + crd_cfg = context_cfg.pop('custom_resources', []) + networks_cfg = context_cfg.pop('networks', {}) self.name = name self.ssh_key = '{}-key'.format(name) - self.rcs = [self._get_rc_name(rc) for rc in servers_cfg] - self.k8s_objs = [KubernetesObject(self._get_rc_name(rc), - ssh_key=self.ssh_key, - **cfg) - for rc, cfg in servers_cfg.items()] - self.service_objs = [ServiceObject(s) for s in self.rcs] - + self.rcs = {self._get_rc_name(rc): cfg + for rc, cfg in servers_cfg.items()} + self.rc_objs = [ReplicationControllerObject( + rc, ssh_key=self.ssh_key, **cfg) for rc, cfg in self.rcs.items()] + self.service_objs = [ServiceNodePortObject(rc, **cfg) + for rc, cfg in self.rcs.items()] + self.crd = [CustomResourceDefinitionObject(self.name, **crd) + for crd in crd_cfg] + self.network_objs = [NetworkObject(net_name, **net_data) + for net_name, net_data in networks_cfg.items()] self.pods = [] def _get_rc_name(self, rc_name): @@ -226,3 +480,8 @@ class KubernetesTemplate(object): if p.metadata.name.startswith(s)] return self.pods + + def get_rc_by_name(self, rc_name): + """Returns a ``ReplicationControllerObject``, searching by name""" + for rc in (rc for rc in self.rc_objs if rc.name == rc_name): + return rc