from cinderclient import client as cinderclient
import functest.utils.functest_logger as ft_logger
+import functest.utils.functest_utils as ft_utils
from glanceclient import client as glanceclient
from keystoneclient.v2_0 import client as keystoneclient
from neutronclient.v2_0 import client as neutronclient
"auth_url": os.environ.get("OS_AUTH_URL"),
tenant: os.environ.get("OS_TENANT_NAME")
})
+ if os.getenv('OS_ENDPOINT_TYPE') is not None:
+ creds.update({
+ "endpoint_type": os.environ.get("OS_ENDPOINT_TYPE")
+ })
+ if os.getenv('OS_REGION_NAME') is not None:
+ creds.update({
+ "region_name": os.environ.get("OS_REGION_NAME")
+ })
cacert = os.environ.get("OS_CACERT")
if cacert is not None:
# each openstack client uses differnt kwargs for this
return env
+def get_credentials_for_rally():
+ creds = get_credentials("keystone")
+ admin_keys = ['username', 'tenant_name', 'password']
+ endpoint_types = [('internalURL', 'internal'),
+ ('publicURL', 'public'), ('adminURL', 'admin')]
+ if 'endpoint_type' in creds.keys():
+ for k, v in endpoint_types:
+ if creds['endpoint_type'] == k:
+ creds['endpoint_type'] = v
+ rally_conf = {"type": "ExistingCloud", "admin": {}}
+ for key in creds:
+ if key in admin_keys:
+ rally_conf['admin'][key] = creds[key]
+ else:
+ rally_conf[key] = creds[key]
+ return rally_conf
+
+
# *********************************************
# CLIENTS
# *********************************************
def get_cinder_client():
creds_cinder = get_credentials("cinder")
- return cinderclient.Client('2', creds_cinder['username'],
- creds_cinder['api_key'],
- creds_cinder['project_id'],
- creds_cinder['auth_url'],
- service_type="volume")
+ creds_cinder.update({
+ "service_type": "volume"
+ })
+ return cinderclient.Client('2', **creds_cinder)
def get_neutron_client():
def get_glance_client():
keystone_client = get_keystone_client()
+ glance_endpoint_type = 'publicURL'
+ os_endpoint_type = os.getenv('OS_ENDPOINT_TYPE')
+ if os_endpoint_type is not None:
+ glance_endpoint_type = os_endpoint_type
glance_endpoint = keystone_client.service_catalog.url_for(
- service_type='image', endpoint_type='publicURL')
+ service_type='image', endpoint_type=glance_endpoint_type)
return glanceclient.Client(1, glance_endpoint,
token=keystone_client.auth_token)
return id
+def create_flavor(nova_client, flavor_name, ram, disk, vcpus):
+ try:
+ flavor = nova_client.flavors.create(flavor_name, ram, vcpus, disk)
+ try:
+ extra_specs = ft_utils.get_functest_config(
+ 'general.flavor_extra_specs')
+ flavor.set_keys(extra_specs)
+ except ValueError:
+ # flavor extra specs are not configured, therefore skip the update
+ pass
+
+ except Exception, e:
+ logger.error("Error [create_flavor(nova_client, '%s', '%s', '%s', "
+ "'%s')]: %s" % (flavor_name, ram, disk, vcpus, e))
+ return None
+ return flavor.id
+
+
+def get_or_create_flavor(flavor_name, ram, disk, vcpus):
+ flavor_exists = False
+ nova_client = get_nova_client()
+
+ flavor_id = get_flavor_id(nova_client, flavor_name)
+ if flavor_id != '':
+ logger.info("Using existing flavor '%s'..." % flavor_name)
+ flavor_exists = True
+ else:
+ logger.info("Creating flavor '%s' with '%s' RAM, '%s' disk size, "
+ "'%s' vcpus..." % (flavor_name, ram, disk, vcpus))
+ flavor_id = create_flavor(nova_client, flavor_name, ram, disk, vcpus)
+ if not flavor_id:
+ logger.error("Failed to create flavor '%s'..." % (flavor_name))
+ else:
+ logger.debug("Flavor '%s' with ID=%s created successfully."
+ % (flavor_name, flavor_id))
+
+ return flavor_exists, flavor_id
+
+
def get_floating_ips(nova_client):
try:
floating_ips = nova_client.floating_ips.list()
return None
-def create_flavor(nova_client, flavor_name, ram, disk, vcpus):
- try:
- flavor = nova_client.flavors.create(flavor_name, ram, vcpus, disk)
- except Exception, e:
- logger.error("Error [create_flavor(nova_client, '%s', '%s', '%s', "
- "'%s')]: %s" % (flavor_name, ram, disk, vcpus, e))
- return None
- return flavor.id
-
-
def create_instance(flavor_name,
image_id,
network_id,
flavors = nova_client.flavors.list()
logger.error("Error: Flavor '%s' not found. Available flavors are: "
"\n%s" % (flavor_name, flavors))
- return -1
+ return None
if fixed_ip is not None:
nics = {"net-id": network_id, "v4-fixed-ip": fixed_ip}
else:
nova_client.floating_ips.delete(floatingip_id)
return True
except Exception, e:
- logger.error("Error [delete_floating_ip(nova_client, '%s')]:"
+ logger.error("Error [delete_floating_ip(nova_client, '%s')]: %s"
% (floatingip_id, e))
return False
for network in neutron_client.list_networks()['networks']:
if network['router:external']:
return network['name']
- return False
+ return None
def get_external_net_id(neutron_client):
for network in neutron_client.list_networks()['networks']:
if network['router:external']:
return network['id']
- return False
+ return None
def check_neutron_net(neutron_client, net_name):
except Exception, e:
logger.error("Error [create_neutron_net(neutron_client, '%s')]: %s"
% (name, e))
- return False
+ return None
def create_neutron_subnet(neutron_client, name, cidr, net_id):
except Exception, e:
logger.error("Error [create_neutron_subnet(neutron_client, '%s', "
"'%s', '%s')]: %s" % (name, cidr, net_id, e))
- return False
+ return None
def create_neutron_router(neutron_client, name):
except Exception, e:
logger.error("Error [create_neutron_router(neutron_client, '%s')]: %s"
% (name, e))
- return False
+ return None
def create_neutron_port(neutron_client, name, network_id, ip):
except Exception, e:
logger.error("Error [create_neutron_port(neutron_client, '%s', '%s', "
"'%s')]: %s" % (name, network_id, ip, e))
- return False
+ return None
def update_neutron_net(neutron_client, network_id, shared=False):
except Exception, e:
logger.error("Error [update_neutron_port(neutron_client, '%s', '%s')]:"
" %s" % (port_id, device_owner, e))
- return False
+ return None
def add_interface_router(neutron_client, router_id, subnet_id):
subnet_id = create_neutron_subnet(neutron_client, subnet_name,
cidr, network_id)
if not subnet_id:
- return False
+ return None
logger.debug("Subnet '%s' created successfully" % subnet_id)
logger.debug('Creating Router...')
router_id = create_neutron_router(neutron_client, router_name)
if not router_id:
- return False
+ return None
logger.debug("Router '%s' created successfully" % router_id)
logger.debug('Adding router to subnet...')
if not add_interface_router(neutron_client, router_id, subnet_id):
- return False
+ return None
logger.debug("Interface added successfully.")
logger.debug('Adding gateway to router...')
if not add_gateway_router(neutron_client, router_id):
- return False
+ return None
logger.debug("Gateway added successfully.")
return network_dic
+def create_shared_network_full(net_name, subnt_name, router_name, subnet_cidr):
+ neutron_client = get_neutron_client()
+
+ network_dic = create_network_full(neutron_client,
+ net_name,
+ subnt_name,
+ router_name,
+ subnet_cidr)
+ if network_dic:
+ if not update_neutron_net(neutron_client,
+ network_dic['net_id'],
+ shared=True):
+ logger.error("Failed to update network %s..." % net_name)
+ return None
+ else:
+ logger.debug("Network '%s' is available..." % net_name)
+ else:
+ logger.error("Network %s creation failed" % net_name)
+ return None
+ return network_dic
+
+
def create_bgpvpn(neutron_client, **kwargs):
# route_distinguishers
# route_targets
return neutron_client.create_network_association(bgpvpn_id, json_body)
+def create_router_association(neutron_client, bgpvpn_id, router_id):
+ json_body = {"router_association": {"router_id": router_id}}
+ return neutron_client.create_router_association(bgpvpn_id, json_body)
+
+
def update_bgpvpn(neutron_client, bgpvpn_id, **kwargs):
json_body = {"bgpvpn": kwargs}
return neutron_client.update_bgpvpn(bgpvpn_id, json_body)
except Exception, e:
logger.error("Error [create_security_group(neutron_client, '%s', "
"'%s')]: %s" % (sg_name, sg_description, e))
- return False
+ return None
def create_secgroup_rule(neutron_client, sg_id, direction, protocol,
sg_description)
if not SECGROUP:
logger.error("Failed to create the security group...")
- return False
+ return None
sg_id = SECGROUP['id']
if not create_secgroup_rule(neutron_client, sg_id,
'ingress', 'icmp'):
logger.error("Failed to create the security group rule...")
- return False
+ return None
logger.debug("Adding SSH rules in security group '%s'..."
% sg_name)
if not create_secgroup_rule(
neutron_client, sg_id, 'ingress', 'tcp', '22', '22'):
logger.error("Failed to create the security group rule...")
- return False
+ return None
if not create_secgroup_rule(
neutron_client, sg_id, 'egress', 'tcp', '22', '22'):
logger.error("Failed to create the security group rule...")
- return False
+ return None
return sg_id
container="bare", public=True):
if not os.path.isfile(file_path):
logger.error("Error: file %s does not exist." % file_path)
- return False
+ return None
try:
image_id = get_image_id(glance_client, image_name)
if image_id != '':
if logger:
logger.info("Creating image '%s' from '%s'..." % (image_name,
file_path))
+ try:
+ properties = ft_utils.get_functest_config(
+ 'general.image_properties')
+ except ValueError:
+ # image properties are not configured
+ # therefore don't add any properties
+ properties = {}
with open(file_path) as fimage:
image = glance_client.images.create(name=image_name,
is_public=public,
disk_format=disk,
container_format=container,
+ properties=properties,
data=fimage)
image_id = image.id
return image_id
except Exception, e:
logger.error("Error [create_glance_image(glance_client, '%s', '%s', "
"'%s')]: %s" % (image_name, file_path, str(public), e))
- return False
+ return None
+
+
+def get_or_create_image(name, path, format):
+ image_exists = False
+ glance_client = get_glance_client()
+
+ image_id = get_image_id(glance_client, name)
+ if image_id != '':
+ logger.info("Using existing image '%s'..." % name)
+ image_exists = True
+ else:
+ logger.info("Creating image '%s' from '%s'..." % (name, path))
+ image_id = create_glance_image(glance_client, name, path, format)
+ if not image_id:
+ logger.error("Failed to create a Glance image...")
+ else:
+ logger.debug("Image '%s' with ID=%s created successfully."
+ % (name, image_id))
+
+ return image_exists, image_id
def delete_glance_image(nova_client, image_id):
enabled=True)
return tenant.id
except Exception, e:
- logger.error("Error [create_tenant(cinder_client, '%s', '%s')]: %s"
+ logger.error("Error [create_tenant(keystone_client, '%s', '%s')]: %s"
% (tenant_name, tenant_description, e))
- return False
+ return None
def create_user(keystone_client, user_name, user_password,
logger.error("Error [create_user(keystone_client, '%s', '%s', '%s'"
"'%s')]: %s" % (user_name, user_password,
user_email, tenant_id, e))
- return False
+ return None
def add_role_user(keystone_client, user_id, role_id, tenant_id):