import logging
import os.path
-import re
import sys
import time
from keystoneclient import client as keystoneclient
from neutronclient.neutron import client as neutronclient
+from functest.utils.constants import CONST
import functest.utils.functest_utils as ft_utils
logger = logging.getLogger(__name__)
'OS_PROJECT_NAME': 'project_name',
'OS_ENDPOINT_TYPE': 'endpoint_type',
'OS_REGION_NAME': 'region_name',
- 'OS_CACERT': 'https_cacert'
+ 'OS_CACERT': 'https_cacert',
+ 'OS_INSECURE': 'https_insecure'
}
return env_cred_dict
return creds
-def source_credentials(rc_file):
- with open(rc_file, "r") as f:
- for line in f:
- var = line.rstrip('"\n').replace('export ', '').split("=")
- # The two next lines should be modified as soon as rc_file
- # conforms with common rules. Be aware that it could induce
- # issues if value starts with '
- key = re.sub(r'^["\' ]*|[ \'"]*$', '', var[0])
- value = re.sub(r'^["\' ]*|[ \'"]*$', '', "".join(var[1:]))
- os.environ[key] = value
-
-
-def get_credentials_for_rally():
- creds = get_credentials()
- env_cred_dict = get_env_cred_dict()
- rally_conf = {"type": "ExistingCloud", "admin": {}}
- for key in creds:
- if key == 'auth_url':
- rally_conf[key] = creds[key]
- else:
- rally_conf['admin'][key] = creds[key]
-
- endpoint_types = [('internalURL', 'internal'),
- ('publicURL', 'public'), ('adminURL', 'admin')]
-
- endpoint_type = get_endpoint_type_from_env()
- if endpoint_type is not None:
- cred_key = env_cred_dict.get('OS_ENDPOINT_TYPE')
- for k, v in endpoint_types:
- if endpoint_type == v:
- rally_conf[cred_key] = v
-
- region_name = os.getenv('OS_REGION_NAME')
- if region_name is not None:
- cred_key = env_cred_dict.get('OS_REGION_NAME')
- rally_conf[cred_key] = region_name
-
- cacert = os.getenv('OS_CACERT')
- if cacert is not None:
- cred_key = env_cred_dict.get('OS_CACERT')
- rally_conf[cred_key] = cacert
- return rally_conf
-
-
-def get_endpoint_type_from_env():
- endpoint_type = os.environ.get("OS_ENDPOINT_TYPE",
- os.environ.get("OS_INTERFACE"))
- if endpoint_type and "URL" in endpoint_type:
- endpoint_type = endpoint_type.replace("URL", "")
- return endpoint_type
-
-
def get_session_auth(other_creds={}):
loader = loading.get_plugin_loader('password')
creds = get_credentials(other_creds)
return auth
-def get_endpoint(service_type, endpoint_type='publicURL'):
+def get_endpoint(service_type, interface='public'):
auth = get_session_auth()
return get_session().get_endpoint(auth=auth,
service_type=service_type,
- endpoint_type=endpoint_type)
+ interface=interface)
def get_session(other_creds={}):
def download_and_add_image_on_glance(glance, image_name, image_url, data_dir):
- dest_path = data_dir
- if not os.path.exists(dest_path):
- os.makedirs(dest_path)
- file_name = image_url.rsplit('/')[-1]
- if not ft_utils.download_url(image_url, dest_path):
- return False
-
- image = create_glance_image(
- glance, image_name, dest_path + file_name)
- if not image:
- return False
+ try:
+ dest_path = data_dir
+ if not os.path.exists(dest_path):
+ os.makedirs(dest_path)
+ file_name = image_url.rsplit('/')[-1]
+ if not ft_utils.download_url(image_url, dest_path):
+ return False
+ except Exception:
+ raise Exception("Impossible to download image from {}".format(
+ image_url))
- return image
+ try:
+ image = create_glance_image(
+ glance, image_name, dest_path + file_name)
+ if not image:
+ return False
+ else:
+ return image
+ except Exception:
+ raise Exception("Impossible to put image {} in glance".format(
+ image_name))
# *********************************************
flavor_id = create_flavor(
nova_client, flavor_name, ram, disk, vcpus, public=public)
if not flavor_id:
- logger.error("Failed to create flavor '%s'..." % (flavor_name))
+ raise Exception("Failed to create flavor '%s'..." % (flavor_name))
else:
logger.debug("Flavor '%s' with ID=%s created successfully."
% (flavor_name, flavor_id))
count = VM_BOOT_TIMEOUT / SLEEP
for n in range(count, -1, -1):
status = get_instance_status(nova_client, instance)
- if status.lower() == "active":
+ if status is None:
+ time.sleep(SLEEP)
+ continue
+ elif status.lower() == "active":
return instance
elif status.lower() == "error":
logger.error("The instance %s went to ERROR status."
def get_external_net(neutron_client):
+ if (hasattr(CONST, 'EXTERNAL_NETWORK')):
+ return CONST.__getattribute__('EXTERNAL_NETWORK')
for network in neutron_client.list_networks()['networks']:
if network['router:external']:
return network['name']
def get_external_net_id(neutron_client):
+ if (hasattr(CONST, 'EXTERNAL_NETWORK')):
+ networks = neutron_client.list_networks(
+ name=CONST.__getattribute__('EXTERNAL_NETWORK'))
+ net_id = networks['networks'][0]['id']
+ return net_id
for network in neutron_client.list_networks()['networks']:
if network['router:external']:
return network['id']
return None
-def create_neutron_subnet(neutron_client, name, cidr, net_id):
+def create_neutron_subnet(neutron_client, name, cidr, net_id,
+ dns=['8.8.8.8', '8.8.4.4']):
json_body = {'subnets': [{'name': name, 'cidr': cidr,
- 'ip_version': 4, 'network_id': net_id}]}
+ 'ip_version': 4, 'network_id': net_id,
+ 'dns_nameservers': dns}]}
+
try:
subnet = neutron_client.create_subnet(body=json_body)
return subnet['subnets'][0]['id']
net_name,
subnet_name,
router_name,
- cidr):
+ cidr,
+ dns=['8.8.8.8', '8.8.4.4']):
# Check if the network already exists
network_id = get_network_id(neutron_client, net_name)
logger.debug("Network '%s' created successfully" % network_id)
logger.debug('Creating Subnet....')
subnet_id = create_neutron_subnet(neutron_client, subnet_name,
- cidr, network_id)
+ cidr, network_id, dns)
if not subnet_id:
return None
return network_dic
-def create_bgpvpn(neutron_client, **kwargs):
- # route_distinguishers
- # route_targets
- json_body = {"bgpvpn": kwargs}
- return neutron_client.create_bgpvpn(json_body)
-
-
-def create_network_association(neutron_client, bgpvpn_id, neutron_network_id):
- json_body = {"network_association": {"network_id": neutron_network_id}}
- return neutron_client.create_network_association(bgpvpn_id, json_body)
-
-
-def create_router_association(neutron_client, bgpvpn_id, router_id):
- json_body = {"router_association": {"router_id": router_id}}
- return neutron_client.create_router_association(bgpvpn_id, json_body)
-
-
-def update_bgpvpn(neutron_client, bgpvpn_id, **kwargs):
- json_body = {"bgpvpn": kwargs}
- return neutron_client.update_bgpvpn(bgpvpn_id, json_body)
-
-
-def delete_bgpvpn(neutron_client, bgpvpn_id):
- return neutron_client.delete_bgpvpn(bgpvpn_id)
-
-
-def get_bgpvpn(neutron_client, bgpvpn_id):
- return neutron_client.show_bgpvpn(bgpvpn_id)
-
-
-def get_bgpvpn_routers(neutron_client, bgpvpn_id):
- return get_bgpvpn(neutron_client, bgpvpn_id)['bgpvpn']['routers']
-
-
-def get_bgpvpn_networks(neutron_client, bgpvpn_id):
- return get_bgpvpn(neutron_client, bgpvpn_id)['bgpvpn']['networks']
-
# *********************************************
# SEC GROUPS
# *********************************************
# *********************************************
# GLANCE
# *********************************************
-def get_images(nova_client):
+def get_images(glance_client):
try:
- images = nova_client.images.list()
+ images = glance_client.images.list()
return images
except Exception as e:
logger.error("Error [get_images]: %s" % e)
return id
-def create_glance_image(glance_client, image_name, file_path, disk="qcow2",
- container="bare", public="public"):
+def create_glance_image(glance_client,
+ image_name,
+ file_path,
+ disk="qcow2",
+ extra_properties={},
+ container="bare",
+ public="public"):
if not os.path.isfile(file_path):
logger.error("Error: file %s does not exist." % file_path)
return None
image = glance_client.images.create(name=image_name,
visibility=public,
disk_format=disk,
- container_format=container)
+ container_format=container,
+ **extra_properties)
image_id = image.id
with open(file_path) as image_data:
glance_client.images.upload(image_id, image_data)
return None
-def get_or_create_image(name, path, format):
+def get_or_create_image(name, path, format, extra_properties):
image_exists = False
glance_client = get_glance_client()
image_exists = True
else:
logger.info("Creating image '%s' from '%s'..." % (name, path))
- image_id = create_glance_image(glance_client, name, path, format)
+ image_id = create_glance_image(glance_client,
+ name,
+ path,
+ format,
+ extra_properties)
if not image_id:
logger.error("Failed to create a Glance image...")
else:
return image_exists, image_id
-def delete_glance_image(nova_client, image_id):
+def delete_glance_image(glance_client, image_id):
try:
- nova_client.images.delete(image_id)
+ glance_client.images.delete(image_id)
return True
except Exception as e:
- logger.error("Error [delete_glance_image(nova_client, '%s')]: %s"
+ logger.error("Error [delete_glance_image(glance_client, '%s')]: %s"
% (image_id, e))
return False
return None
-def list_volume_types(cinder_client, public=True, private=True):
- try:
- volume_types = cinder_client.volume_types.list()
- if not public:
- volume_types = [vt for vt in volume_types if not vt.is_public]
- if not private:
- volume_types = [vt for vt in volume_types if vt.is_public]
- return volume_types
- except Exception as e:
- logger.error("Error [list_volume_types(cinder_client)]: %s" % e)
- return None
-
-
-def create_volume_type(cinder_client, name):
- try:
- volume_type = cinder_client.volume_types.create(name)
- return volume_type
- except Exception as e:
- logger.error("Error [create_volume_type(cinder_client, '%s')]: %s"
- % (name, e))
- return None
-
-
def update_cinder_quota(cinder_client, tenant_id, vols_quota,
snapshots_quota, gigabytes_quota):
quotas_values = {"volumes": vols_quota,
return False
-def delete_volume_type(cinder_client, volume_type):
- try:
- cinder_client.volume_types.delete(volume_type)
- return True
- except Exception as e:
- logger.error("Error [delete_volume_type(cinder_client, '%s')]: %s"
- % (volume_type, e))
- return False
-
-
# *********************************************
# KEYSTONE
# *********************************************
return id
+def get_domain_id(keystone_client, domain_name):
+ domains = keystone_client.domains.list()
+ id = ''
+ for d in domains:
+ if d.name == domain_name:
+ id = d.id
+ break
+ return id
+
+
def create_tenant(keystone_client, tenant_name, tenant_description):
try:
if is_keystone_v3():
+ domain_name = os.environ['OS_PROJECT_DOMAIN_NAME']
+ domain_id = get_domain_id(keystone_client, domain_name)
tenant = keystone_client.projects.create(
name=tenant_name,
description=tenant_description,
- domain="default",
+ domain=domain_id,
enabled=True)
else:
tenant = keystone_client.tenants.create(tenant_name,
try:
user_id = get_user_id(keystone_client, vnf_ref)
tenant_id = get_tenant_id(keystone_client, vnf_ref)
+ created = False
if not user_id:
user_id = create_user(keystone_client, vnf_ref, vnf_ref,
"", tenant_id)
- return True
- else:
- return False
- add_role_user(keystone_client, user_id, 'admin', vnf_ref)
+ created = True
+ try:
+ role_id = get_role_id(keystone_client, 'admin')
+ tenant_id = get_tenant_id(keystone_client, vnf_ref)
+ add_role_user(keystone_client, user_id, role_id, tenant_id)
+ except:
+ logger.warn("Cannot associate user to role admin on tenant")
+ return created
except:
raise Exception("Impossible to create a user for the VNF {}".format(
vnf_ref))