1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
19 from cinderclient.exceptions import NotFound, BadRequest
21 from snaps.config.volume import VolumeConfig
22 from snaps.config.volume_type import (
23 VolumeTypeConfig, ControlLocation, VolumeTypeEncryptionConfig)
24 from snaps.config.qos import Consumer, QoSConfig
25 from snaps.openstack import create_volume
26 from snaps.openstack.create_qos import Consumer
27 from snaps.openstack.tests import validation_utils
28 from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
29 from snaps.openstack.utils import cinder_utils, keystone_utils
31 __author__ = 'spisarski'
34 logger = logging.getLogger('cinder_utils_tests')
37 class CinderSmokeTests(OSComponentTestCase):
39 Tests to ensure that the neutron client can communicate with the cloud
42 def test_cinder_connect_success(self):
44 Tests to ensure that the proper credentials can connect.
46 cinder = cinder_utils.cinder_client(self.os_creds, self.os_session)
47 volumes = cinder.volumes.list()
48 self.assertIsNotNone(volumes)
49 self.assertTrue(isinstance(volumes, list))
51 def test_cinder_connect_fail(self):
53 Tests to ensure that the improper credentials cannot connect.
55 from snaps.openstack.os_credentials import OSCreds
57 with self.assertRaises(Exception):
58 cinder = cinder_utils.cinder_client(OSCreds(
59 username='user', password='pass', auth_url='url',
60 project_name='project'))
64 class CinderUtilsVolumeTests(OSComponentTestCase):
66 Test for the CreateVolume class defined in create_volume.py
71 Instantiates the CreateVolume object that is responsible for
72 downloading and creating an OS volume file within OpenStack
75 self.volume_name = self.__class__.__name__ + '-' + str(guid)
77 self.cinder = cinder_utils.cinder_client(
78 self.os_creds, self.os_session)
79 self.keystone = keystone_utils.keystone_client(
80 self.os_creds, self.os_session)
84 Cleans the remote OpenStack objects
88 cinder_utils.delete_volume(self.cinder, self.volume)
92 self.assertTrue(volume_deleted(self.cinder, self.volume))
94 def test_create_simple_volume(self):
96 Tests the cinder_utils.create_volume()
98 volume_settings = VolumeConfig(name=self.volume_name)
99 self.volume = cinder_utils.create_volume(
100 self.cinder, self.keystone, volume_settings)
101 self.assertIsNotNone(self.volume)
102 self.assertEqual(self.volume_name, self.volume.name)
104 self.assertTrue(volume_active(self.cinder, self.volume))
106 volume = cinder_utils.get_volume(
107 self.cinder, self.keystone, volume_settings=volume_settings,
108 project_name=self.os_creds.project_name)
109 self.assertIsNotNone(volume)
110 validation_utils.objects_equivalent(self.volume, volume)
112 def test_create_delete_volume(self):
114 Tests the cinder_utils.create_volume()
116 volume_settings = VolumeConfig(name=self.volume_name)
117 self.volume = cinder_utils.create_volume(
118 self.cinder, self.keystone, volume_settings)
119 self.assertIsNotNone(self.volume)
120 self.assertEqual(self.volume_name, self.volume.name)
122 self.assertTrue(volume_active(self.cinder, self.volume))
124 volume = cinder_utils.get_volume(
125 self.cinder, self.keystone, volume_settings=volume_settings,
126 project_name=self.os_creds.project_name)
127 self.assertIsNotNone(volume)
128 validation_utils.objects_equivalent(self.volume, volume)
130 cinder_utils.delete_volume(self.cinder, self.volume)
131 self.assertTrue(volume_deleted(self.cinder, self.volume))
133 cinder_utils.get_volume(
134 self.cinder, self.keystone, volume_settings,
135 project_name=self.os_creds.project_name))
138 def volume_active(cinder, volume):
140 Returns true if volume becomes active
145 end_time = time.time() + create_volume.VOLUME_ACTIVE_TIMEOUT
146 while time.time() < end_time:
147 status = cinder_utils.get_volume_status(cinder, volume)
148 if status == create_volume.STATUS_ACTIVE:
150 elif status == create_volume.STATUS_FAILED:
157 def volume_deleted(cinder, volume):
159 Returns true if volume becomes active
164 end_time = time.time() + create_volume.VOLUME_ACTIVE_TIMEOUT
165 while time.time() < end_time:
167 status = cinder_utils.get_volume_status(cinder, volume)
168 if status == create_volume.STATUS_DELETED:
178 class CinderUtilsQoSTests(OSComponentTestCase):
180 Test for the CreateQos class defined in create_qos.py
185 Creates objects for testing cinder_utils.py
188 self.qos_name = self.__class__.__name__ + '-' + str(guid)
189 self.specs = {'foo': 'bar '}
191 self.cinder = cinder_utils.cinder_client(
192 self.os_creds, self.os_session)
196 Cleans the remote OpenStack objects
200 cinder_utils.delete_qos(self.cinder, self.qos)
204 super(self.__class__, self).__clean__()
206 def test_create_qos_both(self):
208 Tests the cinder_utils.create_qos()
210 qos_settings = QoSConfig(
211 name=self.qos_name, specs=self.specs, consumer=Consumer.both)
212 self.qos = cinder_utils.create_qos(self.cinder, qos_settings)
213 self.assertIsNotNone(self.qos)
215 qos1 = cinder_utils.get_qos(self.cinder, qos_settings=qos_settings)
216 self.assertIsNotNone(qos1)
217 validation_utils.objects_equivalent(self.qos, qos1)
219 qos2 = cinder_utils.get_qos_by_id(self.cinder, qos1.id)
220 self.assertIsNotNone(qos2)
221 validation_utils.objects_equivalent(self.qos, qos2)
223 def test_create_qos_front(self):
225 Tests the cinder_utils.create_qos()
227 qos_settings = QoSConfig(
228 name=self.qos_name, specs=self.specs, consumer=Consumer.front_end)
229 self.qos = cinder_utils.create_qos(self.cinder, qos_settings)
230 self.assertIsNotNone(self.qos)
232 qos1 = cinder_utils.get_qos(self.cinder, qos_settings=qos_settings)
233 self.assertIsNotNone(qos1)
234 validation_utils.objects_equivalent(self.qos, qos1)
236 qos2 = cinder_utils.get_qos_by_id(self.cinder, qos1.id)
237 self.assertIsNotNone(qos2)
238 validation_utils.objects_equivalent(self.qos, qos2)
240 def test_create_qos_back(self):
242 Tests the cinder_utils.create_qos()
244 qos_settings = QoSConfig(
245 name=self.qos_name, specs=self.specs, consumer=Consumer.back_end)
246 self.qos = cinder_utils.create_qos(self.cinder, qos_settings)
247 self.assertIsNotNone(self.qos)
249 qos1 = cinder_utils.get_qos(self.cinder, qos_settings=qos_settings)
250 self.assertIsNotNone(qos1)
251 validation_utils.objects_equivalent(self.qos, qos1)
253 qos2 = cinder_utils.get_qos_by_id(self.cinder, qos1.id)
254 self.assertIsNotNone(qos2)
255 validation_utils.objects_equivalent(self.qos, qos2)
257 def test_create_delete_qos(self):
259 Tests the cinder_utils.create_qos()
261 qos_settings = QoSConfig(name=self.qos_name, consumer=Consumer.both)
262 self.qos = cinder_utils.create_qos(self.cinder, qos_settings)
263 self.assertIsNotNone(self.qos)
264 self.assertEqual(self.qos_name, self.qos.name)
266 qos = cinder_utils.get_qos(
267 self.cinder, qos_settings=qos_settings)
268 self.assertIsNotNone(qos)
269 validation_utils.objects_equivalent(self.qos, qos)
271 cinder_utils.delete_qos(self.cinder, self.qos)
272 self.assertIsNone(cinder_utils.get_qos(
273 self.cinder, qos_settings=qos_settings))
276 class CinderUtilsSimpleVolumeTypeTests(OSComponentTestCase):
278 Tests the creation of a Volume Type without any external settings such as
279 QoS Specs or encryption
284 Instantiates the CreateVolume object that is responsible for
285 downloading and creating an OS volume file within OpenStack
288 volume_type_name = self.__class__.__name__ + '-' + str(guid)
289 self.volume_type_settings = VolumeTypeConfig(name=volume_type_name)
290 self.volume_type = None
291 self.cinder = cinder_utils.cinder_client(
292 self.os_creds, self.os_session)
296 Cleans the remote OpenStack objects
300 cinder_utils.delete_volume_type(self.cinder, self.volume_type)
304 super(self.__class__, self).__clean__()
306 def test_create_simple_volume_type(self):
308 Tests the cinder_utils.create_volume_type(), get_volume_type(), and
309 get_volume_type_by_id()
311 self.volume_type = cinder_utils.create_volume_type(
312 self.cinder, self.volume_type_settings)
313 self.assertIsNotNone(self.volume_type)
314 self.assertEqual(self.volume_type_settings.name, self.volume_type.name)
316 volume_type1 = cinder_utils.get_volume_type(
317 self.cinder, volume_type_settings=self.volume_type_settings)
318 self.assertEquals(self.volume_type, volume_type1)
319 self.assertEquals(self.volume_type_settings.public,
322 volume_type2 = cinder_utils.get_volume_type_by_id(
323 self.cinder, volume_type1.id)
324 self.assertEquals(self.volume_type, volume_type2)
325 self.assertIsNone(self.volume_type.encryption)
327 def test_create_delete_volume_type(self):
329 Primarily tests the cinder_utils.delete_volume()
331 self.volume_type = cinder_utils.create_volume_type(
332 self.cinder, self.volume_type_settings)
333 self.assertIsNotNone(self.volume_type)
334 self.assertEqual(self.volume_type_settings.name, self.volume_type.name)
336 volume_type = cinder_utils.get_volume_type(
337 self.cinder, volume_type_settings=self.volume_type_settings)
338 self.assertIsNotNone(volume_type)
339 validation_utils.objects_equivalent(self.volume_type, volume_type)
340 self.assertIsNone(self.volume_type.encryption)
342 cinder_utils.delete_volume_type(self.cinder, self.volume_type)
343 self.assertIsNone(cinder_utils.get_volume_type_by_id(
344 self.cinder, self.volume_type.id))
347 class CinderUtilsAddEncryptionTests(OSComponentTestCase):
349 Tests the creation of an encryption and association to and existing
355 Instantiates the CreateVolume object that is responsible for
356 downloading and creating an OS volume file within OpenStack
359 self.encryption_name = self.__class__.__name__ + '-' + str(guid)
360 self.encryption = None
362 self.cinder = cinder_utils.cinder_client(
363 self.os_creds, self.os_session)
365 volume_type_name = self.__class__.__name__ + '-' + str(guid) + '-type'
366 self.volume_type = cinder_utils.create_volume_type(
367 self.cinder, VolumeTypeConfig(name=volume_type_name))
371 Cleans the remote OpenStack objects
375 cinder_utils.delete_volume_type_encryption(
376 self.cinder, self.volume_type)
382 cinder_utils.delete_volume_type(self.cinder, self.volume_type)
386 super(self.__class__, self).__clean__()
388 def test_create_simple_encryption(self):
390 Tests the cinder_utils.create_volume_encryption(),
391 get_volume_encryption(), and get_volume_encryption_by_id()
393 encryption_settings = VolumeTypeEncryptionConfig(
394 name=self.encryption_name, provider_class='foo',
395 control_location=ControlLocation.front_end)
396 self.encryption = cinder_utils.create_volume_encryption(
397 self.cinder, self.volume_type, encryption_settings)
398 self.assertIsNotNone(self.encryption)
399 self.assertEqual('foo', self.encryption.provider)
400 self.assertEqual(ControlLocation.front_end.value,
401 self.encryption.control_location)
403 encryption1 = cinder_utils.get_volume_encryption_by_type(
404 self.cinder, self.volume_type)
405 self.assertEquals(self.encryption, encryption1)
407 def test_create_delete_encryption(self):
409 Primarily tests the cinder_utils.delete_volume_type_encryption()
411 encryption_settings = VolumeTypeEncryptionConfig(
412 name=self.encryption_name, provider_class='LuksEncryptor',
413 control_location=ControlLocation.back_end)
414 self.encryption = cinder_utils.create_volume_encryption(
415 self.cinder, self.volume_type, encryption_settings)
416 self.assertIsNotNone(self.encryption)
417 self.assertEqual('LuksEncryptor', self.encryption.provider)
418 self.assertEqual(ControlLocation.back_end.value,
419 self.encryption.control_location)
421 encryption1 = cinder_utils.get_volume_encryption_by_type(
422 self.cinder, self.volume_type)
423 self.assertEquals(self.encryption, encryption1)
425 cinder_utils.delete_volume_type_encryption(
426 self.cinder, self.volume_type)
428 encryption2 = cinder_utils.get_volume_encryption_by_type(
429 self.cinder, self.volume_type)
430 self.assertIsNone(encryption2)
432 def test_create_with_all_attrs(self):
434 Tests the cinder_utils.create_volume_encryption() with all valid
437 encryption_settings = VolumeTypeEncryptionConfig(
438 name=self.encryption_name, provider_class='foo',
439 cipher='bar', control_location=ControlLocation.back_end,
441 self.encryption = cinder_utils.create_volume_encryption(
442 self.cinder, self.volume_type, encryption_settings)
443 self.assertIsNotNone(self.encryption)
444 self.assertEqual('foo', self.encryption.provider)
445 self.assertEqual('bar', self.encryption.cipher)
446 self.assertEqual(1, self.encryption.key_size)
447 self.assertEqual(ControlLocation.back_end.value,
448 self.encryption.control_location)
450 encryption1 = cinder_utils.get_volume_encryption_by_type(
451 self.cinder, self.volume_type)
452 self.assertEquals(self.encryption, encryption1)
454 def test_create_bad_key_size(self):
456 Tests the cinder_utils.create_volume_encryption() raises an exception
457 when the provider class does not exist
459 encryption_settings = VolumeTypeEncryptionConfig(
460 name=self.encryption_name, provider_class='foo',
461 cipher='bar', control_location=ControlLocation.back_end,
464 with self.assertRaises(BadRequest):
465 self.encryption = cinder_utils.create_volume_encryption(
466 self.cinder, self.volume_type, encryption_settings)
469 class CinderUtilsVolumeTypeCompleteTests(OSComponentTestCase):
471 Tests to ensure that a volume type can have a QoS Spec added to it
476 Creates objects for testing cinder_utils.py
479 self.qos_name = self.__class__.__name__ + '-' + str(guid) + '-qos'
480 self.vol_type_name = self.__class__.__name__ + '-' + str(guid)
481 self.specs = {'foo': 'bar'}
482 self.cinder = cinder_utils.cinder_client(
483 self.os_creds, self.os_session)
484 qos_settings = QoSConfig(
485 name=self.qos_name, specs=self.specs, consumer=Consumer.both)
486 self.qos = cinder_utils.create_qos(self.cinder, qos_settings)
487 self.volume_type = None
491 Cleans the remote OpenStack objects
494 if self.volume_type.encryption:
496 cinder_utils.delete_volume_type_encryption(
497 self.cinder, self.volume_type)
501 cinder_utils.delete_volume_type(self.cinder, self.volume_type)
507 cinder_utils.delete_qos(self.cinder, self.qos)
511 super(self.__class__, self).__clean__()
513 def test_create_with_encryption(self):
515 Tests the cinder_utils.create_volume_type() where encryption has been
518 encryption_settings = VolumeTypeEncryptionConfig(
519 name='foo', provider_class='bar',
520 control_location=ControlLocation.back_end)
521 volume_type_settings = VolumeTypeConfig(
522 name=self.vol_type_name, encryption=encryption_settings)
523 self.volume_type = cinder_utils.create_volume_type(
524 self.cinder, volume_type_settings)
526 vol_encrypt = cinder_utils.get_volume_encryption_by_type(
527 self.cinder, self.volume_type)
528 self.assertIsNotNone(vol_encrypt)
529 self.assertIsNone(self.volume_type.qos_spec)
530 self.assertEqual(self.volume_type.encryption, vol_encrypt)
531 self.assertEqual(self.volume_type.id, vol_encrypt.volume_type_id)
533 def test_create_with_qos(self):
535 Tests the cinder_utils.create_volume_type() with an associated QoS Spec
537 volume_type_settings = VolumeTypeConfig(
538 name=self.vol_type_name, qos_spec_name=self.qos_name)
539 self.volume_type = cinder_utils.create_volume_type(
540 self.cinder, volume_type_settings)
542 self.assertIsNotNone(self.volume_type)
543 self.assertIsNone(self.volume_type.encryption)
544 self.assertIsNotNone(self.volume_type.qos_spec)
545 self.assertEqual(self.qos.id, self.volume_type.qos_spec.id)
547 def test_create_with_invalid_qos(self):
549 Tests the cinder_utils.create_volume_type() when the QoS Spec name
552 volume_type_settings = VolumeTypeConfig(
553 name=self.vol_type_name, qos_spec_name='foo')
555 self.volume_type = cinder_utils.create_volume_type(
556 self.cinder, volume_type_settings)
558 self.assertIsNone(self.volume_type.qos_spec)
560 def test_create_with_qos_and_encryption(self):
562 Tests the cinder_utils.create_volume_type() with encryption and an
565 encryption_settings = VolumeTypeEncryptionConfig(
566 name='foo', provider_class='bar',
567 control_location=ControlLocation.back_end)
568 volume_type_settings = VolumeTypeConfig(
569 name=self.vol_type_name, qos_spec_name=self.qos_name,
570 encryption=encryption_settings)
571 self.volume_type = cinder_utils.create_volume_type(
572 self.cinder, volume_type_settings)
574 self.assertIsNotNone(self.volume_type)
575 vol_encrypt = cinder_utils.get_volume_encryption_by_type(
576 self.cinder, self.volume_type)
577 self.assertIsNotNone(vol_encrypt)
578 self.assertEqual(self.volume_type.id, vol_encrypt.volume_type_id)
579 self.assertIsNotNone(self.volume_type.qos_spec)
580 self.assertEqual(self.qos.id, self.volume_type.qos_spec.id)