from keystoneclient import client as keystoneclient
from neutronclient.neutron import client as neutronclient
+from functest.utils.constants import CONST
import functest.utils.functest_utils as ft_utils
logger = logging.getLogger(__name__)
def source_credentials(rc_file):
with open(rc_file, "r") as f:
for line in f:
- var = line.rstrip('"\n').replace('export ', '').split("=")
+ var = (line.rstrip('"\n').replace('export ', '').split("=")
+ if re.search(r'(.*)=(.*)', line) else None)
# The two next lines should be modified as soon as rc_file
# conforms with common rules. Be aware that it could induce
# issues if value starts with '
- key = re.sub(r'^["\' ]*|[ \'"]*$', '', var[0])
- value = re.sub(r'^["\' ]*|[ \'"]*$', '', "".join(var[1:]))
- os.environ[key] = value
+ if var:
+ key = re.sub(r'^["\' ]*|[ \'"]*$', '', var[0])
+ value = re.sub(r'^["\' ]*|[ \'"]*$', '', "".join(var[1:]))
+ os.environ[key] = value
def get_credentials_for_rally():
return auth
-def get_endpoint(service_type, endpoint_type='publicURL'):
+def get_endpoint(service_type, interface='public'):
auth = get_session_auth()
return get_session().get_endpoint(auth=auth,
service_type=service_type,
- endpoint_type=endpoint_type)
+ interface=interface)
def get_session(other_creds={}):
def download_and_add_image_on_glance(glance, image_name, image_url, data_dir):
- dest_path = data_dir
- if not os.path.exists(dest_path):
- os.makedirs(dest_path)
- file_name = image_url.rsplit('/')[-1]
- if not ft_utils.download_url(image_url, dest_path):
- return False
-
- image = create_glance_image(
- glance, image_name, dest_path + file_name)
- if not image:
- return False
+ try:
+ dest_path = data_dir
+ if not os.path.exists(dest_path):
+ os.makedirs(dest_path)
+ file_name = image_url.rsplit('/')[-1]
+ if not ft_utils.download_url(image_url, dest_path):
+ return False
+ except Exception:
+ raise Exception("Impossible to download image from {}".format(
+ image_url))
- return image
+ try:
+ image = create_glance_image(
+ glance, image_name, dest_path + file_name)
+ if not image:
+ return False
+ else:
+ return image
+ except Exception:
+ raise Exception("Impossible to put image {} in glance".format(
+ image_name))
# *********************************************
flavor_id = create_flavor(
nova_client, flavor_name, ram, disk, vcpus, public=public)
if not flavor_id:
- logger.error("Failed to create flavor '%s'..." % (flavor_name))
+ raise Exception("Failed to create flavor '%s'..." % (flavor_name))
else:
logger.debug("Flavor '%s' with ID=%s created successfully."
% (flavor_name, flavor_id))
def get_external_net(neutron_client):
+ if (hasattr(CONST, 'EXTERNAL_NETWORK')):
+ return CONST.__getattribute__('EXTERNAL_NETWORK')
for network in neutron_client.list_networks()['networks']:
if network['router:external']:
return network['name']
def get_external_net_id(neutron_client):
+ if (hasattr(CONST, 'EXTERNAL_NETWORK')):
+ networks = neutron_client.list_networks(
+ name=CONST.__getattribute__('EXTERNAL_NETWORK'))
+ net_id = networks['networks'][0]['id']
+ return net_id
for network in neutron_client.list_networks()['networks']:
if network['router:external']:
return network['id']
return None
-def create_neutron_subnet(neutron_client, name, cidr, net_id):
+def create_neutron_subnet(neutron_client, name, cidr, net_id,
+ dns=['8.8.8.8', '8.8.4.4']):
json_body = {'subnets': [{'name': name, 'cidr': cidr,
- 'ip_version': 4, 'network_id': net_id}]}
+ 'ip_version': 4, 'network_id': net_id,
+ 'dns_nameservers': dns}]}
+
try:
subnet = neutron_client.create_subnet(body=json_body)
return subnet['subnets'][0]['id']
net_name,
subnet_name,
router_name,
- cidr):
+ cidr,
+ dns=['8.8.8.8', '8.8.4.4']):
# Check if the network already exists
network_id = get_network_id(neutron_client, net_name)
logger.debug("Network '%s' created successfully" % network_id)
logger.debug('Creating Subnet....')
subnet_id = create_neutron_subnet(neutron_client, subnet_name,
- cidr, network_id)
+ cidr, network_id, dns)
if not subnet_id:
return None
return id
-def create_glance_image(glance_client, image_name, file_path, disk="qcow2",
- container="bare", public="public"):
+def create_glance_image(glance_client,
+ image_name,
+ file_path,
+ disk="qcow2",
+ extra_properties={},
+ container="bare",
+ public="public"):
if not os.path.isfile(file_path):
logger.error("Error: file %s does not exist." % file_path)
return None
image = glance_client.images.create(name=image_name,
visibility=public,
disk_format=disk,
- container_format=container)
+ container_format=container,
+ **extra_properties)
image_id = image.id
with open(file_path) as image_data:
glance_client.images.upload(image_id, image_data)
return None
-def get_or_create_image(name, path, format):
+def get_or_create_image(name, path, format, extra_properties):
image_exists = False
glance_client = get_glance_client()
image_exists = True
else:
logger.info("Creating image '%s' from '%s'..." % (name, path))
- image_id = create_glance_image(glance_client, name, path, format)
+ image_id = create_glance_image(glance_client,
+ name,
+ path,
+ format,
+ extra_properties)
if not image_id:
logger.error("Failed to create a Glance image...")
else:
return id
+def get_domain_id(keystone_client, domain_name):
+ domains = keystone_client.domains.list()
+ id = ''
+ for d in domains:
+ if d.name == domain_name:
+ id = d.id
+ break
+ return id
+
+
def create_tenant(keystone_client, tenant_name, tenant_description):
try:
if is_keystone_v3():
+ domain_name = CONST.__getattribute__('OS_PROJECT_DOMAIN_NAME')
+ domain_id = get_domain_id(keystone_client, domain_name)
tenant = keystone_client.projects.create(
name=tenant_name,
description=tenant_description,
- domain="default",
+ domain=domain_id,
enabled=True)
else:
tenant = keystone_client.tenants.create(tenant_name,
try:
user_id = get_user_id(keystone_client, vnf_ref)
tenant_id = get_tenant_id(keystone_client, vnf_ref)
+ created = False
if not user_id:
user_id = create_user(keystone_client, vnf_ref, vnf_ref,
"", tenant_id)
- return True
- else:
- return False
- add_role_user(keystone_client, user_id, 'admin', vnf_ref)
+ created = True
+ try:
+ role_id = get_role_id(keystone_client, 'admin')
+ tenant_id = get_tenant_id(keystone_client, vnf_ref)
+ add_role_user(keystone_client, user_id, role_id, tenant_id)
+ except:
+ logger.warn("Cannot associate user to role admin on tenant")
+ return created
except:
raise Exception("Impossible to create a user for the VNF {}".format(
vnf_ref))
except Exception as e:
logger.error("Error [get_resource]: %s" % e)
return None
+
+
+# *********************************************
+# TEMPEST
+# *********************************************
+def init_tempest_cleanup(tempest_config_dir=None,
+ tempest_config_filename='tempest.conf',
+ output_file=None):
+ """
+ Initialize the Tempest Cleanup utility.
+ See https://docs.openstack.org/tempest/latest/cleanup.html for docs.
+
+ :param tempest_config_dir: The directory where the Tempest config file is
+ located. If not specified, we let Tempest pick both the directory
+ and the filename (i.e. second parameter is ignored)
+ :param tempest_config_filename: The filename of the Tempest config file
+ :param output_file: Optional file where to save output
+ """
+ # The Tempest cleanup utility currently offers no cmd argument to specify
+ # the config file, therefore it has to be configured with env variables
+ env = None
+ if tempest_config_dir:
+ env = os.environ.copy()
+ env['TEMPEST_CONFIG_DIR'] = tempest_config_dir
+ env['TEMPEST_CONFIG'] = tempest_config_filename
+
+ # If this command fails, an exception must be raised to stop the script
+ # otherwise the later cleanup would destroy also other resources
+ cmd_line = "tempest cleanup --init-saved-state"
+ ft_utils.execute_command_raise(cmd_line, env=env, output_file=output_file,
+ error_msg="Tempest cleanup init failed")
+
+
+def perform_tempest_cleanup(tempest_config_dir=None,
+ tempest_config_filename='tempest.conf',
+ output_file=None):
+ """
+ Perform cleanup using the Tempest Cleanup utility.
+ See https://docs.openstack.org/tempest/latest/cleanup.html for docs.
+
+ :param tempest_config_dir: The directory where the Tempest config file is
+ located. If not specified, we let Tempest pick both the directory
+ and the filename (i.e. second parameter is ignored)
+ :param tempest_config_filename: The filename of the Tempest config file
+ :param output_file: Optional file where to save output
+ """
+ # The Tempest cleanup utility currently offers no cmd argument to specify
+ # the config file, therefore it has to be configured with env variables
+ env = None
+ if tempest_config_dir:
+ env = os.environ.copy()
+ env['TEMPEST_CONFIG_DIR'] = tempest_config_dir
+ env['TEMPEST_CONFIG'] = tempest_config_filename
+
+ # If this command fails, an exception must be raised to stop the script
+ # otherwise the later cleanup would destroy also other resources
+ cmd_line = "tempest cleanup"
+ ft_utils.execute_command(cmd_line, env=env, output_file=output_file,
+ error_msg="Tempest cleanup failed")