- compute\_api\_version (Nova version 2 currently only validated)
- heat\_api\_version (Heat version 1 currently only validated)
- user\_domain\_id (default='default')
+- user\_domain\_name (default='default')
- project\_domain\_id (default='default')
+- project\_domain\_name (default='default')
- interface (default='admin', used to specify the endpoint type for keystone: public, admin, internal)
- cacert (default=False, expected values T|F to denote server certificate verification, else value contains the path to an HTTPS certificate)
- region_name (The region name default=None)
:return: The OpenStack Image object
"""
self.__glance = glance_utils.glance_client(self.__os_creds)
- self.__image = glance_utils.get_image(self.__glance,
- self.image_settings.name)
+ self.__image = glance_utils.get_image(
+ self.__glance, image_settings=self.image_settings)
if self.__image:
logger.info('Found image with name - ' + self.image_settings.name)
return self.__image
if self.image_settings.kernel_image_settings:
self.__kernel_image = glance_utils.get_image(
self.__glance,
- self.image_settings.kernel_image_settings.name)
+ image_settings=self.image_settings.kernel_image_settings)
if not self.__kernel_image and not cleanup:
logger.info(
if self.image_settings.ramdisk_image_settings:
self.__ramdisk_image = glance_utils.get_image(
self.__glance,
- self.image_settings.ramdisk_image_settings.name)
+ image_settings=self.image_settings.ramdisk_image_settings)
if not self.__ramdisk_image and not cleanup:
logger.info(
"""
self.__keystone = keystone_utils.keystone_client(self.__os_creds)
self.__project = keystone_utils.get_project(
- keystone=self.__keystone, project_name=self.project_settings.name)
+ keystone=self.__keystone, project_settings=self.project_settings)
if self.__project:
logger.info(
'Found project with name - ' + self.project_settings.name)
if self.admin_state_up is not None:
out['admin_state_up'] = self.admin_state_up
if self.external_gateway:
- ext_net = neutron_utils.get_network(neutron, self.external_gateway,
- project_id)
+ ext_net = neutron_utils.get_network(neutron, self.external_gateway)
if ext_net:
ext_gw['network_id'] = ext_net.id
out['external_gateway_info'] = ext_gw
clients
:param heat_api_version: The OpenStack's API version to use for Heat
clients
- :param user_domain_id: Used for v3 APIs (default='default')
- :param project_domain_id: Used for v3 APIs (default='default')
+ :param user_domain_id: Used for v3 APIs (default=None)
+ :param user_domain_name: Used for v3 APIs (default='default')
+ :param project_domain_id: Used for v3 APIs (default=None)
+ :param project_domain_name: Used for v3 APIs (default='default')
:param interface: Used to specify the endpoint type for keystone as
public, admin, internal
:param proxy_settings: instance of os_credentials.ProxySettings class
else:
self.heat_api_version = float(kwargs['heat_api_version'])
- if kwargs.get('user_domain_id') is None:
- self.user_domain_id = 'default'
+ self.user_domain_id = kwargs.get('user_domain_id')
+
+ if kwargs.get('user_domain_name') is None:
+ self.user_domain_name = 'default'
else:
- self.user_domain_id = kwargs['user_domain_id']
+ self.user_domain_name = kwargs['user_domain_name']
+
+ self.project_domain_id = kwargs.get('project_domain_id')
- if kwargs.get('project_domain_id') is None:
- self.project_domain_id = 'default'
+ if kwargs.get('project_domain_name') is None:
+ self.project_domain_name = 'default'
else:
- self.project_domain_id = kwargs['project_domain_id']
+ self.project_domain_name = kwargs['project_domain_name']
if kwargs.get('interface') is None:
self.interface = 'admin'
', image_api_version=' + str(self.image_api_version) +
', network_api_version=' + str(self.network_api_version) +
', compute_api_version=' + str(self.compute_api_version) +
+ ', heat_api_version=' + str(self.heat_api_version) +
', user_domain_id=' + str(self.user_domain_id) +
+ ', user_domain_name=' + str(self.user_domain_name) +
+ ', project_domain_id=' + str(self.project_domain_id) +
+ ', project_domain_name=' + str(self.project_domain_name) +
', interface=' + str(self.interface) +
+ ', region_name=' + str(self.region_name) +
', proxy_settings=' + str(self.proxy_settings) +
', cacert=' + str(self.cacert))
import logging
import unittest
-from snaps.openstack.os_credentials import OSCredsError, OSCreds, \
- ProxySettings, ProxySettingsError
+from snaps.openstack.os_credentials import (
+ OSCredsError, OSCreds, ProxySettings, ProxySettingsError)
__author__ = 'spisarski'
self.assertEqual(2, os_creds.image_api_version)
self.assertEqual(2, os_creds.compute_api_version)
self.assertEqual(1, os_creds.heat_api_version)
- self.assertEqual('default', os_creds.user_domain_id)
- self.assertEqual('default', os_creds.project_domain_id)
+ self.assertIsNone(os_creds.user_domain_id)
+ self.assertEqual('default', os_creds.user_domain_name)
+ self.assertIsNone(os_creds.project_domain_id)
+ self.assertEqual('default', os_creds.project_domain_name)
self.assertEqual('admin', os_creds.interface)
self.assertFalse(os_creds.cacert)
self.assertIsNone(os_creds.proxy_settings)
self.assertEqual(2, os_creds.image_api_version)
self.assertEqual(2, os_creds.compute_api_version)
self.assertEqual(1, os_creds.heat_api_version)
- self.assertEqual('default', os_creds.user_domain_id)
- self.assertEqual('default', os_creds.project_domain_id)
+ self.assertIsNone(os_creds.user_domain_id)
+ self.assertEqual('default', os_creds.user_domain_name)
+ self.assertIsNone(os_creds.project_domain_id)
+ self.assertEqual('default', os_creds.project_domain_name)
self.assertEqual('admin', os_creds.interface)
self.assertFalse(os_creds.cacert)
self.assertIsNone(os_creds.proxy_settings)
self.assertEqual(6, os_creds.image_api_version)
self.assertEqual(7, os_creds.compute_api_version)
self.assertEqual(8.0, os_creds.heat_api_version)
- self.assertEqual('default', os_creds.user_domain_id)
- self.assertEqual('default', os_creds.project_domain_id)
+ self.assertIsNone(os_creds.user_domain_id)
+ self.assertEqual('default', os_creds.user_domain_name)
+ self.assertIsNone(os_creds.project_domain_id)
+ self.assertEqual('default', os_creds.project_domain_name)
self.assertEqual('admin', os_creds.interface)
self.assertTrue(os_creds.cacert)
self.assertIsNone(os_creds.proxy_settings)
self.assertEqual(6, os_creds.image_api_version)
self.assertEqual(7, os_creds.compute_api_version)
self.assertEqual(8.0, os_creds.heat_api_version)
- self.assertEqual('default', os_creds.user_domain_id)
- self.assertEqual('default', os_creds.project_domain_id)
+ self.assertIsNone(os_creds.user_domain_id)
+ self.assertEqual('default', os_creds.user_domain_name)
+ self.assertIsNone(os_creds.project_domain_id)
+ self.assertEqual('default', os_creds.project_domain_name)
self.assertEqual('admin', os_creds.interface)
self.assertTrue(os_creds.cacert)
self.assertIsNone(os_creds.proxy_settings)
self.assertEqual(2, os_creds.image_api_version)
self.assertEqual(2, os_creds.compute_api_version)
self.assertEqual(1, os_creds.heat_api_version)
- self.assertEqual('default', os_creds.user_domain_id)
- self.assertEqual('default', os_creds.project_domain_id)
+ self.assertIsNone(os_creds.user_domain_id)
+ self.assertEqual('default', os_creds.user_domain_name)
+ self.assertIsNone(os_creds.project_domain_id)
+ self.assertEqual('default', os_creds.project_domain_name)
self.assertEqual('admin', os_creds.interface)
self.assertFalse(os_creds.cacert)
self.assertEqual('foo', os_creds.proxy_settings.host)
def test_proxy_settings_obj_kwargs(self):
proxy_settings = ProxySettings(host='foo', port=1234)
- os_creds = OSCreds(**{'username': 'foo', 'password': 'bar',
- 'auth_url': 'http://foo.bar:5000/v2',
- 'project_name': 'hello',
- 'proxy_settings': proxy_settings,
- 'region_name': 'test_region'})
+ os_creds = OSCreds(
+ **{'username': 'foo', 'password': 'bar',
+ 'auth_url': 'http://foo.bar:5000/v2', 'project_name': 'hello',
+ 'proxy_settings': proxy_settings, 'region_name': 'test_region',
+ 'user_domain_id': 'domain1', 'user_domain_name': 'domain2',
+ 'project_domain_id': 'domain3',
+ 'project_domain_name': 'domain4'})
self.assertEqual('foo', os_creds.username)
self.assertEqual('bar', os_creds.password)
self.assertEqual('http://foo.bar:5000/v2', os_creds.auth_url)
self.assertEqual(2, os_creds.image_api_version)
self.assertEqual(2, os_creds.compute_api_version)
self.assertEqual(1, os_creds.heat_api_version)
- self.assertEqual('default', os_creds.user_domain_id)
- self.assertEqual('default', os_creds.project_domain_id)
+ self.assertEqual('domain1', os_creds.user_domain_id)
+ self.assertEqual('domain2', os_creds.user_domain_name)
+ self.assertEqual('domain3', os_creds.project_domain_id)
+ self.assertEqual('domain4', os_creds.project_domain_name)
self.assertEqual('admin', os_creds.interface)
self.assertFalse(os_creds.cacert)
self.assertEqual('foo', os_creds.proxy_settings.host)
def test_proxy_settings_dict(self):
os_creds = OSCreds(
username='foo', password='bar', auth_url='http://foo.bar:5000/v2',
- project_name='hello', proxy_settings={'host': 'foo', 'port': 1234})
+ project_name='hello', proxy_settings={'host': 'foo', 'port': 1234},
+ user_domain_id='domain1', user_domain_name='domain2',
+ project_domain_id='domain3', project_domain_name='domain4')
self.assertEqual('foo', os_creds.username)
self.assertEqual('bar', os_creds.password)
self.assertEqual('http://foo.bar:5000/v2', os_creds.auth_url)
self.assertEqual(2, os_creds.image_api_version)
self.assertEqual(2, os_creds.compute_api_version)
self.assertEqual(1, os_creds.heat_api_version)
- self.assertEqual('default', os_creds.user_domain_id)
- self.assertEqual('default', os_creds.project_domain_id)
+ self.assertEqual('domain1', os_creds.user_domain_id)
+ self.assertEqual('domain2', os_creds.user_domain_name)
+ self.assertEqual('domain3', os_creds.project_domain_id)
+ self.assertEqual('domain4', os_creds.project_domain_name)
self.assertEqual('admin', os_creds.interface)
self.assertFalse(os_creds.cacert)
self.assertEqual('foo', os_creds.proxy_settings.host)
self.assertIsNone(os_creds.proxy_settings.ssh_proxy_cmd)
def test_proxy_settings_dict_kwargs(self):
- os_creds = OSCreds(**{'username': 'foo', 'password': 'bar',
- 'auth_url': 'http://foo.bar:5000/v2',
- 'project_name': 'hello',
- 'proxy_settings': {'host': 'foo', 'port': 1234},
- 'region_name': 'test_region'})
+ os_creds = OSCreds(
+ **{'username': 'foo', 'password': 'bar',
+ 'auth_url': 'http://foo.bar:5000/v2', 'project_name': 'hello',
+ 'proxy_settings': {'host': 'foo', 'port': 1234},
+ 'region_name': 'test_region'})
self.assertEqual('foo', os_creds.username)
self.assertEqual('bar', os_creds.password)
self.assertEqual('http://foo.bar:5000/v2', os_creds.auth_url)
self.assertEqual(2, os_creds.image_api_version)
self.assertEqual(2, os_creds.compute_api_version)
self.assertEqual(1, os_creds.heat_api_version)
- self.assertEqual('default', os_creds.user_domain_id)
- self.assertEqual('default', os_creds.project_domain_id)
+ self.assertIsNone(os_creds.user_domain_id)
+ self.assertEqual('default', os_creds.user_domain_name)
+ self.assertIsNone(os_creds.project_domain_id)
+ self.assertEqual('default', os_creds.project_domain_name)
self.assertEqual('admin', os_creds.interface)
self.assertFalse(os_creds.cacert)
self.assertEqual('foo', os_creds.proxy_settings.host)
created_image = self.image_creator.create()
self.assertIsNotNone(created_image)
- retrieved_image = glance_utils.get_image(self.glance,
- self.image_settings.name)
+ retrieved_image = glance_utils.get_image(
+ self.glance, image_settings=self.image_settings)
self.assertIsNotNone(retrieved_image)
self.assertEqual(created_image.size, retrieved_image.size)
self.assertEqual(get_image_size(self.image_settings),
created_image = self.image_creator.create()
self.assertIsNotNone(created_image)
- retrieved_image = glance_utils.get_image(self.glance,
- self.image_settings.name)
+ retrieved_image = glance_utils.get_image(
+ self.glance, image_settings=self.image_settings)
self.assertIsNotNone(retrieved_image)
self.assertEqual(self.image_creator.get_image().size,
retrieved_image.size)
self.assertEqual(self.image_name, created_image.name)
retrieved_image = glance_utils.get_image(
- self.glance, file_image_settings.name)
+ self.glance, image_settings=file_image_settings)
self.assertIsNotNone(retrieved_image)
self.assertEqual(self.image_creator.get_image().size,
retrieved_image.size)
created_image = self.image_creator.create()
self.assertIsNotNone(created_image)
- retrieved_image = glance_utils.get_image(self.glance,
- self.image_settings.name)
+ retrieved_image = glance_utils.get_image(
+ self.glance, image_settings=self.image_settings)
self.assertIsNotNone(retrieved_image)
self.assertEqual(self.image_creator.get_image().size,
retrieved_image.size)
glance_utils.delete_image(self.glance, created_image)
self.assertIsNone(glance_utils.get_image(
- self.glance, self.image_creator.image_settings.name))
+ self.glance, image_settings=self.image_creator.image_settings))
# Must not throw an exception when attempting to cleanup non-existent
# image
self.image_settings)
image1 = self.image_creator.create()
- retrieved_image = glance_utils.get_image(self.glance,
- self.image_settings.name)
+ retrieved_image = glance_utils.get_image(
+ self.glance, image_settings=self.image_settings)
self.assertIsNotNone(retrieved_image)
self.assertEqual(self.image_creator.get_image().size,
retrieved_image.size)
self.image_settings)
image1 = self.image_creator.create()
- retrieved_image = glance_utils.get_image(self.glance,
- self.image_settings.name)
+ retrieved_image = glance_utils.get_image(
+ self.glance, image_settings=self.image_settings)
self.assertIsNotNone(retrieved_image)
self.assertEqual(self.image_creator.get_image().size,
retrieved_image.size)
image_creator.create()
main_image = glance_utils.get_image(self.glance,
- image_settings.name)
+ image_settings=image_settings)
self.assertIsNotNone(main_image)
self.assertIsNotNone(image_creator.get_image())
self.assertEqual(image_creator.get_image().id, main_image.id)
kernel_image = glance_utils.get_image(
- self.glance, image_settings.kernel_image_settings.name)
+ self.glance,
+ image_settings=image_settings.kernel_image_settings)
self.assertIsNotNone(kernel_image)
self.assertIsNotNone(image_creator.get_kernel_image())
self.assertEqual(kernel_image.id,
image_creator.get_kernel_image().id)
ramdisk_image = glance_utils.get_image(
- self.glance, image_settings.ramdisk_image_settings.name)
+ self.glance,
+ image_settings=image_settings.ramdisk_image_settings)
self.assertIsNotNone(ramdisk_image)
self.assertIsNotNone(image_creator.get_ramdisk_image())
self.assertEqual(ramdisk_image.id,
self.assertIsNotNone(created_image)
self.assertEqual(self.image_name, created_image.name)
- retrieved_image = glance_utils.get_image(self.glance,
- file_image_settings.name)
+ retrieved_image = glance_utils.get_image(
+ self.glance, image_settings=file_image_settings)
self.assertIsNotNone(retrieved_image)
self.assertEqual(self.image_creators[-1].get_image().size,
retrieved_image.size)
self.assertIsNotNone(created_image)
self.assertEqual(self.image_name, created_image.name)
- retrieved_image = glance_utils.get_image(self.glance,
- os_image_settings.name)
+ retrieved_image = glance_utils.get_image(
+ self.glance, image_settings=os_image_settings)
self.assertIsNotNone(retrieved_image)
self.assertEqual(self.image_creators[-1].get_image().size,
self.assertIsNotNone(created_project)
retrieved_project = keystone_utils.get_project(
- keystone=self.keystone, project_name=self.project_settings.name)
+ keystone=self.keystone, project_settings=self.project_settings)
self.assertIsNotNone(retrieved_project)
self.assertEqual(created_project, retrieved_project)
self.assertTrue(validate_project(self.keystone, self.project_settings,
self.assertIsNotNone(created_project)
retrieved_project = keystone_utils.get_project(
- keystone=self.keystone, project_name=self.project_settings.name)
+ keystone=self.keystone, project_settings=self.project_settings)
self.assertIsNotNone(retrieved_project)
self.assertEqual(created_project, retrieved_project)
'network_api_version': config.get('OS_NETWORK_API_VERSION'),
'compute_api_version': config.get('OS_COMPUTE_API_VERSION'),
'heat_api_version': config.get('OS_HEAT_API_VERSION'),
- 'user_domain_id': config.get(
- 'OS_USER_DOMAIN_ID', config.get('OS_USER_DOMAIN_NAME')),
- 'project_domain_id': config.get(
- 'OS_PROJECT_DOMAIN_ID', config.get('OS_PROJECT_DOMAIN_NAME')),
+ 'user_domain_id': config.get('OS_USER_DOMAIN_ID'),
+ 'user_domain_name': config.get('OS_USER_DOMAIN_NAME'),
+ 'project_domain_id': config.get('OS_PROJECT_DOMAIN_ID'),
+ 'project_domain_name': config.get('OS_PROJECT_DOMAIN_NAME'),
'interface': interface,
'proxy_settings': proxy_settings,
'cacert': https_cacert,
creds_dict.update(overrides)
os_creds = OSCreds(**creds_dict)
- logger.info('OS Credentials = %s', os_creds)
+ logger.info('OS Credentials = %s', os_creds.__str__)
return os_creds
region_name=os_creds.region_name)
-def get_image(glance, image_name=None):
+def get_image(glance, image_name=None, image_settings=None):
"""
Returns an OpenStack image object for a given name
:param glance: the Glance client
:param image_name: the image name to lookup
+ :param image_settings: the image settings used for lookups
:return: the image object or None
"""
- images = glance.images.list()
+ img_filter = dict()
+ if image_settings:
+ if image_settings.exists:
+ img_filter = {'name': image_settings.name}
+ else:
+ img_filter = {'name': image_settings.name,
+ 'disk_format': image_settings.format}
+ elif image_name:
+ img_filter = {'name': image_name}
+
+ images = glance.images.list(**{'filters': img_filter})
for image in images:
if glance.version == VERSION_1:
- if image.name == image_name:
- image = glance.images.get(image.id)
- return Image(name=image.name, image_id=image.id,
- size=image.size, properties=image.properties)
+ image = glance.images.get(image.id)
+ return Image(name=image.name, image_id=image.id,
+ size=image.size, properties=image.properties)
elif glance.version == VERSION_2:
- if image['name'] == image_name:
- return Image(
- name=image['name'], image_id=image['id'],
- size=image['size'], properties=image.get('properties'))
- return None
+ return Image(
+ name=image['name'], image_id=image['id'],
+ size=image['size'], properties=image.get('properties'))
def get_image_by_id(glance, image_id):
password=os_creds.password,
project_name=os_creds.project_name,
user_domain_id=os_creds.user_domain_id,
- project_domain_id=os_creds.project_domain_id)
+ user_domain_name=os_creds.user_domain_name,
+ project_domain_id=os_creds.project_domain_id,
+ project_domain_name=os_creds.project_domain_name)
else:
auth = v2.Password(auth_url=os_creds.auth_url,
username=os_creds.username,
auth=auth, service_type=service_type, interface=interface)
-def get_project(keystone=None, os_creds=None, project_name=None):
+def get_project(keystone=None, os_creds=None, project_settings=None,
+ project_name=None):
"""
Returns the first project object or None if not found
:param keystone: the Keystone client
:param os_creds: the OpenStack credentials used to obtain the Keystone
client if the keystone parameter is None
+ :param project_settings: a ProjectSettings object
:param project_name: the name to query
:return: the SNAPS-OO Project domain object or None
"""
- if not project_name:
- return None
-
if not keystone:
if os_creds:
keystone = keystone_client(os_creds)
raise KeystoneException(
'Cannot lookup project without the proper credentials')
+ proj_filter = dict()
+
+ if project_name:
+ proj_filter['name'] = project_name
+ elif project_settings:
+ proj_filter['name'] = project_settings.name
+ proj_filter['description'] = project_settings.description
+ proj_filter['domain'] = project_settings.domain
+ proj_filter['enabled'] = project_settings.enabled
+
if keystone.version == V2_VERSION_STR:
projects = keystone.tenants.list()
else:
- projects = keystone.projects.list(**{'name': project_name})
+ projects = keystone.projects.list(**proj_filter)
for project in projects:
- domain_id = None
- if keystone.version != V2_VERSION_STR:
- domain_id = project.domain_id
- if project.name == project_name:
+ if project.name == proj_filter['name']:
+ domain_id = None
+ if keystone.version != V2_VERSION_STR:
+ domain_id = project.domain_id
+
return Project(name=project.name, project_id=project.id,
domain_id=domain_id)
- return None
-
def create_project(keystone, project_settings):
"""
:param project_name: the associated project (optional)
:return: a SNAPS-OO User domain object or None
"""
- project = get_project(keystone=keystone, project_name=project_name)
+ project = None
+ if project_name:
+ project = get_project(keystone=keystone, project_name=project_name)
if project:
users = keystone.users.list(tenant_id=project.id)
raise NovaException(
'Flavor not found with name - %s', instance_settings.flavor)
- image = glance_utils.get_image(glance, image_settings.name)
+ image = glance_utils.get_image(glance, image_settings=image_settings)
if image:
args = {'name': instance_settings.name,
'flavor': flavor,
Tests to ensure that the proper credentials can connect.
"""
glance = glance_utils.glance_client(self.os_creds)
- image = glance_utils.get_image(glance, 'foo')
+ image = glance_utils.get_image(glance, image_name='foo')
self.assertIsNone(image)
def test_glance_connect_fail(self):
glance = glance_utils.glance_client(OSCreds(
username='user', password='pass', auth_url='url',
project_name='project'))
- glance_utils.get_image(glance, 'foo')
+ glance_utils.get_image(glance, image_name='foo')
class GlanceUtilsTests(OSComponentTestCase):
self.assertEqual(self.image_name, self.image.name)
- image = glance_utils.get_image(self.glance, os_image_settings.name)
+ image = glance_utils.get_image(self.glance,
+ image_settings=os_image_settings)
self.assertIsNotNone(image)
validation_utils.objects_equivalent(self.image, image)
self.assertIsNotNone(self.image)
self.assertEqual(self.image_name, self.image.name)
- image = glance_utils.get_image(self.glance, file_image_settings.name)
+ image = glance_utils.get_image(
+ self.glance, image_settings=file_image_settings)
self.assertIsNotNone(image)
validation_utils.objects_equivalent(self.image, image)
self.assertEqual(self.project_name, self.project.name)
project = keystone_utils.get_project(
- keystone=self.keystone, project_name=project_settings.name)
+ keystone=self.keystone, project_settings=project_settings)
self.assertIsNotNone(project)
self.assertEqual(self.project_name, self.project.name)