1 ##############################################################################
2 # Copyright (c) 2017 ZTE Corporation and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
11 from oslo_config import cfg
13 from doctor_tests.identity_auth import get_session
14 from doctor_tests.os_clients import keystone_client
15 from doctor_tests.os_clients import nova_client
19 cfg.StrOpt('doctor_user',
21 help='the name of test user',
23 cfg.StrOpt('doctor_passwd',
25 help='the password of test user',
27 cfg.StrOpt('doctor_project',
29 help='the name of test project',
31 cfg.StrOpt('doctor_role',
33 help='the role of test user',
35 cfg.StrOpt('doctor_domain_id',
36 default=os.environ.get('OS_PROJECT_DOMAIN_ID', 'default'),
37 help='the domain id of the doctor project',
39 cfg.IntOpt('quota_instances',
40 default=os.environ.get('VM_COUNT', 1),
41 help='the quota of instances in test user',
43 cfg.IntOpt('quota_cores',
44 default=os.environ.get('VM_COUNT', 1),
45 help='the quota of cores in test user',
def __init__(self, conf, log):
    """Set up the API clients and empty bookkeeping for the test identity.

    :param conf: configuration object; ``keystone_version`` and
        ``nova_version`` are read here, the ``doctor_*`` and ``quota_*``
        options are read by the other methods.
    :param log: logger used for progress messages (only ``info`` is
        called).
    """
    self.conf = conf
    self.log = log
    self.keystone = keystone_client(
        self.conf.keystone_version, get_session())
    self.nova = nova_client(conf.nova_version, get_session())

    # Name -> resource caches. They are rebuilt by the _create_* helpers
    # but are also read with .get() by the cleanup and quota code, so
    # they have to exist even when nothing has been created yet.
    self.users = {}
    self.projects = {}
    self.roles = {}

    # Set to True when the configured role was found already present, so
    # that cleanup does not delete a role this object did not create.
    self.use_exist_role = False

    # Role name -> role for grants made by this object; cleanup revokes
    # only what is recorded here.
    self.roles_for_user = {}
    self.roles_for_admin = {}
67 """create test user, project and etc"""
68 self.log.info('user create start......')
70 self._create_project()
73 self._add_user_role_in_project(is_admin=False)
74 self._add_user_role_in_project(is_admin=True)
76 self.log.info('user create end......')
78 def _create_project(self):
79 """create test project"""
80 self.projects = {project.name: project for project in
81 self.keystone.projects.list(
82 domain=self.conf.doctor_domain_id)}
83 if self.conf.doctor_project not in self.projects:
84 self.log.info('create project......')
86 self.keystone.projects.create(
87 self.conf.doctor_project,
88 self.conf.doctor_domain_id)
89 self.projects[test_project.name] = test_project
91 self.log.info('project %s already created......'
92 % self.conf.doctor_project)
93 self.log.info('test project %s'
94 % str(self.projects[self.conf.doctor_project]))
96 def _create_user(self):
97 """create test user"""
98 self.users = {user.name: user for user in
99 self.keystone.users.list(
100 domain=self.conf.doctor_domain_id)}
101 if self.conf.doctor_user not in self.users:
102 self.log.info('create user......')
103 test_user = self.keystone.users.create(
104 self.conf.doctor_user,
105 password=self.conf.doctor_passwd,
106 domain=self.conf.doctor_domain_id)
107 self.users[test_user.name] = test_user
109 self.log.info('user %s already created......'
110 % self.conf.doctor_user)
111 self.log.info('test user %s'
112 % str(self.users[self.conf.doctor_user]))
114 def _create_role(self):
115 """create test role"""
116 self.roles = {role.name: role for role in
117 self.keystone.roles.list()}
118 if self.conf.doctor_role not in self.roles:
119 self.log.info('create role......')
120 test_role = self.keystone.roles.create(
121 self.conf.doctor_role)
122 self.roles[test_role.name] = test_role
124 self.use_exist_role = True
125 self.log.info('role %s already created......'
126 % self.conf.doctor_role)
127 self.log.info('test role %s' % str(self.roles[self.conf.doctor_role]))
129 def _add_user_role_in_project(self, is_admin=False):
130 """add test user with test role in test project"""
132 project = self.projects.get(self.conf.doctor_project)
134 user_name = 'admin' if is_admin else self.conf.doctor_user
135 user = self.users.get(user_name)
137 role_name = 'admin' if is_admin else self.conf.doctor_role
138 role = self.roles.get(role_name)
140 roles_for_user = self.roles_for_admin \
141 if is_admin else self.roles_for_user
143 if not self.keystone.roles.check(role, user=user, project=project):
144 self.keystone.roles.grant(role, user=user, project=project)
145 roles_for_user[role_name] = role
147 self.log.info('Already grant a role:%s to user: %s on project: %s'
148 % (role_name, user_name, self.conf.doctor_project))
151 """delete the test user, project and role"""
152 self.log.info('user delete start......')
154 project = self.projects.get(self.conf.doctor_project)
155 user = self.users.get(self.conf.doctor_user)
156 role = self.roles.get(self.conf.doctor_role)
159 if 'admin' in self.roles_for_admin:
160 self.keystone.roles.revoke(
162 user=self.users['admin'],
166 if role and self.conf.doctor_role in self.roles_for_user:
167 self.keystone.roles.revoke(
168 role, user=user, project=project)
169 if not self.use_exist_role:
170 self.keystone.roles.delete(role)
171 self.keystone.users.delete(user)
173 self.keystone.projects.delete(project)
174 self.log.info('user delete end......')
def update_quota(self):
    """Raise the test user's nova quota to the configured minimums.

    Looks up the cached test project and test user, stores their current
    quota set in ``self.quota``, and updates ``instances`` and ``cores``
    separately, each only when the configured value is larger than the
    current one.

    :raises Exception: when the test project or the test user is not in
        the local caches.
    """
    self.log.info('user quota update start......')
    project = self.projects.get(self.conf.doctor_project)
    user = self.users.get(self.conf.doctor_user)

    # Both lookups are needed for the per-user quota calls below.
    # NOTE(review): the message says "role" although the values checked
    # are the project and the user; kept verbatim for compatibility.
    if not (project and user):
        raise Exception('No project or role for update quota')

    self.quota = self.nova.quotas.get(project.id, user_id=user.id)
    if self.conf.quota_instances > self.quota.instances:
        self.nova.quotas.update(
            project.id,
            instances=self.conf.quota_instances,
            user_id=user.id)
    if self.conf.quota_cores > self.quota.cores:
        self.nova.quotas.update(project.id,
                                cores=self.conf.quota_cores,
                                user_id=user.id)
    self.log.info('user quota update end......')