from doctor_tests.identity_auth import get_session
from doctor_tests.os_clients import keystone_client
from doctor_tests.os_clients import nova_client
+from keystoneclient import exceptions as ks_exceptions
OPTS = [
default='_member_',
help='the role of test user',
required=True),
+ cfg.StrOpt('doctor_domain_id',
+ default=os.environ.get('OS_PROJECT_DOMAIN_ID', 'default'),
+ help='the domain id of the doctor project',
+ required=True),
cfg.IntOpt('quota_instances',
default=os.environ.get('VM_COUNT', 1),
help='the quota of instances in test user',
def __init__(self, conf, log):
self.conf = conf
self.log = log
- self.keystone = \
- keystone_client(get_session())
+ self.keystone = keystone_client(
+ self.conf.keystone_version, get_session())
self.nova = \
nova_client(conf.nova_version, get_session())
self.users = {}
self.projects = {}
self.roles = {}
+ self.use_exist_role = False
self.roles_for_user = {}
self.roles_for_admin = {}
def _create_project(self):
"""create test project"""
- self.projects = {project.name: project
- for project in self.keystone.tenants.list()}
+ self.projects = {project.name: project for project in
+ self.keystone.projects.list(
+ domain=self.conf.doctor_domain_id)}
if self.conf.doctor_project not in self.projects:
+ self.log.info('create project......')
test_project = \
- self.keystone.tenants.create(self.conf.doctor_project)
+ self.keystone.projects.create(
+ self.conf.doctor_project,
+ self.conf.doctor_domain_id)
self.projects[test_project.name] = test_project
+ else:
+ self.log.info('project %s already created......'
+ % self.conf.doctor_project)
+ self.log.info('test project %s'
+ % str(self.projects[self.conf.doctor_project]))
def _create_user(self):
"""create test user"""
- project = self.projects.get(self.conf.doctor_project)
- self.users = {user.name: user for user in self.keystone.users.list()}
+ self.users = {user.name: user for user in
+ self.keystone.users.list(
+ domain=self.conf.doctor_domain_id)}
if self.conf.doctor_user not in self.users:
+ self.log.info('create user......')
test_user = self.keystone.users.create(
self.conf.doctor_user,
password=self.conf.doctor_passwd,
- tenant_id=project.id)
+ domain=self.conf.doctor_domain_id)
self.users[test_user.name] = test_user
+ else:
+ self.log.info('user %s already created......'
+ % self.conf.doctor_user)
+ self.log.info('test user %s'
+ % str(self.users[self.conf.doctor_user]))
def _create_role(self):
"""create test role"""
- self.roles = {role.name: role for role in self.keystone.roles.list()}
+ self.roles = {role.name: role for role in
+ self.keystone.roles.list()}
if self.conf.doctor_role not in self.roles:
- test_role = self.keystone.roles.create(self.conf.doctor_role)
+ self.log.info('create role......')
+ test_role = self.keystone.roles.create(
+ self.conf.doctor_role)
self.roles[test_role.name] = test_role
+ else:
+ self.use_exist_role = True
+ self.log.info('role %s already created......'
+ % self.conf.doctor_role)
+ self.log.info('test role %s' % str(self.roles[self.conf.doctor_role]))
def _add_user_role_in_project(self, is_admin=False):
"""add test user with test role in test project"""
+
project = self.projects.get(self.conf.doctor_project)
user_name = 'admin' if is_admin else self.conf.doctor_user
roles_for_user = self.roles_for_admin \
if is_admin else self.roles_for_user
- roles_for_user = \
- {role.name: role for role in
- self.keystone.roles.roles_for_user(user, tenant=project)}
- if role_name not in roles_for_user:
- self.keystone.roles.add_user_role(user, role, tenant=project)
+ try:
+ self.keystone.roles.check(role, user=user, project=project)
+ self.log.info('Already grant a role:%s to user: %s on'
+ ' project: %s'
+ % (role_name, user_name,
+ self.conf.doctor_project))
+ except ks_exceptions.NotFound:
+ self.keystone.roles.grant(role, user=user, project=project)
roles_for_user[role_name] = role
def delete(self):
if project:
if 'admin' in self.roles_for_admin:
- self.keystone.roles.remove_user_role(
- self.users['admin'],
+ self.keystone.roles.revoke(
self.roles['admin'],
- tenant=project)
+ user=self.users['admin'],
+ project=project)
if user:
if role and self.conf.doctor_role in self.roles_for_user:
- self.keystone.roles.remove_user_role(
- user, role, tenant=project)
- self.keystone.roles.delete(role)
+ self.keystone.roles.revoke(
+ role, user=user, project=project)
+ if not self.use_exist_role:
+ self.keystone.roles.delete(role)
self.keystone.users.delete(user)
- self.keystone.tenants.delete(project)
+ self.keystone.projects.delete(project)
self.log.info('user delete end......')
def update_quota(self):
self.quota = self.nova.quotas.get(project.id,
user_id=user.id)
if self.conf.quota_instances > self.quota.instances:
- self.nova.quotas.update(project.id,
- instances=self.conf.quota_instances,
- user_id=user.id)
+ self.nova.quotas.update(
+ project.id,
+ instances=self.conf.quota_instances,
+ user_id=user.id)
if self.conf.quota_cores > self.quota.cores:
self.nova.quotas.update(project.id,
cores=self.conf.quota_cores,