X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=functest%2Futils%2Fopenstack_utils.py;h=929a761e0edb56113712122cdb3308f02a20e827;hb=0e12e2e5ac8f1ae46a1c19436675da5075ea8f44;hp=e33af63b40c45287ff5ebfd58d2afff259ed2665;hpb=4e0ebc01f1fa41d1b15f7d135c3769a051b1667c;p=functest.git diff --git a/functest/utils/openstack_utils.py b/functest/utils/openstack_utils.py old mode 100755 new mode 100644 index e33af63b4..929a761e0 --- a/functest/utils/openstack_utils.py +++ b/functest/utils/openstack_utils.py @@ -82,7 +82,8 @@ def get_env_cred_dict(): 'OS_PROJECT_DOMAIN_NAME': 'project_domain_name', 'OS_PROJECT_NAME': 'project_name', 'OS_ENDPOINT_TYPE': 'endpoint_type', - 'OS_REGION_NAME': 'region_name' + 'OS_REGION_NAME': 'region_name', + 'OS_CACERT': 'https_cacert' } return env_cred_dict @@ -149,6 +150,11 @@ def get_credentials_for_rally(): if region_name is not None: cred_key = env_cred_dict.get('OS_REGION_NAME') rally_conf[cred_key] = region_name + + cacert = os.getenv('OS_CACERT') + if cacert is not None: + cred_key = env_cred_dict.get('OS_CACERT') + rally_conf[cred_key] = cacert return rally_conf @@ -168,7 +174,14 @@ def get_endpoint(service_type, endpoint_type='publicURL'): def get_session(other_creds={}): auth = get_session_auth(other_creds) - return session.Session(auth=auth) + cacert = os.getenv('OS_CACERT') + if cacert is not None: + if not os.path.isfile(cacert): + raise Exception("The 'OS_CACERT' environment" + "variable is set to %s but the file" + "does not exist.", cacert) + + return session.Session(auth=auth, verify=cacert) # ********************************************* @@ -1041,6 +1054,40 @@ def create_secgroup_rule(neutron_client, sg_id, direction, protocol, return False +def get_security_group_rules(neutron_client, sg_id): + try: + security_rules = neutron_client.list_security_group_rules()[ + 'security_group_rules'] + security_rules = [rule for rule in security_rules + if rule["security_group_id"] == sg_id] + return security_rules + except Exception, e: + logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:" + " %s" % e) + return None + + +def check_security_group_rules(neutron_client, sg_id, direction, protocol, + port_min=None, port_max=None): + try: + security_rules = get_security_group_rules(neutron_client, sg_id) + security_rules = [rule for rule in security_rules + if (rule["direction"].lower() == direction + and rule["protocol"].lower() == protocol + and rule["port_range_min"] == port_min + and rule["port_range_max"] == port_max)] + if len(security_rules) == 0: + return True + else: + return False + except Exception, e: + logger.error("Error [check_security_group_rules(" + " neutron_client, sg_id, direction," + " protocol, port_min=None, port_max=None)]: " + "%s" % e) + return None + + def create_security_group_full(neutron_client, sg_name, sg_description): sg_id = get_security_group_id(neutron_client, sg_name) @@ -1347,6 +1394,15 @@ def create_tenant(keystone_client, tenant_name, tenant_description): return None +def get_or_create_tenant(keystone_client, tenant_name, tenant_description): + tenant_id = get_tenant_id(keystone_client, tenant_name) + if not tenant_id: + tenant_id = create_tenant(keystone_client, tenant_name, + tenant_description) + + return tenant_id + + def create_user(keystone_client, user_name, user_password, user_email, tenant_id): try: @@ -1370,6 +1426,15 @@ def create_user(keystone_client, user_name, user_password, return None +def get_or_create_user(keystone_client, user_name, user_password, + tenant_id, user_email=None): + user_id = get_user_id(keystone_client, user_name) + if not user_id: + user_id = create_user(keystone_client, user_name, user_password, + user_email, tenant_id) + return user_id + + def add_role_user(keystone_client, user_id, role_id, tenant_id): try: if is_keystone_v3():