# limitations under the License.
from cinderclient.exceptions import NotFound, BadRequest
+from snaps.config.volume import VolumeConfig, VolumeConfigError
+from snaps.config.volume_type import VolumeTypeConfig
from snaps.openstack.create_image import OpenStackImage
-from snaps.openstack.create_volume_type import (
- VolumeTypeSettings, OpenStackVolumeType)
+from snaps.openstack.create_volume_type import OpenStackVolumeType
from snaps.openstack.tests import openstack_tests
try:
import unittest
import uuid
-from snaps.openstack import create_volume
from snaps.openstack.create_volume import (
- VolumeSettings, VolumeSettingsError, OpenStackVolume)
+ VolumeSettings, OpenStackVolume)
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
from snaps.openstack.utils import cinder_utils
"""
def test_no_params(self):
- with self.assertRaises(VolumeSettingsError):
+ with self.assertRaises(VolumeConfigError):
VolumeSettings()
def test_empty_config(self):
- with self.assertRaises(VolumeSettingsError):
+ with self.assertRaises(VolumeConfigError):
VolumeSettings(**dict())
def test_name_only(self):
super(self.__class__, self).__start__()
guid = uuid.uuid4()
- self.volume_settings = VolumeSettings(
+ self.volume_settings = VolumeConfig(
name=self.__class__.__name__ + '-' + str(guid))
self.cinder = cinder_utils.cinder_client(self.os_creds)
def test_create_volume_simple(self):
"""
- Tests the creation of an OpenStack volume from a URL.
+ Tests the creation of a simple OpenStack volume.
"""
# Create Volume
- self.volume_creator = create_volume.OpenStackVolume(
+ self.volume_creator = OpenStackVolume(
self.os_creds, self.volume_settings)
created_volume = self.volume_creator.create(block=True)
self.assertIsNotNone(created_volume)
clean() does not raise an Exception.
"""
# Create Volume
- self.volume_creator = create_volume.OpenStackVolume(
+ self.volume_creator = OpenStackVolume(
self.os_creds, self.volume_settings)
created_volume = self.volume_creator.create(block=True)
self.assertIsNotNone(created_volume)
self.assertEqual(volume1, volume2)
+class CreateSimpleVolumeFailureTests(OSIntegrationTestCase):
+ """
+ Test for the CreateVolume class defined in create_volume.py
+ """
+
+ def setUp(self):
+ """
+ Instantiates the CreateVolume object that is responsible for
+ downloading and creating an OS volume file within OpenStack
+ """
+ super(self.__class__, self).__start__()
+
+ self.guid = uuid.uuid4()
+ self.cinder = cinder_utils.cinder_client(self.os_creds)
+ self.volume_creator = None
+
+ def tearDown(self):
+ """
+ Cleans the volume and downloaded volume file
+ """
+ if self.volume_creator:
+ self.volume_creator.clean()
+
+ super(self.__class__, self).__clean__()
+
+ def test_create_volume_bad_size(self):
+ """
+ Tests the creation of an OpenStack volume with a negative size to
+ ensure it raises a BadRequest exception.
+ """
+ volume_settings = VolumeConfig(
+ name=self.__class__.__name__ + '-' + str(self.guid), size=-1)
+
+ # Create Volume
+ self.volume_creator = OpenStackVolume(self.os_creds, volume_settings)
+
+ with self.assertRaises(BadRequest):
+ self.volume_creator.create(block=True)
+
+ def test_create_volume_bad_type(self):
+ """
+ Tests the creation of an OpenStack volume with a type that does not
+ exist to ensure it raises a NotFound exception.
+ """
+ volume_settings = VolumeConfig(
+ name=self.__class__.__name__ + '-' + str(self.guid),
+ type_name='foo')
+
+ # Create Volume
+ self.volume_creator = OpenStackVolume(self.os_creds, volume_settings)
+
+ with self.assertRaises(NotFound):
+ self.volume_creator.create(block=True)
+
+ def test_create_volume_bad_image(self):
+ """
+ Tests the creation of an OpenStack volume with an image that does not
+ exist to ensure it raises a BadRequest exception.
+ """
+ volume_settings = VolumeConfig(
+ name=self.__class__.__name__ + '-' + str(self.guid),
+ image_name='foo')
+
+ # Create Volume
+ self.volume_creator = OpenStackVolume(self.os_creds, volume_settings)
+
+ with self.assertRaises(BadRequest):
+ self.volume_creator.create(block=True)
+
+ def test_create_volume_bad_zone(self):
+ """
+ Tests the creation of an OpenStack volume with an availability zone
+ that does not exist to ensure it raises a BadRequest exception.
+ """
+ volume_settings = VolumeConfig(
+ name=self.__class__.__name__ + '-' + str(self.guid),
+ availability_zone='foo')
+
+ # Create Volume
+ self.volume_creator = OpenStackVolume(self.os_creds, volume_settings)
+
+ with self.assertRaises(BadRequest):
+ self.volume_creator.create(block=True)
+
+
class CreateVolumeWithTypeTests(OSIntegrationTestCase):
"""
Test cases for the CreateVolume when attempting to associate it to a
self.volume_type_name = guid + '-vol-type'
self.volume_type_creator = OpenStackVolumeType(
- self.os_creds, VolumeTypeSettings(name=self.volume_type_name))
+ self.os_creds, VolumeTypeConfig(name=self.volume_type_name))
self.volume_type_creator.create()
self.volume_creator = None
"""
self.volume_creator = OpenStackVolume(
self.os_creds,
- VolumeSettings(name=self.volume_name, type_name='foo'))
+ VolumeConfig(name=self.volume_name, type_name='foo'))
with self.assertRaises(NotFound):
self.volume_creator.create()
"""
self.volume_creator = OpenStackVolume(
self.os_creds,
- VolumeSettings(name=self.volume_name,
- type_name=self.volume_type_name))
+ VolumeConfig(
+ name=self.volume_name, type_name=self.volume_type_name))
- created_volume = self.volume_creator.create()
+ created_volume = self.volume_creator.create(block=True)
self.assertIsNotNone(created_volume)
self.assertEqual(self.volume_type_name, created_volume.type)
def setUp(self):
super(self.__class__, self).__start__()
+ self.cinder = cinder_utils.cinder_client(self.os_creds)
+
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
self.volume_name = guid + '-vol'
self.image_name = guid + '-image'
def test_bad_image_name(self):
"""
- Expect a NotFound to be raised when the volume type does not exist
+ Tests OpenStackVolume#create() method to ensure a volume is NOT created
+ when associating it to an invalid image name
"""
self.volume_creator = OpenStackVolume(
self.os_creds,
- VolumeSettings(name=self.volume_name, image_name='foo'))
+ VolumeConfig(name=self.volume_name, image_name='foo'))
with self.assertRaises(BadRequest):
self.volume_creator.create(block=True)
def test_valid_volume_image(self):
"""
- Expect a NotFound to be raised when the volume type does not exist
+ Tests OpenStackVolume#create() method to ensure a volume is NOT created
+ when associating it to an invalid image name
"""
self.volume_creator = OpenStackVolume(
self.os_creds,
- VolumeSettings(name=self.volume_name, image_name=self.image_name))
+ VolumeConfig(name=self.volume_name, image_name=self.image_name))
created_volume = self.volume_creator.create(block=True)
self.assertIsNotNone(created_volume)
- self.assertIsNone(created_volume.type)
+ self.assertEqual(
+ self.volume_creator.volume_settings.name, created_volume.name)
self.assertTrue(self.volume_creator.volume_active())
+
+ retrieved_volume = cinder_utils.get_volume_by_id(
+ self.cinder, created_volume.id)
+
+ self.assertEqual(created_volume, retrieved_volume)