Rally test case troubleshooting
[functest.git] / testcases / functest_utils.py
index d09ae83..0734ccb 100644 (file)
 
 
 import os
+import os.path
 import urllib2
 import subprocess
 import sys
 import requests
 import json
+import shutil
 from git import Repo
 
 
@@ -60,6 +62,13 @@ def get_credentials(service):
 
 
 # ################ NOVA #################
+def get_instances(nova_client):
+    try:
+        instances = nova_client.servers.list(search_opts={'all_tenants': 1})
+        return instances
+    except:
+        return None
+
 def get_instance_status(nova_client, instance):
     try:
         instance = nova_client.servers.get(instance.id)
@@ -67,7 +76,6 @@ def get_instance_status(nova_client, instance):
     except:
         return None
 
-
 def get_instance_by_name(nova_client, instance_name):
     try:
         instance = nova_client.servers.find(name=instance_name)
@@ -76,6 +84,7 @@ def get_instance_by_name(nova_client, instance_name):
         return None
 
 
+
 def get_flavor_id(nova_client, flavor_name):
     flavors = nova_client.flavors.list(detailed=True)
     id = ''
@@ -96,6 +105,31 @@ def get_flavor_id_by_ram_range(nova_client, min_ram, max_ram):
     return id
 
 
+def delete_instance(nova_client, instance_id):
+    try:
+        nova_client.servers.force_delete(instance_id)
+        return True
+    except:
+        print "Error:", sys.exc_info()[0]
+        return False
+
+
+def get_floating_ips(nova_client):
+    try:
+        floating_ips = nova_client.floating_ips.list()
+        return floating_ips
+    except:
+        return None
+
+def delete_floating_ip(nova_client, floatingip_id):
+    try:
+        nova_client.floating_ips.delete(floatingip_id)
+        return True
+    except:
+        print "Error:", sys.exc_info()[0]
+        return None
+
+
 # ################ NEUTRON #################
 def create_neutron_net(neutron_client, name):
     json_body = {'network': {'name': name,
@@ -109,6 +143,16 @@ def create_neutron_net(neutron_client, name):
         return False
 
 
+def update_neutron_net(neutron_client, network_id, shared=False):
+    json_body = {'network': {'shared': shared}}
+    try:
+        neutron_client.update_network(network_id, body=json_body)
+        return True
+    except:
+        print "Error:", sys.exc_info()[0]
+        return False
+
+
 def delete_neutron_net(neutron_client, network_id):
     try:
         neutron_client.delete_network(network_id)
@@ -178,6 +222,14 @@ def remove_interface_router(neutron_client, router_id, subnet_id):
         print "Error:", sys.exc_info()[0]
         return False
 
+def remove_gateway_router(neutron_client, router_id):
+    try:
+        neutron_client.remove_gateway_router(router_id)
+        return True
+    except:
+        print "Error:", sys.exc_info()[0]
+        return False
+
 
 def create_neutron_port(neutron_client, name, network_id, ip):
     json_body = {'port': {
@@ -194,6 +246,19 @@ def create_neutron_port(neutron_client, name, network_id, ip):
         return False
 
 
+def update_neutron_port(neutron_client, port_id, device_owner):
+    json_body = {'port': {
+                 'device_owner': device_owner,
+                 }}
+    try:
+        port = neutron_client.update_port(port=port_id,
+                                          body=json_body)
+        return port['port']['id']
+    except:
+        print "Error:", sys.exc_info()[0]
+        return False
+
+
 def delete_neutron_port(neutron_client, port_id):
     try:
         neutron_client.delete_port(port_id)
@@ -229,6 +294,22 @@ def get_network_list(neutron_client):
         return network_list
 
 
+def get_router_list(neutron_client):
+    router_list = neutron_client.list_routers()['routers']
+    if len(router_list) == 0:
+        return None
+    else:
+        return router_list
+
+def get_port_list(neutron_client):
+    port_list = neutron_client.list_ports()['ports']
+    if len(port_list) == 0:
+        return None
+    else:
+        return port_list
+
+
+
 def get_external_net(neutron_client):
     for network in neutron_client.list_networks()['networks']:
         if network['router:external']:
@@ -250,7 +331,37 @@ def update_sg_quota(neutron_client, tenant_id, sg_quota, sg_rule_quota):
         print "Error:", sys.exc_info()[0]
         return False
 
+def update_cinder_quota(cinder_client, tenant_id, vols_quota, snapshots_quota,gigabytes_quota):
+    quotas_values = {
+            "volumes": vols_quota,
+            "snapshots": snapshots_quota,
+            "gigabytes": gigabytes_quota
+    }
+
+    try:
+        quotas_default=cinder_client.quotas.update(tenant_id,**quotas_values)
+        return True
+    except:
+        print "Error:", sys.exc_info()[0]
+        return False
+
+def get_private_net(neutron_client):
+    # Checks if there is an existing shared private network
+    networks = neutron_client.list_networks()['networks']
+    if len(networks) == 0:
+        return None
+    for net in networks:
+        if (net['router:external'] == False) and (net['shared'] == True):
+            return net
+    return None
+
 # ################ GLANCE #################
+def get_images(nova_client):
+    try:
+        images = nova_client.images.list()
+        return images
+    except:
+        return None
 
 
 def get_image_id(glance_client, image_name):
@@ -263,20 +374,72 @@ def get_image_id(glance_client, image_name):
     return id
 
 
-def create_glance_image(glance_client, image_name, file_path, is_public=True):
+def create_glance_image(glance_client, image_name, file_path, public=True):
+    if not os.path.isfile(file_path):
+        print "Error: file "+file_path+" does not exist."
+        return False
     try:
         with open(file_path) as fimage:
             image = glance_client.images.create(name=image_name,
-                                                is_public=is_public,
+                                                is_public=public,
                                                 disk_format="qcow2",
                                                 container_format="bare",
                                                 data=fimage)
         return image.id
     except:
+        print "Error:", sys.exc_info()[0]
+        return False
+
+def delete_glance_image(nova_client, image_id):
+    try:
+        nova_client.images.delete(image_id)
+        return True
+    except:
+        print "Error:", sys.exc_info()[0]
+        return False
+
+# ################ CINDER #################
+def get_volumes(cinder_client):
+    try:
+        volumes = cinder_client.volumes.list(search_opts={'all_tenants': 1})
+        return volumes
+    except:
+        return None
+
+def delete_volume(cinder_client, volume_id):
+    try:
+        cinder_client.volumes.delete(volume_id)
+        return True
+    except:
+        print "Error:", sys.exc_info()[0]
+        return False
+
+# ################ CINDER #################
+def get_security_groups(neutron_client):
+    try:
+        security_groups = neutron_client.list_security_groups()['security_groups']
+        return security_groups
+    except:
+        return None
+
+def delete_security_group(neutron_client, secgroup_id):
+    try:
+        neutron_client.delete_security_group(secgroup_id)
+        return True
+    except:
+        print "Error:", sys.exc_info()[0]
         return False
 
 
 # ################ KEYSTONE #################
+def get_tenants(keystone_client):
+    try:
+        tenants = keystone_client.tenants.list()
+        return tenants
+    except:
+        return None
+
+
 def get_tenant_id(keystone_client, tenant_name):
     tenants = keystone_client.tenants.list()
     id = ''
@@ -286,6 +449,12 @@ def get_tenant_id(keystone_client, tenant_name):
             break
     return id
 
+def get_users(keystone_client):
+    try:
+        users = keystone_client.users.list()
+        return users
+    except:
+        return None
 
 def get_role_id(keystone_client, role_name):
     roles = keystone_client.roles.list()
@@ -381,7 +550,7 @@ def download_url(url, dest_path):
         return False
 
     with open(dest, 'wb') as f:
-        f.write(response.read())
+        shutil.copyfileobj(response, f)
     return True