# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
#                    and others.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import time
import uuid

from snaps import file_utils
from snaps.config.flavor import FlavorConfig
from snaps.config.network import PortConfig
from snaps.config.vm_inst import VmInstanceConfig
from snaps.config.volume import VolumeConfig
from snaps.openstack import create_instance
from snaps.openstack.create_flavor import OpenStackFlavor
from snaps.openstack.create_image import OpenStackImage
from snaps.openstack.create_instance import OpenStackVmInstance
from snaps.openstack.create_network import OpenStackNetwork
from snaps.openstack.create_volume import OpenStackVolume
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import (
    nova_utils, neutron_utils, glance_utils, cinder_utils)
from snaps.openstack.utils.nova_utils import NovaException
__author__ = 'spisarski'

# Module-level logger shared by the test classes in this file
logger = logging.getLogger('nova_utils_tests')
class NovaSmokeTests(OSComponentTestCase):
    """
    Tests to ensure that the nova client can communicate with the cloud
    """

    def test_nova_connect_success(self):
        """
        Tests to ensure that the proper credentials can connect.
        """
        nova = nova_utils.nova_client(self.os_creds)

        # This should not throw an exception
        # NOTE(review): the request issued here was missing from the source;
        # listing flavors is assumed as a cheap authenticated call - confirm
        nova.flavors.list()

    def test_nova_get_hypervisor_hosts(self):
        """
        Tests to ensure that the get_hypervisor_hosts() function works.
        """
        nova = nova_utils.nova_client(self.os_creds)

        # This should not throw an exception
        hosts = nova_utils.get_hypervisor_hosts(nova)
        self.assertGreaterEqual(len(hosts), 1)

    def test_nova_connect_fail(self):
        """
        Tests to ensure that the improper credentials cannot connect.
        """
        # Imported locally as only this test builds its own credentials
        from snaps.openstack.os_credentials import OSCreds

        # Same endpoint, project and proxy as the valid credentials so that
        # only the username/password pair differs
        nova = nova_utils.nova_client(
            OSCreds(username='user', password='pass',
                    auth_url=self.os_creds.auth_url,
                    project_name=self.os_creds.project_name,
                    proxy_settings=self.os_creds.proxy_settings))

        # This should throw an exception
        # NOTE(review): the statement under assertRaises was missing from
        # the source; assumed to mirror the success test - confirm
        with self.assertRaises(Exception):
            nova.flavors.list()
