from functest.utils.constants import CONST
import functest.utils.openstack_utils as os_utils
- from snaps.openstack.os_credentials import OSCreds
from snaps.openstack.create_network import (NetworkSettings, SubnetSettings,
OpenStackNetwork)
from snaps.openstack.create_security_group import (SecurityGroupSettings,
raise Exception("VNF config file not found")
self.snaps_creds = ''
- self.created_object = []
config_file = os.path.join(self.case_dir, self.config)
self.orchestrator = dict(
self.__logger.info("Additional pre-configuration steps")
- self.snaps_creds = OSCreds(
- username=self.creds['username'],
- password=self.creds['password'],
- auth_url=self.creds['auth_url'],
- project_name=self.creds['tenant'],
- identity_api_version=int(os_utils.get_keystone_client_version()))
+ compute_quotas = self.os_project.get_compute_quotas()
+ network_quotas = self.os_project.get_network_quotas()
+
+ for key, value in (
+ self.orchestrator['requirements']['compute_quotas'].items()):
+ setattr(compute_quotas, key, value)
+
+ for key, value in (
+ self.orchestrator['requirements']['network_quotas'].items()):
+ setattr(network_quotas, key, value)
+
+ compute_quotas = self.project_ims.update_compute_quotas(compute_quotas)
+ network_quotas = self.project_ims.update_network_quotas(network_quotas)
# needs some images
self.__logger.info("Upload some OS images if it doesn't exist")
- for image_name, image_url in self.images.iteritems():
- self.__logger.info("image: %s, url: %s", image_name, image_url)
- if image_url and image_name:
+ for image_name, image_file in self.images.iteritems():
+ self.__logger.info("image: %s, file: %s", image_name, image_file)
+ if image_file and image_name:
image_creator = OpenStackImage(
self.snaps_creds,
ImageSettings(name=image_name,
image_user='cloud',
img_format='qcow2',
- url=image_url))
+ image_file=image_file))
image_creator.create()
# self.created_object.append(image_creator)
external_network_name=ext_net_name,
network_name=network_settings.name
))
+ self.result = 1/3 * 100
return True
def deploy_vnf(self):
self.__logger.info(execution)
if execution.status == 'terminated':
self.details['vnf'].update(status='PASS', duration=duration)
+ self.result += 1/3 * 100
result = True
else:
self.details['vnf'].update(status='FAIL', duration=duration)
dns_ip=dns_ip,
public_domain=self.vnf['inputs']["public_domain"])
duration = time.time() - start_time
- short_result = sig_test_format(vims_test_result)
+ short_result, nb_test = sig_test_format(vims_test_result)
self.__logger.info(short_result)
- self.details['test_vnf'].update(status='PASS',
- result=short_result,
+ self.details['test_vnf'].update(result=short_result,
full_result=vims_test_result,
duration=duration)
+ try:
+ vnf_test_rate = short_result['passed'] / nb_test
+ # orchestrator + vnf + test_vnf
+ self.result += vnf_test_rate / 3 * 100
+ except ZeroDivisionError:
+ self.__logger.error("No test has been executed")
+ self.details['test_vnf'].update(status='FAIL')
+ return False
+
return True
def clean(self):
self.__logger.warn("Some issue during the undeployment ..")
self.__logger.warn("Tenant clean continue ..")
- self.__logger.info('Remove the cloudify manager OS object ..')
- for creator in reversed(self.created_object):
- try:
- creator.clean()
- except Exception as exc:
- self.logger.error('Unexpected error cleaning - %s', exc)
super(CloudifyIms, self).clean()
@energy.enable_recording
nb_failures += 1
elif data_test['result'] == "Skipped":
nb_skipped += 1
- total_sig_test_result = {}
- total_sig_test_result['passed'] = nb_passed
- total_sig_test_result['failures'] = nb_failures
- total_sig_test_result['skipped'] = nb_skipped
- return total_sig_test_result
+ short_sig_test_result = {}
+ short_sig_test_result['passed'] = nb_passed
+ short_sig_test_result['failures'] = nb_failures
+ short_sig_test_result['skipped'] = nb_skipped
+ nb_test = nb_passed + nb_skipped
+ return (short_sig_test_result, nb_test)
def run_blocking_ssh_command(ssh, cmd, error_msg="Unable to run this command"):
tenant_images:
- ubuntu_14.04: http://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
- cloudify_manager_4.0: http://repository.cloudifysource.org/cloudify/4.0.1/sp-release/cloudify-manager-premium-4.0.1.qcow2
+ ubuntu_14.04: /home/opnfv/functest/images/trusty-server-cloudimg-amd64-disk1.img
+ cloudify_manager_4.0: /home/opnfv/functest/images/cloudify-manager-premium-4.0.1.qcow2
orchestrator:
name: cloudify
version: '4.0'
flavor:
name: m1.small
ram_min: 2048
+ compute_quotas:
+ cores: 50
+ instances: 15
+ network_quotas:
+ security_group: 20
+ security_group_rule: 100
+ port: 50
inputs:
image_id: 'ubuntu_14.04'
flavor_id: 'm1.small'
SubnetSettings,
PortSettings)
from snaps.openstack.create_router import OpenStackRouter, RouterSettings
- from snaps.openstack.os_credentials import OSCreds
from snaps.openstack.create_instance import (
VmInstanceSettings,
OpenStackVmInstance)
if not os.path.exists(self.data_dir):
os.makedirs(self.data_dir)
- self.images = get_config(
- "tenant_images.%s" %
- self.case_name, config_file)
+ self.images = get_config("tenant_images.orchestrator", config_file)
self.images.update(
get_config(
"tenant_images.%s" %
super(ClearwaterImsVnf, self).prepare()
self.logger.info("Additional pre-configuration steps")
- self.logger.info("creds %s", (self.creds))
-
- self.snaps_creds = OSCreds(
- username=self.creds['username'],
- password=self.creds['password'],
- auth_url=self.creds['auth_url'],
- project_name=self.creds['tenant'],
- identity_api_version=int(os_utils.get_keystone_client_version()))
+ self.creds = {
+ "tenant": self.tenant_name,
+ "username": self.tenant_name,
+ "password": self.tenant_name,
+ "auth_url": os_utils.get_credentials()['auth_url']
+ }
self.prepare_images()
self.prepare_flavor()
self.prepare_security_groups()
def prepare_images(self):
"""Upload images if they doen't exist yet"""
self.logger.info("Upload images if they doen't exist yet")
- for image_name, image_url in self.images.iteritems():
- self.logger.info("image: %s, url: %s", image_name, image_url)
- if image_url and image_name:
+ for image_name, image_file in self.images.iteritems():
+ self.logger.info("image: %s, file: %s", image_name, image_file)
+ if image_file and image_name:
image = OpenStackImage(
self.snaps_creds,
ImageSettings(name=image_name,
image_user='cloud',
img_format='qcow2',
- url=image_url))
+ image_file=image_file,
+ public=True))
image.create()
# self.created_resources.append(image);
try:
neutron_client = os_utils.get_neutron_client(self.creds)
self.logger.info("Deleting Open Baton Port...")
- port = snaps_utils.neutron_utils.get_port_by_name(
- neutron_client, '%s_port' % self.case_name)
+ port = snaps_utils.neutron_utils.get_port(
+ neutron_client,
+ port_name='%s_port' % self.case_name)
snaps_utils.neutron_utils.delete_port(neutron_client, port)
time.sleep(10)
except Exception as exc:
SubnetSettings,
PortSettings)
from snaps.openstack.create_router import OpenStackRouter, RouterSettings
- from snaps.openstack.os_credentials import OSCreds
from snaps.openstack.create_instance import (
VmInstanceSettings, OpenStackVmInstance)
from functest.opnfv_tests.openstack.snaps import snaps_utils
if not os.path.exists(self.data_dir):
os.makedirs(self.data_dir)
- self.images = get_config("tenant_images.%s" %
- self.case_name, config_file)
+ self.images = get_config("tenant_images.orchestrator", config_file)
self.images.update(get_config("tenant_images.%s" %
self.case_name, config_file))
- self.snaps_creds = None
def prepare(self):
"""Prepare testscase (Additional pre-configuration steps)."""
super(OpenImsVnf, self).prepare()
self.logger.info("Additional pre-configuration steps")
- self.logger.info("creds %s", (self.creds))
-
- self.snaps_creds = OSCreds(
- username=self.creds['username'],
- password=self.creds['password'],
- auth_url=self.creds['auth_url'],
- project_name=self.creds['tenant'],
- identity_api_version=int(os_utils.get_keystone_client_version()))
-
+ self.creds = {
+ "tenant": self.tenant_name,
+ "username": self.tenant_name,
+ "password": self.tenant_name,
+ "auth_url": os_utils.get_credentials()['auth_url']
+ }
self.prepare_images()
self.prepare_flavor()
self.prepare_security_groups()
def prepare_images(self):
"""Upload images if they doen't exist yet"""
self.logger.info("Upload images if they doen't exist yet")
- for image_name, image_url in self.images.iteritems():
- self.logger.info("image: %s, url: %s", image_name, image_url)
- if image_url and image_name:
+ for image_name, image_file in self.images.iteritems():
+ self.logger.info("image: %s, file: %s", image_name, image_file)
+ if image_file and image_name:
image = OpenStackImage(
self.snaps_creds,
ImageSettings(name=image_name,
image_user='cloud',
img_format='qcow2',
- url=image_url))
+ image_file=image_file,
+ public=True))
image.create()
# self.created_resources.append(image);
try:
neutron_client = os_utils.get_neutron_client(self.creds)
self.logger.info("Deleting Open Baton Port...")
- port = snaps_utils.neutron_utils.get_port_by_name(
- neutron_client, '%s_port' % self.case_name)
+ port = snaps_utils.neutron_utils.get_port(
+ neutron_client,
+ port_name='%s_port' % self.case_name)
snaps_utils.neutron_utils.delete_port(neutron_client, port)
time.sleep(10)
except Exception as exc: