import os
+import os.path
import urllib2
import subprocess
import sys
+import requests
+import json
+import shutil
from git import Repo
"""
Check if the OpenStack credentials (openrc) are sourced
"""
- # TODO: there must be a short way to do this
- # doing if os.environ["something"] == "" throws an error
- try:
- os.environ['OS_AUTH_URL']
- except KeyError:
- return False
- try:
- os.environ['OS_USERNAME']
- except KeyError:
- return False
- try:
- os.environ['OS_PASSWORD']
- except KeyError:
- return False
- try:
- os.environ['OS_TENANT_NAME']
- except KeyError:
- return False
- return True
+ env_vars = ['OS_AUTH_URL','OS_USERNAME','OS_PASSWORD','OS_TENANT_NAME']
+ return all(map(lambda v: v in os.environ and os.environ[v], env_vars))
def get_credentials(service):
# ################ NOVA #################
+def get_instances(nova_client):
+ try:
+ instances = nova_client.servers.list(search_opts={'all_tenants': 1})
+ return instances
+ except:
+ return None
+
def get_instance_status(nova_client, instance):
try:
instance = nova_client.servers.get(instance.id)
except:
return None
-
def get_instance_by_name(nova_client, instance_name):
try:
instance = nova_client.servers.find(name=instance_name)
return None
+
def get_flavor_id(nova_client, flavor_name):
flavors = nova_client.flavors.list(detailed=True)
id = ''
return id
+def delete_instance(nova_client, instance_id):
+ try:
+ nova_client.servers.force_delete(instance_id)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+
+def get_floating_ips(nova_client):
+ try:
+ floating_ips = nova_client.floating_ips.list()
+ return floating_ips
+ except:
+ return None
+
+def delete_floating_ip(nova_client, floatingip_id):
+ try:
+ nova_client.floating_ips.delete(floatingip_id)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
+ return None
+
+
# ################ NEUTRON #################
def create_neutron_net(neutron_client, name):
json_body = {'network': {'name': name,
return False
+def update_neutron_net(neutron_client, network_id, shared=False):
+ json_body = {'network': {'shared': shared}}
+ try:
+ neutron_client.update_network(network_id, body=json_body)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+
def delete_neutron_net(neutron_client, network_id):
try:
neutron_client.delete_network(network_id)
print "Error:", sys.exc_info()[0]
return False
+def remove_gateway_router(neutron_client, router_id):
+ try:
+ neutron_client.remove_gateway_router(router_id)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
def create_neutron_port(neutron_client, name, network_id, ip):
json_body = {'port': {
return False
+def update_neutron_port(neutron_client, port_id, device_owner):
+ json_body = {'port': {
+ 'device_owner': device_owner,
+ }}
+ try:
+ port = neutron_client.update_port(port=port_id,
+ body=json_body)
+ return port['port']['id']
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+
+def delete_neutron_port(neutron_client, port_id):
+ try:
+ neutron_client.delete_port(port_id)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+
def get_network_id(neutron_client, network_name):
networks = neutron_client.list_networks()['networks']
id = ''
return network_list
+def get_router_list(neutron_client):
+ router_list = neutron_client.list_routers()['routers']
+ if len(router_list) == 0:
+ return None
+ else:
+ return router_list
+
+def get_port_list(neutron_client):
+ port_list = neutron_client.list_ports()['ports']
+ if len(port_list) == 0:
+ return None
+ else:
+ return port_list
+
+
+
def get_external_net(neutron_client):
for network in neutron_client.list_networks()['networks']:
if network['router:external']:
print "Error:", sys.exc_info()[0]
return False
+def update_cinder_quota(cinder_client, tenant_id, vols_quota, snapshots_quota,gigabytes_quota):
+ quotas_values = {
+ "volumes": vols_quota,
+ "snapshots": snapshots_quota,
+ "gigabytes": gigabytes_quota
+ }
+
+ try:
+ quotas_default=cinder_client.quotas.update(tenant_id,**quotas_values)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+def get_private_net(neutron_client):
+ # Checks if there is an existing shared private network
+ networks = neutron_client.list_networks()['networks']
+ if len(networks) == 0:
+ return None
+ for net in networks:
+ if (net['router:external'] == False) and (net['shared'] == True):
+ return net
+ return None
+
# ################ GLANCE #################
+def get_images(nova_client):
+ try:
+ images = nova_client.images.list()
+ return images
+ except:
+ return None
def get_image_id(glance_client, image_name):
return id
-def create_glance_image(glance_client, image_name, file_path, is_public=True):
+def create_glance_image(glance_client, image_name, file_path, public=True):
+ if not os.path.isfile(file_path):
+ print "Error: file "+file_path+" does not exist."
+ return False
try:
with open(file_path) as fimage:
image = glance_client.images.create(name=image_name,
- is_public=is_public,
+ is_public=public,
disk_format="qcow2",
container_format="bare",
data=fimage)
return image.id
except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+def delete_glance_image(nova_client, image_id):
+ try:
+ nova_client.images.delete(image_id)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+# ################ CINDER #################
+def get_volumes(cinder_client):
+ try:
+ volumes = cinder_client.volumes.list(search_opts={'all_tenants': 1})
+ return volumes
+ except:
+ return None
+
+def delete_volume(cinder_client, volume_id):
+ try:
+ cinder_client.volumes.delete(volume_id)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+# ################ CINDER #################
+def get_security_groups(neutron_client):
+ try:
+ security_groups = neutron_client.list_security_groups()['security_groups']
+ return security_groups
+ except:
+ return None
+
+def delete_security_group(neutron_client, secgroup_id):
+ try:
+ neutron_client.delete_security_group(secgroup_id)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
return False
# ################ KEYSTONE #################
+def get_tenants(keystone_client):
+ try:
+ tenants = keystone_client.tenants.list()
+ return tenants
+ except:
+ return None
+
+
def get_tenant_id(keystone_client, tenant_name):
tenants = keystone_client.tenants.list()
id = ''
break
return id
+def get_users(keystone_client):
+ try:
+ users = keystone_client.users.list()
+ return users
+ except:
+ return None
def get_role_id(keystone_client, role_name):
roles = keystone_client.roles.list()
return False
with open(dest, 'wb') as f:
- f.write(response.read())
+ shutil.copyfileobj(response, f)
return True
"""
repo = Repo(repo_path)
branch = repo.active_branch
- print branch.name
- return branch
+ return branch.name
def get_installer_type(logger=None):
installer = "Unkown"
return installer
+
+
+def get_pod_name(logger=None):
+ """
+ Get PoD Name from env variable NODE_NAME
+ """
+ try:
+ return os.environ['NODE_NAME']
+ except KeyError:
+ if logger:
+ logger.error("Unable to retrieve the POD name from environment.Using pod name 'unknown-pod'")
+ return "unknown-pod"
+
+
+def push_results_to_db(db_url, case_name, logger, pod_name, git_version, payload):
+ url = db_url + "/results"
+ installer = get_installer_type(logger)
+ params = {"project_name": "functest", "case_name": case_name,
+ "pod_name": pod_name, "installer": installer,
+ "version": git_version, "details": payload}
+
+ headers = {'Content-Type': 'application/json'}
+ try:
+ r = requests.post(url, data=json.dumps(params), headers=headers)
+ logger.debug(r)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False