class NovaUtilsKeypairTests(OSComponentTestCase):
    """
    Test basic nova keypair functionality
    """

    def setUp(self):
        """
        Creates the nova client and keys and derives the unique keypair name
        and key file paths used by each test
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.priv_key_file_path = 'tmp/' + guid
        self.pub_key_file_path = self.priv_key_file_path + '.pub'

        self.nova = nova_utils.nova_client(self.os_creds)
        self.keys = nova_utils.create_keys()
        self.public_key = nova_utils.public_key_openssh(self.keys)
        self.keypair_name = guid

        # Set by the tests; tearDown only deletes what was actually created
        self.keypair = None

    def tearDown(self):
        """
        Deletes the keypair created by the test, if any, and removes any key
        files that were written to disk
        """
        if self.keypair:
            try:
                nova_utils.delete_keypair(self.nova, self.keypair)
            except Exception as e:
                # Best effort: test_create_delete_keypair has already removed
                # the keypair by the time this cleanup runs
                logger.debug('Keypair could not be deleted - %s', e)

        for key_file_path in (self.priv_key_file_path,
                              self.pub_key_file_path):
            try:
                os.chmod(key_file_path, 0o777)
                os.remove(key_file_path)
            except OSError:
                # Only test_create_key_from_file writes the key files
                pass

    def test_create_keypair(self):
        """
        Tests the creation of an OpenStack keypair that does not exist.
        """
        # NOTE(review): the trailing argument of this call was missing from
        # the source; self.public_key is assumed - confirm
        self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name,
                                                 self.public_key)
        result = nova_utils.keypair_exists(self.nova, self.keypair)
        self.assertEqual(self.keypair, result)
        keypair = nova_utils.get_keypair_by_name(self.nova, self.keypair_name)
        self.assertEqual(self.keypair, keypair)

    def test_create_delete_keypair(self):
        """
        Tests the creation then deletion of an OpenStack keypair.
        """
        # NOTE(review): the trailing argument of this call was missing from
        # the source; self.public_key is assumed - confirm
        self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name,
                                                 self.public_key)
        result = nova_utils.keypair_exists(self.nova, self.keypair)
        self.assertEqual(self.keypair, result)
        nova_utils.delete_keypair(self.nova, self.keypair)
        result2 = nova_utils.keypair_exists(self.nova, self.keypair)
        self.assertIsNone(result2)

    def test_create_key_from_file(self):
        """
        Tests that the generated RSA keys are properly saved to files and
        that the keypair uploaded from the public key file holds that key
        """
        file_utils.save_keys_to_files(self.keys, self.pub_key_file_path,
                                      self.priv_key_file_path)
        # NOTE(review): the middle argument of this call was missing from the
        # source; self.keypair_name is assumed - confirm
        self.keypair = nova_utils.upload_keypair_file(self.nova,
                                                      self.keypair_name,
                                                      self.pub_key_file_path)

        # Context manager closes the file even when the read fails
        with open(os.path.expanduser(self.pub_key_file_path)) as key_file:
            pub_key = key_file.read()

        self.assertEqual(self.keypair.public_key, pub_key)
class NovaUtilsFlavorTests(OSComponentTestCase):
    """
    Test basic nova flavor functionality
    """

    def setUp(self):
        """
        Builds the flavor configuration and the nova client used by each
        test
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.flavor_settings = FlavorConfig(
            name=guid + '-name', flavor_id=guid + '-id', ram=1, disk=1,
            vcpus=1, ephemeral=1, swap=2, rxtx_factor=3.0, is_public=False)
        self.nova = nova_utils.nova_client(self.os_creds)

        # Set by the tests; tearDown only deletes what was actually created
        self.flavor = None

    def tearDown(self):
        """
        Deletes the flavor created by the test, if any
        """
        if self.flavor:
            try:
                nova_utils.delete_flavor(self.nova, self.flavor)
            except Exception as e:
                # Best effort: test_create_delete_flavor has already removed
                # the flavor by the time this cleanup runs
                logger.debug('Flavor could not be deleted - %s', e)

    def test_create_flavor(self):
        """
        Tests the creation of an OpenStack flavor that does not exist.
        """
        self.flavor = nova_utils.create_flavor(self.nova, self.flavor_settings)
        self.validate_flavor()

    def test_create_delete_flavor(self):
        """
        Tests the creation then deletion of an OpenStack flavor.
        """
        self.flavor = nova_utils.create_flavor(self.nova, self.flavor_settings)
        self.validate_flavor()
        nova_utils.delete_flavor(self.nova, self.flavor)
        flavor = nova_utils.get_flavor_by_name(self.nova,
                                               self.flavor_settings.name)
        self.assertIsNone(flavor)

    def validate_flavor(self):
        """
        Validates the flavor_settings against the OpenStack flavor object
        """
        self.assertIsNotNone(self.flavor)
        self.assertEqual(self.flavor_settings.name, self.flavor.name)
        self.assertEqual(self.flavor_settings.flavor_id, self.flavor.id)
        self.assertEqual(self.flavor_settings.ram, self.flavor.ram)
        self.assertEqual(self.flavor_settings.disk, self.flavor.disk)
        self.assertEqual(self.flavor_settings.vcpus, self.flavor.vcpus)
        self.assertEqual(self.flavor_settings.ephemeral, self.flavor.ephemeral)

        # A configured swap of 0 is compared against an empty string on the
        # flavor object rather than against 0
        if self.flavor_settings.swap == 0:
            self.assertEqual('', self.flavor.swap)
        else:
            self.assertEqual(self.flavor_settings.swap, self.flavor.swap)

        self.assertEqual(self.flavor_settings.rxtx_factor,
                         self.flavor.rxtx_factor)
        self.assertEqual(self.flavor_settings.is_public, self.flavor.is_public)
