'OS_PROJECT_DOMAIN_NAME': 'project_domain_name',
'OS_PROJECT_NAME': 'project_name',
'OS_ENDPOINT_TYPE': 'endpoint_type',
- 'OS_REGION_NAME': 'region_name'
+ 'OS_REGION_NAME': 'region_name',
+ 'OS_CACERT': 'https_cacert'
}
return env_cred_dict
if region_name is not None:
cred_key = env_cred_dict.get('OS_REGION_NAME')
rally_conf[cred_key] = region_name
+
+ cacert = os.getenv('OS_CACERT')
+ if cacert is not None:
+ cred_key = env_cred_dict.get('OS_CACERT')
+ rally_conf[cred_key] = cacert
return rally_conf
def get_session(other_creds={}):
auth = get_session_auth(other_creds)
- return session.Session(auth=auth)
+ cacert = os.getenv('OS_CACERT')
+ if cacert is not None:
+ if not os.path.isfile(cacert):
+ raise Exception("The 'OS_CACERT' environment"
+ "variable is set to %s but the file"
+ "does not exist.", cacert)
+
+ return session.Session(auth=auth, verify=cacert)
# *********************************************
return False
+def get_security_group_rules(neutron_client, sg_id):
+ try:
+ security_rules = neutron_client.list_security_group_rules()[
+ 'security_group_rules']
+ security_rules = [rule for rule in security_rules
+ if rule["security_group_id"] == sg_id]
+ return security_rules
+ except Exception, e:
+ logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:"
+ " %s" % e)
+ return None
+
+
+def check_security_group_rules(neutron_client, sg_id, direction, protocol,
+ port_min=None, port_max=None):
+ try:
+ security_rules = get_security_group_rules(neutron_client, sg_id)
+ security_rules = [rule for rule in security_rules
+ if (rule["direction"].lower() == direction
+ and rule["protocol"].lower() == protocol
+ and rule["port_range_min"] == port_min
+ and rule["port_range_max"] == port_max)]
+ if len(security_rules) == 0:
+ return True
+ else:
+ return False
+ except Exception, e:
+ logger.error("Error [check_security_group_rules("
+ " neutron_client, sg_id, direction,"
+ " protocol, port_min=None, port_max=None)]: "
+ "%s" % e)
+ return None
+
+
def create_security_group_full(neutron_client,
sg_name, sg_description):
sg_id = get_security_group_id(neutron_client, sg_name)
return None
+def get_or_create_tenant(keystone_client, tenant_name, tenant_description):
+ tenant_id = get_tenant_id(keystone_client, tenant_name)
+ if not tenant_id:
+ tenant_id = create_tenant(keystone_client, tenant_name,
+ tenant_description)
+
+ return tenant_id
+
+
def create_user(keystone_client, user_name, user_password,
user_email, tenant_id):
try:
return None
+def get_or_create_user(keystone_client, user_name, user_password,
+ tenant_id, user_email=None):
+ user_id = get_user_id(keystone_client, user_name)
+ if not user_id:
+ user_id = create_user(keystone_client, user_name, user_password,
+ user_email, tenant_id)
+ return user_id
+
+
def add_role_user(keystone_client, user_id, role_id, tenant_id):
try:
if is_keystone_v3():