X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=functest%2Fopnfv_tests%2Fvnf%2Fims%2Fcloudify_ims.py;h=a4144ad957c667e944145d112d209fae58a6b7f2;hb=91929a9263638847491f1423eebfdcd528aa08cc;hp=89c84afdd815005766c355e2986e47aa3cd0f723;hpb=fb3ef61a03cdf2fb938d771e9d23f6435cff240f;p=functest.git diff --git a/functest/opnfv_tests/vnf/ims/cloudify_ims.py b/functest/opnfv_tests/vnf/ims/cloudify_ims.py index 89c84afdd..a4144ad95 100644 --- a/functest/opnfv_tests/vnf/ims/cloudify_ims.py +++ b/functest/opnfv_tests/vnf/ims/cloudify_ims.py @@ -1,42 +1,60 @@ #!/usr/bin/env python -# Copyright (c) 2016 Orange and others. +# Copyright (c) 2017 Orange and others. # # All rights reserved. This program and the accompanying materials # are made available under the terms of the Apache License, Version 2.0 # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 +"""CloudifyIms testcase implementation.""" + import logging import os -import sys import time -import requests +from cloudify_rest_client import CloudifyClient +from cloudify_rest_client.executions import Execution +from scp import SCPClient import yaml -from functest.opnfv_tests.vnf.ims.clearwater import Clearwater +from functest.energy import energy +from functest.opnfv_tests.openstack.snaps import snaps_utils import functest.opnfv_tests.vnf.ims.clearwater_ims_base as clearwater_ims_base -from functest.opnfv_tests.vnf.ims.orchestrator_cloudify import Orchestrator from functest.utils.constants import CONST -import functest.utils.functest_utils as ft_utils -import functest.utils.openstack_utils as os_utils + +from snaps.config.flavor import FlavorConfig +from snaps.config.image import ImageConfig +from snaps.config.keypair import KeypairConfig +from snaps.config.network import NetworkConfig, PortConfig, SubnetConfig +from snaps.config.router import RouterConfig +from snaps.config.security_group import ( + Direction, Protocol, SecurityGroupConfig, SecurityGroupRuleConfig) +from snaps.config.vm_inst import FloatingIpConfig, VmInstanceConfig + +from snaps.openstack.create_flavor import OpenStackFlavor +from snaps.openstack.create_image import OpenStackImage +from snaps.openstack.create_instance import OpenStackVmInstance +from snaps.openstack.create_keypairs import OpenStackKeypair +from snaps.openstack.create_network import OpenStackNetwork +from snaps.openstack.create_router import OpenStackRouter +from snaps.openstack.create_security_group import OpenStackSecurityGroup +from snaps.openstack.utils import keystone_utils + __author__ = "Valentin Boucher " class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase): - """Clearwater vIMS deployed with Cloudify Orchestrator Case""" + """Clearwater vIMS deployed with Cloudify Orchestrator Case.""" + + __logger = logging.getLogger(__name__) def __init__(self, **kwargs): + """Initialize CloudifyIms testcase object.""" if "case_name" not in kwargs: kwargs["case_name"] = "cloudify_ims" super(CloudifyIms, self).__init__(**kwargs) - self.logger = logging.getLogger(__name__) - self.neutron_client = '' - self.glance_client = '' - self.keystone_client = '' - self.nova_client = '' # Retrieve the configuration try: @@ -47,234 +65,373 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase): config_file = os.path.join(self.case_dir, self.config) self.orchestrator = dict( - requirements=get_config("cloudify.requirements", config_file), - blueprint=get_config("cloudify.blueprint", config_file), - inputs=get_config("cloudify.inputs", config_file) + requirements=get_config("orchestrator.requirements", config_file), + ) + self.details['orchestrator'] = dict( + name=get_config("orchestrator.name", config_file), + version=get_config("orchestrator.version", config_file), + status='ERROR', + result='' ) - self.logger.debug("Orchestrator configuration: %s", self.orchestrator) + self.__logger.debug("Orchestrator configuration %s", self.orchestrator) self.vnf = dict( - blueprint=get_config("clearwater.blueprint", config_file), - deployment_name=get_config("clearwater.deployment_name", - config_file), - inputs=get_config("clearwater.inputs", config_file), - requirements=get_config("clearwater.requirements", config_file) + descriptor=get_config("vnf.descriptor", config_file), + inputs=get_config("vnf.inputs", config_file), + requirements=get_config("vnf.requirements", config_file) ) - self.logger.debug("VNF configuration: %s", self.vnf) + self.details['vnf'] = dict( + descriptor_version=self.vnf['descriptor']['version'], + name=get_config("vnf.name", config_file), + version=get_config("vnf.version", config_file), + ) + self.__logger.debug("VNF configuration: %s", self.vnf) + self.details['test_vnf'] = dict( + name=get_config("vnf_test_suite.name", config_file), + version=get_config("vnf_test_suite.version", config_file) + ) self.images = get_config("tenant_images", config_file) - self.logger.info("Images needed for vIMS: %s", self.images) + self.__logger.info("Images needed for vIMS: %s", self.images) - def deploy_orchestrator(self): + def prepare(self): + """Prepare testscase (Additional pre-configuration steps).""" + super(CloudifyIms, self).prepare() + + self.__logger.info("Additional pre-configuration steps") + + compute_quotas = self.os_project.get_compute_quotas() + network_quotas = self.os_project.get_network_quotas() - self.logger.info("Additional pre-configuration steps") - self.neutron_client = os_utils.get_neutron_client(self.admin_creds) - self.glance_client = os_utils.get_glance_client(self.admin_creds) - self.keystone_client = os_utils.get_keystone_client(self.admin_creds) - self.nova_client = os_utils.get_nova_client(self.admin_creds) + for key, value in ( + self.vnf['requirements']['compute_quotas'].items()): + setattr(compute_quotas, key, value) + + for key, value in ( + self.vnf['requirements']['network_quotas'].items()): + setattr(network_quotas, key, value) + + compute_quotas = self.os_project.update_compute_quotas(compute_quotas) + network_quotas = self.os_project.update_network_quotas(network_quotas) # needs some images - self.logger.info("Upload some OS images if it doesn't exist") - temp_dir = os.path.join(self.data_dir, "tmp/") - for image_name, image_url in self.images.iteritems(): - self.logger.info("image: %s, url: %s", image_name, image_url) - try: - image_id = os_utils.get_image_id(self.glance_client, - image_name) - self.logger.debug("image_id: %s", image_id) - except Exception: - self.logger.error("Unexpected error: %s", sys.exc_info()[0]) - - if image_id == '': - self.logger.info("""%s image does not exist on glance repo. - Try downloading this image - and upload on glance !""", - image_name) - image_id = os_utils.download_and_add_image_on_glance( - self.glance_client, - image_name, - image_url, - temp_dir) - # Need to extend quota - self.logger.info("Update security group quota for this tenant") - tenant_id = os_utils.get_tenant_id(self.keystone_client, - self.tenant_name) - self.logger.debug("Tenant id found %s", tenant_id) - if not os_utils.update_sg_quota(self.neutron_client, - tenant_id, 50, 100): - self.logger.error("Failed to update security group quota" - " for tenant " + self.tenant_name) - - self.logger.debug("group quota extended") - - # start the deployment of cloudify - public_auth_url = os_utils.get_endpoint('identity') - - self.logger.debug("CFY inputs: %s", self.orchestrator['inputs']) - cfy = Orchestrator(self.data_dir, self.orchestrator['inputs']) - self.orchestrator['object'] = cfy - self.logger.debug("Orchestrator object created") - - self.logger.debug("Tenant name: %s", self.tenant_name) - - cfy.set_credentials(username=self.tenant_name, - password=self.tenant_name, - tenant_name=self.tenant_name, - auth_url=public_auth_url) - self.logger.info("Credentials set in CFY") + self.__logger.info("Upload some OS images if it doesn't exist") + for image_name, image_file in self.images.iteritems(): + self.__logger.info("image: %s, file: %s", image_name, image_file) + if image_file and image_name: + image_creator = OpenStackImage( + self.snaps_creds, + ImageConfig( + name=image_name, image_user='cloud', + img_format='qcow2', image_file=image_file)) + image_creator.create() + # self.created_object.append(image_creator) + + def deploy_orchestrator(self): + """ + Deploy Cloudify Manager. + + network, security group, fip, VM creation + """ + # network creation + + start_time = time.time() + self.__logger.info("Creating keypair ...") + kp_file = os.path.join(self.data_dir, "cloudify_ims.pem") + keypair_settings = KeypairConfig(name='cloudify_ims_kp', + private_filepath=kp_file) + keypair_creator = OpenStackKeypair(self.snaps_creds, keypair_settings) + keypair_creator.create() + self.created_object.append(keypair_creator) + + self.__logger.info("Creating full network ...") + subnet_settings = SubnetConfig(name='cloudify_ims_subnet', + cidr='10.67.79.0/24') + network_settings = NetworkConfig(name='cloudify_ims_network', + subnet_settings=[subnet_settings]) + network_creator = OpenStackNetwork(self.snaps_creds, network_settings) + network_creator.create() + self.created_object.append(network_creator) + ext_net_name = snaps_utils.get_ext_net_name(self.snaps_creds) + router_creator = OpenStackRouter( + self.snaps_creds, + RouterConfig( + name='cloudify_ims_router', + external_gateway=ext_net_name, + internal_subnets=[subnet_settings.name])) + router_creator.create() + self.created_object.append(router_creator) + + # security group creation + self.__logger.info("Creating security group for cloudify manager vm") + sg_rules = list() + sg_rules.append( + SecurityGroupRuleConfig( + sec_grp_name="sg-cloudify-manager", + direction=Direction.ingress, protocol=Protocol.tcp, + port_range_min=1, port_range_max=65535)) + sg_rules.append( + SecurityGroupRuleConfig( + sec_grp_name="sg-cloudify-manager", + direction=Direction.ingress, protocol=Protocol.udp, + port_range_min=1, port_range_max=65535)) + + security_group_creator = OpenStackSecurityGroup( + self.snaps_creds, + SecurityGroupConfig( + name="sg-cloudify-manager", + rule_settings=sg_rules)) + + security_group_creator.create() + self.created_object.append(security_group_creator) # orchestrator VM flavor - self.logger.info("Check Flavor is available, if not create one") - self.logger.debug("Flavor details %s ", - self.orchestrator['requirements']['ram_min']) - flavor_exist, flavor_id = os_utils.get_or_create_flavor( - "m1.large", - self.orchestrator['requirements']['ram_min'], - '50', - '2', - public=True) - self.logger.debug("Flavor id: %s", flavor_id) - - if not flavor_id: - self.logger.info("Available flavors are: ") - self.logger.info(self.nova_client.flavor.list()) - - cfy.set_flavor_id(flavor_id) - self.logger.debug("Flavor OK") - - # orchestrator VM image - self.logger.debug("Orchestrator image") - if 'os_image' in self.orchestrator['requirements'].keys(): - image_id = os_utils.get_image_id( - self.glance_client, - self.orchestrator['requirements']['os_image']) - self.logger.debug("Orchestrator image id: %s", image_id) - if image_id == '': - self.logger.error("CFY image not found") - - cfy.set_image_id(image_id) - self.logger.debug("Orchestrator image set") - - self.logger.debug("Get External network") - ext_net = os_utils.get_external_net(self.neutron_client) - self.logger.debug("External network: %s", ext_net) - - cfy.set_external_network_name(ext_net) - self.logger.debug("CFY External network set") - - self.logger.debug("get resolvconf") - ns = ft_utils.get_resolvconf_ns() - if ns: - cfy.set_nameservers(ns) - self.logger.debug("Resolvconf set") - - self.logger.info("Prepare virtualenv for cloudify-cli") - venv_scrit_dir = os.path.join(self.case_dir, "create_venv.sh") - cmd = "chmod +x " + venv_scrit_dir - ft_utils.execute_command(cmd) - time.sleep(3) - cmd = venv_scrit_dir + " " + self.data_dir - ft_utils.execute_command(cmd) - - cfy.download_manager_blueprint( - self.orchestrator['blueprint']['url'], - self.orchestrator['blueprint']['branch']) - - error = cfy.deploy_manager() - if error: - self.logger.error(error) - return {'status': 'FAIL', 'result': error} + self.__logger.info("Get or create flavor for cloudify manager vm ...") + + flavor_settings = FlavorConfig( + name=self.orchestrator['requirements']['flavor']['name'], + ram=self.orchestrator['requirements']['flavor']['ram_min'], + disk=50, + vcpus=2) + flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings) + flavor_creator.create() + self.created_object.append(flavor_creator) + image_settings = ImageConfig( + name=self.orchestrator['requirements']['os_image'], + image_user='centos', + exists=True) + + port_settings = PortConfig(name='cloudify_manager_port', + network_name=network_settings.name) + + manager_settings = VmInstanceConfig( + name='cloudify_manager', + flavor=flavor_settings.name, + port_settings=[port_settings], + security_group_names=[ + security_group_creator.sec_grp_settings.name], + floating_ip_settings=[FloatingIpConfig( + name='cloudify_manager_fip', + port_name=port_settings.name, + router_name=router_creator.router_settings.name)]) + + manager_creator = OpenStackVmInstance(self.snaps_creds, + manager_settings, + image_settings, + keypair_settings) + + self.__logger.info("Creating cloudify manager VM") + manager_creator.create() + self.created_object.append(manager_creator) + + public_auth_url = keystone_utils.get_endpoint( + self.snaps_creds, 'identity') + + self.__logger.info("Set creds for cloudify manager") + cfy_creds = dict(keystone_username=self.snaps_creds.username, + keystone_password=self.snaps_creds.password, + keystone_tenant_name=self.snaps_creds.project_name, + keystone_url=public_auth_url) + + cfy_client = CloudifyClient(host=manager_creator.get_floating_ip().ip, + username='admin', + password='admin', + tenant='default_tenant') + + self.orchestrator['object'] = cfy_client + + self.__logger.info("Attemps running status of the Manager") + cfy_status = None + retry = 10 + while str(cfy_status) != 'running' and retry: + try: + cfy_status = cfy_client.manager.get_status()['status'] + self.__logger.debug("The current manager status is %s", + cfy_status) + except Exception: # pylint: disable=broad-except + self.__logger.warning("Cloudify Manager isn't " + + "up and running. Retrying ...") + retry = retry - 1 + time.sleep(30) + + if str(cfy_status) == 'running': + self.__logger.info("Cloudify Manager is up and running") else: - return {'status': 'PASS', 'result': ''} + raise Exception("Cloudify Manager isn't up and running") + + self.__logger.info("Put OpenStack creds in manager") + secrets_list = cfy_client.secrets.list() + for k, val in cfy_creds.iteritems(): + if not any(d.get('key', None) == k for d in secrets_list): + cfy_client.secrets.create(k, val) + else: + cfy_client.secrets.update(k, val) + + duration = time.time() - start_time + + self.__logger.info("Put private keypair in manager") + if manager_creator.vm_ssh_active(block=True): + ssh = manager_creator.ssh_client() + scp = SCPClient(ssh.get_transport(), socket_timeout=15.0) + scp.put(kp_file, '~/') + cmd = "sudo cp ~/cloudify_ims.pem /etc/cloudify/" + run_blocking_ssh_command(ssh, cmd) + cmd = "sudo chmod 444 /etc/cloudify/cloudify_ims.pem" + run_blocking_ssh_command(ssh, cmd) + cmd = "sudo yum install -y gcc python-devel" + run_blocking_ssh_command(ssh, cmd, "Unable to install packages \ + on manager") + + self.details['orchestrator'].update(status='PASS', duration=duration) + + self.vnf['inputs'].update(dict( + external_network_name=ext_net_name, + network_name=network_settings.name + )) + self.result = 1/3 * 100 + return True def deploy_vnf(self): - cw = Clearwater(self.vnf['inputs'], self.orchestrator['object'], - self.logger) - self.vnf['object'] = cw - - self.logger.info("Collect flavor id for all clearwater vm") - flavor_exist, flavor_id = os_utils.get_or_create_flavor( - "m1.small", - self.vnf['requirements']['ram_min'], - '30', - '1', - public=True) - self.logger.debug("Flavor id: %s", flavor_id) - if not flavor_id: - self.logger.info("Available flavors are: ") - self.logger.info(self.nova_client.flavor.list()) - - cw.set_flavor_id(flavor_id) - - # VMs image - if 'os_image' in self.vnf['requirements'].keys(): - image_id = os_utils.get_image_id( - self.glance_client, self.vnf['requirements']['os_image']) - - cw.set_image_id(image_id) - ext_net = os_utils.get_external_net(self.neutron_client) - cw.set_external_network_name(ext_net) - - error = cw.deploy_vnf(self.vnf['blueprint']) - if error: - self.logger.error(error) - return {'status': 'FAIL', 'result': error} + """Deploy Clearwater IMS.""" + start_time = time.time() + + self.__logger.info("Upload VNFD") + cfy_client = self.orchestrator['object'] + descriptor = self.vnf['descriptor'] + cfy_client.blueprints.publish_archive(descriptor.get('url'), + descriptor.get('name'), + descriptor.get('file_name')) + + self.__logger.info("Get or create flavor for all clearwater vm") + flavor_settings = FlavorConfig( + name=self.vnf['requirements']['flavor']['name'], + ram=self.vnf['requirements']['flavor']['ram_min'], + disk=25, + vcpus=1) + flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings) + flavor_creator.create() + self.created_object.append(flavor_creator) + + self.vnf['inputs'].update(dict( + flavor_id=self.vnf['requirements']['flavor']['name'], + )) + + self.__logger.info("Create VNF Instance") + cfy_client.deployments.create(descriptor.get('name'), + descriptor.get('name'), + self.vnf.get('inputs')) + + wait_for_execution(cfy_client, + _get_deployment_environment_creation_execution( + cfy_client, descriptor.get('name')), + self.__logger, + timeout=300) + + self.__logger.info("Start the VNF Instance deployment") + execution = cfy_client.executions.start(descriptor.get('name'), + 'install') + # Show execution log + execution = wait_for_execution(cfy_client, execution, self.__logger) + + duration = time.time() - start_time + + self.__logger.info(execution) + if execution.status == 'terminated': + self.details['vnf'].update(status='PASS', duration=duration) + self.result += 1/3 * 100 + result = True else: - return {'status': 'PASS', 'result': ''} + self.details['vnf'].update(status='FAIL', duration=duration) + result = False + return result def test_vnf(self): - script = "source {0}venv_cloudify/bin/activate; " - script += "cd {0}; " - script += "cfy status | grep -Eo \"([0-9]{{1,3}}\.){{3}}[0-9]{{1,3}}\"" - cmd = "/bin/bash -c '" + script.format(self.data_dir) + "'" + """Run test on clearwater ims instance.""" + start_time = time.time() - try: - self.logger.debug("Trying to get clearwater manager IP ... ") - mgr_ip = os.popen(cmd).read() - mgr_ip = mgr_ip.splitlines()[0] - except Exception: - self.logger.exception("Unable to retrieve the IP of the " - "cloudify manager server !") - - self.logger.info('Cloudify Manager: %s', mgr_ip) - api_url = 'http://{0}/api/v2/deployments/{1}/outputs'.format( - mgr_ip, self.vnf['deployment_name']) - dep_outputs = requests.get(api_url) - self.logger.info(api_url) - outputs = dep_outputs.json()['outputs'] - self.logger.info("Deployment outputs: %s", outputs) + cfy_client = self.orchestrator['object'] + + outputs = cfy_client.deployments.outputs.get( + self.vnf['descriptor'].get('name'))['outputs'] dns_ip = outputs['dns_ip'] ellis_ip = outputs['ellis_ip'] self.config_ellis(ellis_ip) - if dns_ip != "": - vims_test_result = self.run_clearwater_live_test( - dns_ip=dns_ip, - public_domain="") # self.inputs["public_domain"] - if vims_test_result != '': - return {'status': 'PASS', 'result': vims_test_result} - else: - return {'status': 'FAIL', 'result': ''} + if not dns_ip: + return False + + vims_test_result = self.run_clearwater_live_test( + dns_ip=dns_ip, + public_domain=self.vnf['inputs']["public_domain"]) + duration = time.time() - start_time + short_result, nb_test = sig_test_format(vims_test_result) + self.__logger.info(short_result) + self.details['test_vnf'].update(result=short_result, + full_result=vims_test_result, + duration=duration) + try: + vnf_test_rate = short_result['passed'] / nb_test + # orchestrator + vnf + test_vnf + self.result += vnf_test_rate / 3 * 100 + except ZeroDivisionError: + self.__logger.error("No test has been executed") + self.details['test_vnf'].update(status='FAIL') + return False + + return True def clean(self): - self.vnf['object'].undeploy_vnf() - self.orchestrator['object'].undeploy_manager() + """Clean created objects/functions.""" + try: + cfy_client = self.orchestrator['object'] + dep_name = self.vnf['descriptor'].get('name') + # kill existing execution + self.__logger.info('Deleting the current deployment') + exec_list = cfy_client.executions.list(dep_name) + for execution in exec_list: + if execution['status'] == "started": + try: + cfy_client.executions.cancel(execution['id'], + force=True) + except: # pylint: disable=broad-except + self.__logger.warn("Can't cancel the current exec") + + execution = cfy_client.executions.start( + dep_name, + 'uninstall', + parameters=dict(ignore_failure=True), + force=True) + + wait_for_execution(cfy_client, execution, self.__logger) + cfy_client.deployments.delete(self.vnf['descriptor'].get('name')) + cfy_client.blueprints.delete(self.vnf['descriptor'].get('name')) + except: # pylint: disable=broad-except + self.__logger.warn("Some issue during the undeployment ..") + self.__logger.warn("Tenant clean continue ..") + super(CloudifyIms, self).clean() + @energy.enable_recording + def run(self, **kwargs): + """Execute CloudifyIms test case.""" + return super(CloudifyIms, self).run(**kwargs) + # ---------------------------------------------------------- # # YAML UTILS # # ----------------------------------------------------------- -def get_config(parameter, file): +def get_config(parameter, file_path): """ + Get config parameter. + Returns the value of a given parameter in file.yaml parameter must be given in string format with dots Example: general.openstack.image_name """ - with open(file) as f: - file_yaml = yaml.safe_load(f) - f.close() + with open(file_path) as config_file: + file_yaml = yaml.safe_load(config_file) + config_file.close() value = file_yaml for element in parameter.split("."): value = value.get(element) @@ -282,3 +439,94 @@ def get_config(parameter, file): raise ValueError("The parameter %s is not defined in" " reporting.yaml" % parameter) return value + + +def wait_for_execution(client, execution, logger, timeout=1500, ): + """Wait for a workflow execution on Cloudify Manager.""" + # if execution already ended - return without waiting + if execution.status in Execution.END_STATES: + return execution + + if timeout is not None: + deadline = time.time() + timeout + + # Poll for execution status and execution logs, until execution ends + # and we receive an event of type in WORKFLOW_END_TYPES + offset = 0 + batch_size = 50 + event_list = [] + execution_ended = False + while True: + event_list = client.events.list( + execution_id=execution.id, + _offset=offset, + _size=batch_size, + include_logs=False, + sort='@timestamp').items + + offset = offset + len(event_list) + for event in event_list: + logger.debug(event.get('message')) + + if timeout is not None: + if time.time() > deadline: + raise RuntimeError( + 'execution of operation {0} for deployment {1} ' + 'timed out'.format(execution.workflow_id, + execution.deployment_id)) + else: + # update the remaining timeout + timeout = deadline - time.time() + + if not execution_ended: + execution = client.executions.get(execution.id) + execution_ended = execution.status in Execution.END_STATES + + if execution_ended: + break + + time.sleep(5) + + return execution + + +def _get_deployment_environment_creation_execution(client, deployment_id): + """ + Get the execution id of a env preparation. + + network, security group, fip, VM creation + """ + executions = client.executions.list(deployment_id=deployment_id) + for execution in executions: + if execution.workflow_id == 'create_deployment_environment': + return execution + raise RuntimeError('Failed to get create_deployment_environment ' + 'workflow execution.' + 'Available executions: {0}'.format(executions)) + + +def sig_test_format(sig_test): + """Process the signaling result to have a short result.""" + nb_passed = 0 + nb_failures = 0 + nb_skipped = 0 + for data_test in sig_test: + if data_test['result'] == "Passed": + nb_passed += 1 + elif data_test['result'] == "Failed": + nb_failures += 1 + elif data_test['result'] == "Skipped": + nb_skipped += 1 + short_sig_test_result = {} + short_sig_test_result['passed'] = nb_passed + short_sig_test_result['failures'] = nb_failures + short_sig_test_result['skipped'] = nb_skipped + nb_test = nb_passed + nb_skipped + return (short_sig_test_result, nb_test) + + +def run_blocking_ssh_command(ssh, cmd, error_msg="Unable to run this command"): + """Command to run ssh command with the exit status.""" + stdin, stdout, stderr = ssh.exec_command(cmd) + if stdout.channel.recv_exit_status() != 0: + raise Exception(error_msg)