-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import uuid
import unittest
+import uuid
+
+from keystoneclient.exceptions import BadRequest
-from snaps.openstack.create_project import OpenStackProject, ProjectSettings
+from snaps.domain.project import ComputeQuotas, NetworkQuotas
+from snaps.openstack.create_project import (
+ OpenStackProject, ProjectSettings, ProjectSettingsError)
from snaps.openstack.create_security_group import OpenStackSecurityGroup
from snaps.openstack.create_security_group import SecurityGroupSettings
from snaps.openstack.create_user import OpenStackUser
from snaps.openstack.create_user import UserSettings
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
-from snaps.openstack.utils import keystone_utils
+from snaps.openstack.utils import keystone_utils, nova_utils, neutron_utils
__author__ = 'spisarski'
"""
def test_no_params(self):
- with self.assertRaises(Exception):
+ with self.assertRaises(ProjectSettingsError):
ProjectSettings()
def test_empty_config(self):
- with self.assertRaises(Exception):
- ProjectSettings(config=dict())
+ with self.assertRaises(ProjectSettingsError):
+ ProjectSettings(**dict())
def test_name_only(self):
settings = ProjectSettings(name='foo')
- self.assertEquals('foo', settings.name)
- self.assertEquals('default', settings.domain)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('Default', settings.domain_name)
self.assertIsNone(settings.description)
self.assertTrue(settings.enabled)
def test_config_with_name_only(self):
- settings = ProjectSettings(config={'name': 'foo'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('default', settings.domain)
+ settings = ProjectSettings(**{'name': 'foo'})
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('Default', settings.domain_name)
self.assertIsNone(settings.description)
self.assertTrue(settings.enabled)
def test_all(self):
- settings = ProjectSettings(name='foo', domain='bar', description='foobar', enabled=False)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.domain)
- self.assertEquals('foobar', settings.description)
+ settings = ProjectSettings(name='foo', domain='bar',
+ description='foobar', enabled=False)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.domain_name)
+ self.assertEqual('foobar', settings.description)
self.assertFalse(settings.enabled)
def test_config_all(self):
- settings = ProjectSettings(config={'name': 'foo', 'domain': 'bar', 'description': 'foobar', 'enabled': False})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.domain)
- self.assertEquals('foobar', settings.description)
+ settings = ProjectSettings(
+ **{'name': 'foo', 'domain': 'bar', 'description': 'foobar',
+ 'enabled': False})
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.domain_name)
+ self.assertEqual('foobar', settings.description)
self.assertFalse(settings.enabled)
def setUp(self):
"""
- Instantiates the CreateImage object that is responsible for downloading and creating an OS image file
- within OpenStack
+ Instantiates the CreateImage object that is responsible for downloading
+ and creating an OS image file within OpenStack
"""
guid = str(uuid.uuid4())[:-19]
guid = self.__class__.__name__ + '-' + guid
- self.project_settings = ProjectSettings(name=guid + '-name')
+ self.project_settings = ProjectSettings(
+ name=guid + '-name',
+ domain=self.os_creds.project_domain_name)
self.keystone = keystone_utils.keystone_client(self.os_creds)
if self.project_creator:
self.project_creator.clean()
+ def test_create_project_bad_domain(self):
+ """
+ Tests the creation of an OpenStack project with an invalid domain
+ value. This test will not do anything with a keystone v2.0 client.
+ """
+ if self.keystone.version != keystone_utils.V2_VERSION_STR:
+ self.project_settings.domain_name = 'foo'
+ self.project_creator = OpenStackProject(self.os_creds,
+ self.project_settings)
+
+ with self.assertRaises(BadRequest):
+ self.project_creator.create()
+
def test_create_project(self):
"""
Tests the creation of an OpenStack project.
"""
- self.project_creator = OpenStackProject(self.os_creds, self.project_settings)
+ self.project_creator = OpenStackProject(self.os_creds,
+ self.project_settings)
created_project = self.project_creator.create()
self.assertIsNotNone(created_project)
- retrieved_project = keystone_utils.get_project(keystone=self.keystone, project_name=self.project_settings.name)
+ retrieved_project = keystone_utils.get_project(
+ keystone=self.keystone, project_settings=self.project_settings)
self.assertIsNotNone(retrieved_project)
- self.assertEquals(created_project, retrieved_project)
+ self.assertEqual(created_project, retrieved_project)
+ self.assertTrue(validate_project(self.keystone, self.project_settings,
+ created_project))
def test_create_project_2x(self):
"""
- Tests the creation of an OpenStack project twice to ensure it only creates one.
+ Tests the creation of an OpenStack project twice to ensure it only
+ creates one.
"""
- self.project_creator = OpenStackProject(self.os_creds, self.project_settings)
+ self.project_creator = OpenStackProject(self.os_creds,
+ self.project_settings)
created_project = self.project_creator.create()
self.assertIsNotNone(created_project)
- retrieved_project = keystone_utils.get_project(keystone=self.keystone, project_name=self.project_settings.name)
+ retrieved_project = keystone_utils.get_project(
+ keystone=self.keystone, project_settings=self.project_settings)
self.assertIsNotNone(retrieved_project)
- self.assertEquals(created_project, retrieved_project)
+ self.assertEqual(created_project, retrieved_project)
- project2 = OpenStackProject(self.os_creds, self.project_settings).create()
- self.assertEquals(retrieved_project, project2)
+ project2 = OpenStackProject(self.os_creds,
+ self.project_settings).create()
+ self.assertEqual(retrieved_project, project2)
+ self.assertTrue(validate_project(self.keystone, self.project_settings,
+ created_project))
def test_create_delete_project(self):
"""
- Tests the creation of an OpenStack project, it's deletion, then cleanup.
+ Tests the creation of an OpenStack project, it's deletion, then cleanup
"""
# Create Image
- self.project_creator = OpenStackProject(self.os_creds, self.project_settings)
+ self.project_creator = OpenStackProject(self.os_creds,
+ self.project_settings)
created_project = self.project_creator.create()
self.assertIsNotNone(created_project)
self.assertIsNone(self.project_creator.get_project())
- # TODO - Expand tests
+ self.assertTrue(validate_project(self.keystone, self.project_settings,
+ created_project))
+
+ def test_update_quotas(self):
+ """
+ Tests the creation of an OpenStack project where the quotas get
+ updated.
+ """
+ self.project_creator = OpenStackProject(self.os_creds,
+ self.project_settings)
+ created_project = self.project_creator.create()
+ self.assertIsNotNone(created_project)
+
+ retrieved_project = keystone_utils.get_project(
+ keystone=self.keystone, project_settings=self.project_settings)
+ self.assertIsNotNone(retrieved_project)
+ self.assertEqual(created_project, retrieved_project)
+ self.assertTrue(validate_project(self.keystone, self.project_settings,
+ created_project))
+
+ update_compute_quotas = ComputeQuotas(
+ **{'metadata_items': 64, 'cores': 5, 'instances': 5,
+ 'injected_files': 3, 'injected_file_content_bytes': 5120,
+ 'ram': 25600, 'fixed_ips': 100, 'key_pairs': 50})
+ self.project_creator.update_compute_quotas(update_compute_quotas)
+
+ update_network_quotas = NetworkQuotas(
+ **{'security_group': 5, 'security_group_rule': 50,
+ 'floatingip': 25, 'network': 5, 'port': 25, 'router': 6,
+ 'subnet': 7})
+ self.project_creator.update_network_quotas(update_network_quotas)
+
+ self.assertEqual(update_compute_quotas,
+ self.project_creator.get_compute_quotas())
+ self.assertEqual(update_network_quotas,
+ self.project_creator.get_network_quotas())
+
+ nova = nova_utils.nova_client(self.os_creds)
+ new_compute_quotas = nova_utils.get_compute_quotas(
+ nova, self.project_creator.get_project().id)
+ self.assertEqual(update_compute_quotas, new_compute_quotas)
+
+ neutron = neutron_utils.neutron_client(self.os_creds)
+ new_network_quotas = neutron_utils.get_network_quotas(
+ neutron, self.project_creator.get_project().id)
+ self.assertEqual(update_network_quotas, new_network_quotas)
class CreateProjectUserTests(OSComponentTestCase):
def setUp(self):
"""
- Instantiates the CreateImage object that is responsible for downloading and creating an OS image file
- within OpenStack
+ Instantiates the CreateImage object that is responsible for downloading
+ and creating an OS image file within OpenStack
"""
guid = str(uuid.uuid4())[:-19]
self.guid = self.__class__.__name__ + '-' + guid
- self.project_settings = ProjectSettings(name=self.guid + '-name')
+ self.project_settings = ProjectSettings(
+ name=self.guid + '-name',
+ domain=self.os_creds.project_domain_name)
self.keystone = keystone_utils.keystone_client(self.os_creds)
def test_create_project_sec_grp_one_user(self):
"""
- Tests the creation of an OpenStack object to a project with a new users and to create a security group
+ Tests the creation of an OpenStack object to a project with a new users
+ and to create a security group
"""
- self.project_creator = OpenStackProject(self.os_creds, self.project_settings)
+ self.project_creator = OpenStackProject(self.os_creds,
+ self.project_settings)
created_project = self.project_creator.create()
self.assertIsNotNone(created_project)
- user_creator = OpenStackUser(self.os_creds, UserSettings(name=self.guid + '-user', password=self.guid))
+ user_creator = OpenStackUser(
+ self.os_creds, UserSettings(
+ name=self.guid + '-user',
+ password=self.guid, roles={'admin': self.project_settings.name},
+ domain_name=self.os_creds.user_domain_name))
self.project_creator.assoc_user(user_creator.create())
self.user_creators.append(user_creator)
- sec_grp_os_creds = user_creator.get_os_creds(self.project_creator.get_project().name)
+ sec_grp_os_creds = user_creator.get_os_creds(
+ self.project_creator.get_project().name)
sec_grp_creator = OpenStackSecurityGroup(
- sec_grp_os_creds, SecurityGroupSettings(name=self.guid + '-name', description='hello group'))
+ sec_grp_os_creds, SecurityGroupSettings(name=self.guid + '-name',
+ description='hello group'))
sec_grp = sec_grp_creator.create()
self.assertIsNotNone(sec_grp)
self.sec_grp_creators.append(sec_grp_creator)
- if 'tenant_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id'])
- elif 'project_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['project_id'])
- else:
- self.fail('Cannot locate the project or tenant ID')
+ self.assertEqual(self.project_creator.get_project().id,
+ sec_grp.project_id)
def test_create_project_sec_grp_two_users(self):
"""
- Tests the creation of an OpenStack object to a project with two new users and use each user to create a
- security group
+ Tests the creation of an OpenStack object to a project with two new
+ users and use each user to create a security group
"""
- self.project_creator = OpenStackProject(self.os_creds, self.project_settings)
+ self.project_creator = OpenStackProject(self.os_creds,
+ self.project_settings)
created_project = self.project_creator.create()
self.assertIsNotNone(created_project)
- user_creator_1 = OpenStackUser(self.os_creds, UserSettings(name=self.guid + '-user1', password=self.guid))
+ user_creator_1 = OpenStackUser(
+ self.os_creds, UserSettings(
+ name=self.guid + '-user1', password=self.guid,
+ roles={'admin': self.project_settings.name},
+ domain_name=self.os_creds.user_domain_name))
self.project_creator.assoc_user(user_creator_1.create())
self.user_creators.append(user_creator_1)
- user_creator_2 = OpenStackUser(self.os_creds, UserSettings(name=self.guid + '-user2', password=self.guid))
+ user_creator_2 = OpenStackUser(
+ self.os_creds, UserSettings(
+ name=self.guid + '-user2', password=self.guid,
+ roles={'admin': self.project_settings.name},
+ domain_name=self.os_creds.user_domain_name))
self.project_creator.assoc_user(user_creator_2.create())
self.user_creators.append(user_creator_2)
ctr = 0
for user_creator in self.user_creators:
ctr += 1
- sec_grp_os_creds = user_creator.get_os_creds(self.project_creator.get_project().name)
+ sec_grp_os_creds = user_creator.get_os_creds(
+ self.project_creator.get_project().name)
sec_grp_creator = OpenStackSecurityGroup(
- sec_grp_os_creds, SecurityGroupSettings(name=self.guid + '-name', description='hello group'))
+ sec_grp_os_creds,
+ SecurityGroupSettings(name=self.guid + '-name',
+ description='hello group'))
sec_grp = sec_grp_creator.create()
self.assertIsNotNone(sec_grp)
self.sec_grp_creators.append(sec_grp_creator)
- if 'tenant_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id'])
- elif 'project_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['project_id'])
- else:
- self.fail('Cannot locate the project or tenant ID')
+ self.assertEqual(self.project_creator.get_project().id,
+ sec_grp.project_id)
+
+
+def validate_project(keystone, project_settings, project):
+ """
+ Validates that the project_settings used to create the project have been
+ properly set
+ :param keystone: the keystone client for version checking
+ :param project_settings: the settings used to create the project
+ :param project: the SNAPS-OO Project domain object
+ :return: T/F
+ """
+ if keystone.version == keystone_utils.V2_VERSION_STR:
+ return project_settings.name == project.name
+ else:
+ domain = keystone_utils.get_domain_by_id(keystone, project.domain_id)
+ return (project_settings.name == project.name and
+ project_settings.domain_name == domain.name)