"""CloudifyIms testcase implementation."""
+from __future__ import division
+
import logging
import os
import time
from cloudify_rest_client import CloudifyClient
from cloudify_rest_client.executions import Execution
from scp import SCPClient
-import yaml
-
+import six
from snaps.config.flavor import FlavorConfig
from snaps.config.image import ImageConfig
from snaps.config.keypair import KeypairConfig
from snaps.openstack.create_user import OpenStackUser
from snaps.openstack.utils import keystone_utils
from xtesting.energy import energy
+import yaml
from functest.opnfv_tests.openstack.snaps import snaps_utils
-import functest.opnfv_tests.vnf.ims.clearwater_ims_base as clearwater_ims_base
+from functest.opnfv_tests.vnf.ims import clearwater_ims_base
from functest.utils import config
from functest.utils import env
# needs some images
self.__logger.info("Upload some OS images if it doesn't exist")
- for image_name, image_file in self.images.iteritems():
+ for image_name, image_file in six.iteritems(self.images):
self.__logger.info("image: %s, file: %s", image_name, image_file)
if image_file and image_name:
image_creator = OpenStackImage(
keystone_password=snaps_creds.password,
keystone_tenant_name=snaps_creds.project_name,
keystone_url=public_auth_url,
- region=snaps_creds.region_name,
+ region=snaps_creds.region_name if snaps_creds.region_name else (
+ 'RegionOne'),
user_domain_name=snaps_creds.user_domain_name,
project_domain_name=snaps_creds.project_domain_name)
self.__logger.info("Set creds for cloudify manager %s", cfy_creds)
cfy_client = CloudifyClient(
host=manager_creator.get_floating_ip().ip,
- username='admin', password='admin', tenant='default_tenant')
+ username='admin', password='admin', tenant='default_tenant',
+ api_version='v3')
self.orchestrator['object'] = cfy_client
raise Exception("Cloudify Manager isn't up and running")
self.__logger.info("Put OpenStack creds in manager")
secrets_list = cfy_client.secrets.list()
- for k, val in cfy_creds.iteritems():
+ for k, val in six.iteritems(cfy_creds):
if not any(d.get('key', None) == k for d in secrets_list):
cfy_client.secrets.create(k, val)
else:
self.run_blocking_ssh_command(ssh, cmd)
cmd = "sudo chmod 444 /etc/cloudify/cloudify_ims.pem"
self.run_blocking_ssh_command(ssh, cmd)
- cmd = "sudo yum install -y gcc python-devel"
+ # cmd2 is badly unpinned by Cloudify
+ cmd = "sudo yum install -y gcc python-devel python-cmd2"
self.run_blocking_ssh_command(
ssh, cmd, "Unable to install packages on manager")
self.run_blocking_ssh_command(ssh, 'cfy status')
descriptor.get('name'),
self.vnf.get('inputs'))
- wait_for_execution(cfy_client,
- _get_deployment_environment_creation_execution(
- cfy_client, descriptor.get('name')),
- self.__logger,
- timeout=300)
+ wait_for_execution(
+ cfy_client,
+ get_execution_id(cfy_client, descriptor.get('name')),
+ self.__logger, timeout=300)
self.__logger.info("Start the VNF Instance deployment")
execution = cfy_client.executions.start(descriptor.get('name'),
if not dns_ip:
return False
- vims_test_result = self.run_clearwater_live_test(
+ short_result = self.run_clearwater_live_test(
dns_ip=dns_ip,
public_domain=self.vnf['inputs']["public_domain"])
duration = time.time() - start_time
- short_result, nb_test = sig_test_format(vims_test_result)
self.__logger.info(short_result)
self.details['test_vnf'].update(result=short_result,
- full_result=vims_test_result,
duration=duration)
try:
- vnf_test_rate = short_result['passed'] / nb_test
+ vnf_test_rate = short_result['passed'] / (
+ short_result['total'] - short_result['skipped'])
# orchestrator + vnf + test_vnf
self.result += vnf_test_rate / 3 * 100
except ZeroDivisionError:
self.__logger.error("No test has been executed")
self.details['test_vnf'].update(status='FAIL')
return False
-
- return True
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot calculate results")
+ self.details['test_vnf'].update(status='FAIL')
+ return False
+ return True if vnf_test_rate > 0 else False
def clean(self):
"""Clean created objects/functions."""
return execution
-def _get_deployment_environment_creation_execution(client, deployment_id):
+def get_execution_id(client, deployment_id):
"""
Get the execution id of a env preparation.
raise RuntimeError('Failed to get create_deployment_environment '
'workflow execution.'
'Available executions: {0}'.format(executions))
-
-
-def sig_test_format(sig_test):
- """Process the signaling result to have a short result."""
- nb_passed = 0
- nb_failures = 0
- nb_skipped = 0
- for data_test in sig_test:
- if data_test['result'] == "Passed":
- nb_passed += 1
- elif data_test['result'] == "Failed":
- nb_failures += 1
- elif data_test['result'] == "Skipped":
- nb_skipped += 1
- short_sig_test_result = {}
- short_sig_test_result['passed'] = nb_passed
- short_sig_test_result['failures'] = nb_failures
- short_sig_test_result['skipped'] = nb_skipped
- nb_test = nb_passed + nb_skipped
- return (short_sig_test_result, nb_test)