#!/usr/bin/env python
-# Copyright (c) 2016 Orange and others.
+# Copyright (c) 2017 Orange and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
+"""CloudifyIms testcase implementation."""
+
import logging
import os
import time
-import yaml
-from scp import SCPClient
from cloudify_rest_client import CloudifyClient
from cloudify_rest_client.executions import Execution
+from scp import SCPClient
+import yaml
+from functest.energy import energy
+from functest.opnfv_tests.openstack.snaps import snaps_utils
import functest.opnfv_tests.vnf.ims.clearwater_ims_base as clearwater_ims_base
from functest.utils.constants import CONST
import functest.utils.openstack_utils as os_utils
-from snaps.openstack.os_credentials import OSCreds
-from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
- OpenStackNetwork
-from snaps.openstack.create_security_group import SecurityGroupSettings, \
- SecurityGroupRuleSettings,\
- Direction, Protocol, \
- OpenStackSecurityGroup
+from snaps.openstack.create_network import (NetworkSettings, SubnetSettings,
+ OpenStackNetwork)
+from snaps.openstack.create_security_group import (SecurityGroupSettings,
+ SecurityGroupRuleSettings,
+ Direction, Protocol,
+ OpenStackSecurityGroup)
from snaps.openstack.create_router import RouterSettings, OpenStackRouter
-from snaps.openstack.create_instance import VmInstanceSettings, \
- FloatingIpSettings, \
- OpenStackVmInstance
+from snaps.openstack.create_instance import (VmInstanceSettings,
+ FloatingIpSettings,
+ OpenStackVmInstance)
from snaps.openstack.create_flavor import FlavorSettings, OpenStackFlavor
from snaps.openstack.create_image import ImageSettings, OpenStackImage
from snaps.openstack.create_keypairs import KeypairSettings, OpenStackKeypair
from snaps.openstack.create_network import PortSettings
-from functest.opnfv_tests.openstack.snaps import snaps_utils
-
__author__ = "Valentin Boucher <valentin.boucher@orange.com>"
class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
- """Clearwater vIMS deployed with Cloudify Orchestrator Case"""
+ """Clearwater vIMS deployed with Cloudify Orchestrator Case."""
__logger = logging.getLogger(__name__)
def __init__(self, **kwargs):
+ """Initialize CloudifyIms testcase object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "cloudify_ims"
super(CloudifyIms, self).__init__(**kwargs)
raise Exception("VNF config file not found")
self.snaps_creds = ''
- self.created_object = []
config_file = os.path.join(self.case_dir, self.config)
self.orchestrator = dict(
self.__logger.info("Images needed for vIMS: %s", self.images)
def prepare(self):
+ """Prepare testscase (Additional pre-configuration steps)."""
super(CloudifyIms, self).prepare()
self.__logger.info("Additional pre-configuration steps")
- self.snaps_creds = OSCreds(
- username=self.creds['username'],
- password=self.creds['password'],
- auth_url=self.creds['auth_url'],
- project_name=self.creds['tenant'],
- identity_api_version=int(os_utils.get_keystone_client_version()))
+ compute_quotas = self.os_project.get_compute_quotas()
+ network_quotas = self.os_project.get_network_quotas()
+
+ for key, value in (
+ self.orchestrator['requirements']['compute_quotas'].items()):
+ setattr(compute_quotas, key, value)
+
+ for key, value in (
+ self.orchestrator['requirements']['network_quotas'].items()):
+ setattr(network_quotas, key, value)
+
+ compute_quotas = self.project_ims.update_compute_quotas(compute_quotas)
+ network_quotas = self.project_ims.update_network_quotas(network_quotas)
# needs some images
self.__logger.info("Upload some OS images if it doesn't exist")
def deploy_orchestrator(self):
"""
- Deploy Cloudify Manager
+ Deploy Cloudify Manager.
network, security group, fip, VM creation
"""
while str(cfy_status) != 'running' and retry:
try:
cfy_status = cfy_client.manager.get_status()['status']
+ self.__logger.debug("The current manager status is %s",
+ cfy_status)
except Exception: # pylint: disable=broad-except
self.__logger.warning("Cloudify Manager isn't " +
"up and running. Retrying ...")
self.__logger.info("Put private keypair in manager")
if manager_creator.vm_ssh_active(block=True):
ssh = manager_creator.ssh_client()
- scp = SCPClient(ssh.get_transport())
+ scp = SCPClient(ssh.get_transport(), socket_timeout=15.0)
scp.put(kp_file, '~/')
cmd = "sudo cp ~/cloudify_ims.pem /etc/cloudify/"
- ssh.exec_command(cmd)
+ run_blocking_ssh_command(ssh, cmd)
cmd = "sudo chmod 444 /etc/cloudify/cloudify_ims.pem"
- ssh.exec_command(cmd)
+ run_blocking_ssh_command(ssh, cmd)
cmd = "sudo yum install -y gcc python-devel"
- ssh.exec_command(cmd)
+ run_blocking_ssh_command(ssh, cmd, "Unable to install packages \
+ on manager")
self.details['orchestrator'].update(status='PASS', duration=duration)
return True
def deploy_vnf(self):
- """
- Deploy Clearwater IMS
- """
+ """Deploy Clearwater IMS."""
start_time = time.time()
self.__logger.info("Upload VNFD")
descriptor.get('file_name'))
self.__logger.info("Get or create flavor for all clearwater vm")
- self.exist_obj['flavor2'], flavor_id = os_utils.get_or_create_flavor(
- self.vnf['requirements']['flavor']['name'],
- self.vnf['requirements']['flavor']['ram_min'],
- '30',
- '1',
- public=True)
+ flavor_settings = FlavorSettings(
+ name=self.vnf['requirements']['flavor']['name'],
+ ram=self.vnf['requirements']['flavor']['ram_min'],
+ disk=25,
+ vcpus=1)
+ flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings)
+ flavor_creator.create()
+ self.created_object.append(flavor_creator)
self.vnf['inputs'].update(dict(
- flavor_id=flavor_id,
+ flavor_id=self.vnf['requirements']['flavor']['name'],
))
self.__logger.info("Create VNF Instance")
self.__logger.info(execution)
if execution.status == 'terminated':
self.details['vnf'].update(status='PASS', duration=duration)
- return True
+ result = True
else:
self.details['vnf'].update(status='FAIL', duration=duration)
- return False
+ result = False
+ return result
def test_vnf(self):
- """
- Run test on clearwater ims instance
- """
+ """Run test on clearwater ims instance."""
start_time = time.time()
cfy_client = self.orchestrator['object']
ellis_ip = outputs['ellis_ip']
self.config_ellis(ellis_ip)
- if dns_ip != "":
- vims_test_result = self.run_clearwater_live_test(
- dns_ip=dns_ip,
- public_domain=self.vnf['inputs']["public_domain"])
- duration = time.time() - start_time
- short_result = sig_test_format(vims_test_result)
- self.__logger.info(short_result)
- self.details['test_vnf'].update(status='PASS',
- result=short_result,
- full_result=vims_test_result,
- duration=duration)
- return True
- else:
+ if not dns_ip:
return False
+ vims_test_result = self.run_clearwater_live_test(
+ dns_ip=dns_ip,
+ public_domain=self.vnf['inputs']["public_domain"])
+ duration = time.time() - start_time
+ short_result = sig_test_format(vims_test_result)
+ self.__logger.info(short_result)
+ self.details['test_vnf'].update(status='PASS',
+ result=short_result,
+ full_result=vims_test_result,
+ duration=duration)
+ return True
+
def clean(self):
+ """Clean created objects/functions."""
try:
cfy_client = self.orchestrator['object']
dep_name = self.vnf['descriptor'].get('name')
try:
cfy_client.executions.cancel(execution['id'],
force=True)
- except:
+ except: # pylint: disable=broad-except
self.__logger.warn("Can't cancel the current exec")
execution = cfy_client.executions.start(
wait_for_execution(cfy_client, execution, self.__logger)
cfy_client.deployments.delete(self.vnf['descriptor'].get('name'))
cfy_client.blueprints.delete(self.vnf['descriptor'].get('name'))
- except:
+ except: # pylint: disable=broad-except
self.__logger.warn("Some issue during the undeployment ..")
self.__logger.warn("Tenant clean continue ..")
- self.__logger.info('Remove the cloudify manager OS object ..')
- for creator in reversed(self.created_object):
- try:
- creator.clean()
- except Exception as e:
- self.logger.error('Unexpected error cleaning - %s', e)
super(CloudifyIms, self).clean()
+ @energy.enable_recording
+ def run(self, **kwargs):
+ """Execute CloudifyIms test case."""
+ super(CloudifyIms, self).run(**kwargs)
+
# ----------------------------------------------------------
#
# -----------------------------------------------------------
def get_config(parameter, file_path):
"""
+ Get config parameter.
+
Returns the value of a given parameter in file.yaml
parameter must be given in string format with dots
Example: general.openstack.image_name
def wait_for_execution(client, execution, logger, timeout=2400, ):
- """
- Wait for a workflow execution on Cloudify Manager
- """
+ """Wait for a workflow execution on Cloudify Manager."""
# if execution already ended - return without waiting
if execution.status in Execution.END_STATES:
return execution
def _get_deployment_environment_creation_execution(client, deployment_id):
"""
- Get the execution id of a env preparation
+ Get the execution id of a env preparation.
network, security group, fip, VM creation
"""
def sig_test_format(sig_test):
- """
- Process the signaling result to have a short result
- """
+ """Process the signaling result to have a short result."""
nb_passed = 0
nb_failures = 0
nb_skipped = 0
total_sig_test_result['failures'] = nb_failures
total_sig_test_result['skipped'] = nb_skipped
return total_sig_test_result
+
+
+def run_blocking_ssh_command(ssh, cmd, error_msg="Unable to run this command"):
+ """Command to run ssh command with the exit status."""
+ stdin, stdout, stderr = ssh.exec_command(cmd)
+ if stdout.channel.recv_exit_status() != 0:
+ raise Exception(error_msg)