from identity_auth import get_session
from os_clients import glance_client
-import logger as doctor_log
-IMAGE_OPTS = [
- cfg.StrOpt('name',
+OPTS = [
+ cfg.StrOpt('image_name',
default=os.environ.get('IMAGE_NAME', 'cirros'),
help='the name of test image',
required=True),
- cfg.StrOpt('format',
+ cfg.StrOpt('image_format',
default='qcow2',
help='the format of test image',
required=True),
- cfg.StrOpt('file_name',
+ cfg.StrOpt('image_filename',
default='cirros.img',
help='the name of image file',
required=True),
- cfg.StrOpt('url',
+ cfg.StrOpt('image_download_url',
default='https://launchpad.net/cirros/trunk/0.3.0/+download/cirros-0.3.0-x86_64-disk.img',
help='the url where to get the image',
required=True),
]
-LOG = doctor_log.Logger('doctor').getLogger()
-
class Image(object):
- def __init__(self, conf):
+ def __init__(self, conf, log):
self.conf = conf
+ self.log = log
self.glance = \
- glance_client(conf.os_clients.glance_version,
- get_session())
+ glance_client(conf.glance_version, get_session())
self.use_existing_image = False
self.image = None
def create(self):
- LOG.info('image create start......')
+ self.log.info('image create start......')
images = {image.name: image for image in self.glance.images.list()}
- if self.conf.image.name not in images:
- if not os.path.exists(self.conf.image.file_name):
- resp = urllib2.urlopen(self.conf.image.url)
- with open(self.conf.image.file_name, "wb") as file:
+ if self.conf.image_name not in images:
+ if not os.path.exists(self.conf.image_filename):
+ resp = urllib2.urlopen(self.conf.image_download_url)
+ with open(self.conf.image_filename, "wb") as file:
file.write(resp.read())
- self.image = self.glance.images.create(name=self.conf.image.name,
- disk_format=self.conf.image.format,
+ self.image = self.glance.images.create(name=self.conf.image_name,
+ disk_format=self.conf.image_format,
container_format="bare",
visibility="public")
self.glance.images.upload(self.image['id'],
- open(self.conf.image.file_name, 'rb'))
+ open(self.conf.image_filename, 'rb'))
else:
self.use_existing_image = True
- self.image = images[self.conf.image.name]
+ self.image = images[self.conf.image_name]
- LOG.info('image create end......')
+ self.log.info('image create end......')
def delete(self):
- LOG.info('image delete start.......')
+ self.log.info('image delete start.......')
if not self.use_existing_image and self.image:
self.glance.images.delete(self.image['id'])
- LOG.info('image delete end.......')
+ self.log.info('image delete end.......')
--- /dev/null
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import os
+
+from oslo_config import cfg
+
+from identity_auth import get_session
+from os_clients import keystone_client
+from os_clients import nova_client
+
+
+OPTS = [
+ cfg.StrOpt('doctor_user',
+ default='doctor',
+ help='the name of test user',
+ required=True),
+ cfg.StrOpt('doctor_passwd',
+ default='doctor',
+ help='the password of test user',
+ required=True),
+ cfg.StrOpt('doctor_project',
+ default='doctor',
+ help='the name of test project',
+ required=True),
+ cfg.StrOpt('doctor_role',
+ default='_member_',
+ help='the role of test user',
+ required=True),
+ cfg.IntOpt('quota_instances',
+ default=os.environ.get('VM_COUNT', 1),
+ help='the quota of instances in test user',
+ required=True),
+ cfg.IntOpt('quota_cores',
+ default=os.environ.get('VM_COUNT', 1),
+ help='the quota of cores in test user',
+ required=True),
+]
+
+
+class User(object):
+
+ def __init__(self, conf, log):
+ self.conf = conf
+ self.log = log
+ self.keystone = \
+ keystone_client(get_session())
+ self.nova = \
+ nova_client(conf.nova_version, get_session())
+ self.users = {}
+ self.projects = {}
+ self.roles = {}
+ self.roles_for_user = {}
+ self.roles_for_admin = {}
+
+ def create(self):
+ """create test user, project and etc"""
+ self.log.info('user create start......')
+
+ self._create_project()
+ self._create_user()
+ self._create_role()
+ self._add_user_role_in_project(is_admin=False)
+ self._add_user_role_in_project(is_admin=True)
+
+ self.log.info('user create end......')
+
+ def _create_project(self):
+ """create test project"""
+ self.projects = {project.name: project
+ for project in self.keystone.tenants.list()}
+ if self.conf.doctor_project not in self.projects:
+ test_project = \
+ self.keystone.tenants.create(self.conf.doctor_project)
+ self.projects[test_project.name] = test_project
+
+ def _create_user(self):
+ """create test user"""
+ project = self.projects.get(self.conf.doctor_project)
+ self.users = {user.name: user for user in self.keystone.users.list()}
+ if self.conf.doctor_user not in self.users:
+ test_user = self.keystone.users.create(
+ self.conf.doctor_user,
+ password=self.conf.doctor_passwd,
+ tenant_id=project.id)
+ self.users[test_user.name] = test_user
+
+ def _create_role(self):
+ """create test role"""
+ self.roles = {role.name: role for role in self.keystone.roles.list()}
+ if self.conf.doctor_role not in self.roles:
+ test_role = self.keystone.roles.create(self.conf.doctor_role)
+ self.roles[test_role.name] = test_role
+
+ def _add_user_role_in_project(self, is_admin=False):
+ """add test user with test role in test project"""
+ project = self.projects.get(self.conf.doctor_project)
+
+ user_name = 'admin' if is_admin else self.conf.doctor_user
+ user = self.users.get(user_name)
+
+ role_name = 'admin' if is_admin else self.conf.doctor_role
+ role = self.roles.get(role_name)
+
+ roles_for_user = self.roles_for_admin \
+ if is_admin else self.roles_for_user
+
+ roles_for_user = \
+ {role.name: role for role in
+ self.keystone.roles.roles_for_user(user, tenant=project)}
+ if role_name not in roles_for_user:
+ self.keystone.roles.add_user_role(user, role, tenant=project)
+ roles_for_user[role_name] = role
+
+ def delete(self):
+ """delete the test user, project and role"""
+ self.log.info('user delete start......')
+
+ project = self.projects.get(self.conf.doctor_project)
+ user = self.users.get(self.conf.doctor_user)
+ role = self.roles.get(self.conf.doctor_role)
+
+ if project:
+ if 'admin' in self.roles_for_admin:
+ self.keystone.roles.remove_user_role(
+ self.users['admin'],
+ self.roles['admin'],
+ tenant=project)
+
+ if user:
+ if role and self.conf.doctor_role in self.roles_for_user:
+ self.keystone.roles.remove_user_role(
+ user, role, tenant=project)
+ self.keystone.roles.delete(role)
+ self.keystone.users.delete(user)
+
+ self.keystone.tenants.delete(project)
+ self.log.info('user delete end......')
+
+ def update_quota(self):
+ self.log.info('user quota update start......')
+ project = self.projects.get(self.conf.doctor_project)
+ user = self.users.get(self.conf.doctor_user)
+
+ if project and user:
+ self.quota = self.nova.quotas.get(project.id,
+ user_id=user.id)
+ if self.conf.quota_instances > self.quota.instances:
+ self.nova.quotas.update(project.id,
+ instances=self.conf.quota_instances,
+ user_id=user.id)
+ if self.conf.quota_cores > self.quota.cores:
+ self.nova.quotas.update(project.id,
+ cores=self.conf.quota_cores,
+ user_id=user.id)
+ self.log.info('user quota update end......')
+ else:
+ raise Exception('No project or role for update quota')