class NovaUtilsInstanceTests(OSComponentTestCase):
    """
    Tests the creation of VM instances via nova_utils.py
    """

    def setUp(self):
        """
        Setup objects required by VM instances
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())

        self.nova = nova_utils.nova_client(self.os_creds)
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.glance = glance_utils.glance_client(self.os_creds)

        # Everything tearDown inspects is initialised up front so cleanup
        # works no matter how far the setup below gets
        self.image_creator = None
        self.network_creator = None
        self.flavor_creator = None
        self.port = None
        self.vm_inst = None

        try:
            image_settings = openstack_tests.cirros_image_settings(
                name=guid + '-image', image_metadata=self.image_metadata)
            self.image_creator = OpenStackImage(
                self.os_creds, image_settings=image_settings)
            self.image_creator.create()

            network_settings = openstack_tests.get_priv_net_config(
                guid + '-net', guid + '-subnet').network_settings
            self.network_creator = OpenStackNetwork(
                self.os_creds, network_settings)
            self.network_creator.create()

            # NOTE(review): the first arguments of this call were missing
            # from the source; credentials plus a FlavorConfig are assumed
            # from the parallel creators above - confirm
            self.flavor_creator = OpenStackFlavor(
                self.os_creds,
                FlavorConfig(
                    name=guid + '-flavor-name', ram=256, disk=10, vcpus=1))
            self.flavor_creator.create()

            port_settings = PortConfig(
                name=guid + '-port', network_name=network_settings.name)
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds, port_settings)

            self.instance_settings = VmInstanceConfig(
                name=guid + '-vm_inst',
                flavor=self.flavor_creator.flavor_settings.name,
                port_settings=[port_settings])
        except Exception:
            # unittest does not call tearDown when setUp raises, so release
            # whatever was created before propagating the failure
            self.tearDown()
            raise

    def tearDown(self):
        """
        Cleanup deployed resources
        """
        # Each step is best effort so one failure cannot strand the
        # remaining resources
        if self.vm_inst:
            try:
                nova_utils.delete_vm_instance(self.nova, self.vm_inst)
            except Exception as e:
                logger.warning('Unable to delete VM instance - %s', e)

        if self.port:
            try:
                neutron_utils.delete_port(self.neutron, self.port)
            except Exception as e:
                logger.warning('Unable to delete port - %s', e)

        creators = (
            self.flavor_creator, self.network_creator, self.image_creator)
        for creator in creators:
            if creator:
                try:
                    creator.clean()
                except Exception as e:
                    logger.warning('Unable to clean %s - %s', creator, e)

    def test_create_instance(self):
        """
        Tests the nova_utils.create_server() method
        """
        self.vm_inst = nova_utils.create_server(
            self.nova, self.neutron, self.glance, self.instance_settings,
            self.image_creator.image_settings)

        self.assertIsNotNone(self.vm_inst)

        # Wait until instance is ACTIVE
        # NOTE(review): the loop bounds were missing from the source; a 180
        # second limit polled every 3 seconds is an assumption - confirm
        active = False
        deadline = time.time() + 180
        while time.time() < deadline:
            if create_instance.STATUS_ACTIVE == nova_utils.get_server_status(
                    self.nova, self.vm_inst):
                active = True
                break

            time.sleep(3)

        self.assertTrue(active)
        vm_inst = nova_utils.get_latest_server_object(
            self.nova, self.neutron, self.vm_inst)

        self.assertEqual(self.vm_inst.name, vm_inst.name)
        self.assertEqual(self.vm_inst.id, vm_inst.id)
class NovaUtilsInstanceVolumeTests(OSComponentTestCase):
    """
    Tests attaching volumes to and detaching volumes from VM instances via
    nova_utils.py
    """

    def setUp(self):
        """
        Setup objects required by VM instances
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())

        self.nova = nova_utils.nova_client(self.os_creds)
        self.cinder = cinder_utils.cinder_client(self.os_creds)

        # Everything tearDown inspects is initialised up front so cleanup
        # works no matter how far the setup below gets
        self.image_creator = None
        self.network_creator = None
        self.flavor_creator = None
        self.volume_creator = None
        self.instance_creator = None

        try:
            image_settings = openstack_tests.cirros_image_settings(
                name=guid + '-image', image_metadata=self.image_metadata)
            self.image_creator = OpenStackImage(
                self.os_creds, image_settings=image_settings)
            self.image_creator.create()

            network_settings = openstack_tests.get_priv_net_config(
                guid + '-net', guid + '-subnet').network_settings
            self.network_creator = OpenStackNetwork(
                self.os_creds, network_settings)
            self.network_creator.create()

            # NOTE(review): the first arguments of this call were missing
            # from the source; credentials plus a FlavorConfig are assumed
            # from the parallel creators above - confirm
            self.flavor_creator = OpenStackFlavor(
                self.os_creds,
                FlavorConfig(
                    name=guid + '-flavor-name', ram=256, disk=10, vcpus=1))
            self.flavor_creator.create()

            # Create Volume
            volume_settings = VolumeConfig(
                name=self.__class__.__name__ + '-' + str(guid))
            self.volume_creator = OpenStackVolume(
                self.os_creds, volume_settings)
            self.volume_creator.create(block=True)

            port_settings = PortConfig(
                name=guid + '-port', network_name=network_settings.name)
            instance_settings = VmInstanceConfig(
                name=guid + '-vm_inst',
                flavor=self.flavor_creator.flavor_settings.name,
                port_settings=[port_settings])
            self.instance_creator = OpenStackVmInstance(
                self.os_creds, instance_settings, image_settings)
            self.instance_creator.create(block=True)
        except Exception:
            # unittest does not call tearDown when setUp raises, so release
            # whatever was created before propagating the failure
            self.tearDown()
            raise

    def tearDown(self):
        """
        Cleanup deployed resources
        """
        # The instance is cleaned first and the image last; each step is
        # best effort so one failure cannot strand the remaining resources
        creators = (
            self.instance_creator, self.volume_creator, self.flavor_creator,
            self.network_creator, self.image_creator)
        for creator in creators:
            if creator:
                try:
                    creator.clean()
                except Exception as e:
                    logger.warning('Unable to clean %s - %s', creator, e)

    def _poll_volume(self, expect_attached, timeout=30, poll_interval=3):
        """
        Polls cinder until the test volume reaches the expected attachment
        state or the timeout expires
        :param expect_attached: True to wait for at least one attachment,
                                False to wait until there are none
        :param timeout: the maximum number of seconds to poll
        :param poll_interval: the number of seconds between queries
        :return: tuple of the last volume object retrieved (None when no
                 query was made) and a bool denoting whether the expected
                 state was reached
        """
        volume = None
        deadline = time.time() + timeout
        while time.time() < deadline:
            volume = cinder_utils.get_volume_by_id(
                self.cinder, self.volume_creator.get_volume().id)

            if (len(volume.attachments) > 0) == expect_attached:
                return volume, True

            time.sleep(poll_interval)

        return volume, False

    def test_add_remove_volume(self):
        """
        Tests the nova_utils.attach_volume() and detach_volume() functions
        with their default timeout values
        """
        self.assertIsNotNone(self.volume_creator.get_volume())
        self.assertEqual(0, len(self.volume_creator.get_volume().attachments))

        # Attach volume to VM
        neutron = neutron_utils.neutron_client(self.os_creds)
        self.assertIsNotNone(nova_utils.attach_volume(
            self.nova, neutron, self.instance_creator.get_vm_inst(),
            self.volume_creator.get_volume()))

        vol_attach, attached = self._poll_volume(expect_attached=True)
        self.assertTrue(attached)
        self.assertIsNotNone(vol_attach)

        vm_attach = nova_utils.get_server_object_by_id(
            self.nova, neutron, self.instance_creator.get_vm_inst().id)

        # Validate Attachment
        self.assertEqual(self.volume_creator.get_volume().id, vol_attach.id)
        self.assertEqual(1, len(vol_attach.attachments))
        self.assertEqual(vm_attach.volume_ids[0]['id'],
                         vol_attach.attachments[0]['volume_id'])

        # Detach volume from VM
        self.assertIsNotNone(nova_utils.detach_volume(
            self.nova, neutron, self.instance_creator.get_vm_inst(),
            self.volume_creator.get_volume()))

        # Poll instead of re-reading the volume once, as cinder may still
        # report the attachment right after the detach call returns
        vol_detach, detached = self._poll_volume(expect_attached=False)
        vm_detach = nova_utils.get_server_object_by_id(
            self.nova, neutron, self.instance_creator.get_vm_inst().id)

        # Validate Detachment
        self.assertTrue(detached)
        self.assertIsNotNone(vol_detach)
        self.assertEqual(self.volume_creator.get_volume().id, vol_detach.id)
        self.assertEqual(0, len(vol_detach.attachments))
        self.assertEqual(0, len(vm_detach.volume_ids))

    def test_attach_volume_nowait(self):
        """
        Tests the nova_utils.attach_volume() with a timeout value that is too
        small to have the volume attachment data to be included on the VmInst
        object that was supposed to be returned
        """
        self.assertIsNotNone(self.volume_creator.get_volume())
        self.assertEqual(0, len(self.volume_creator.get_volume().attachments))

        # Attach volume to VM
        neutron = neutron_utils.neutron_client(self.os_creds)
        with self.assertRaises(NovaException):
            nova_utils.attach_volume(
                self.nova, neutron, self.instance_creator.get_vm_inst(),
                self.volume_creator.get_volume(), 0)

    def test_detach_volume_nowait(self):
        """
        Tests the nova_utils.detach_volume() with a timeout value that is too
        small to have the volume attachment data to be included on the VmInst
        object that was supposed to be returned
        """
        self.assertIsNotNone(self.volume_creator.get_volume())
        self.assertEqual(0, len(self.volume_creator.get_volume().attachments))

        # Attach volume to VM
        neutron = neutron_utils.neutron_client(self.os_creds)
        nova_utils.attach_volume(
            self.nova, neutron, self.instance_creator.get_vm_inst(),
            self.volume_creator.get_volume())

        # Check VmInst for attachment
        latest_vm = nova_utils.get_server_object_by_id(
            self.nova, neutron, self.instance_creator.get_vm_inst().id)
        self.assertEqual(1, len(latest_vm.volume_ids))

        # Check Volume for attachment
        vol_attach, attached = self._poll_volume(expect_attached=True)
        self.assertTrue(attached)
        self.assertIsNotNone(vol_attach)

        # Detach volume
        with self.assertRaises(NovaException):
            nova_utils.detach_volume(
                self.nova, neutron, self.instance_creator.get_vm_inst(),
                self.volume_creator.get_volume(), 0)