1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
import logging
import os
import time
import uuid

from snaps import file_utils
from snaps.config.flavor import FlavorConfig
from snaps.openstack import create_instance
from snaps.openstack.create_flavor import OpenStackFlavor
from snaps.openstack.create_image import OpenStackImage
from snaps.openstack.create_instance import (
    VmInstanceSettings, OpenStackVmInstance)
from snaps.openstack.create_network import OpenStackNetwork, PortSettings
from snaps.openstack.create_volume import OpenStackVolume, VolumeSettings
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import (
    nova_utils, neutron_utils, glance_utils, cinder_utils)
# Original author of this test module.
__author__ = 'spisarski'

# Module-level logger registered under the fixed name 'nova_utils_tests'.
logger = logging.getLogger('nova_utils_tests')
class NovaSmokeTests(OSComponentTestCase):
    """
    Tests to ensure that the nova client can communicate with the cloud
    """

    def test_nova_connect_success(self):
        """
        Tests to ensure that the proper credentials can connect.
        """
        nova = nova_utils.nova_client(self.os_creds)

        # This should not throw an exception
        # NOTE(review): the request issued here was missing from the
        # extracted source; a read-only flavor listing is used as the
        # minimal authenticated call - confirm against the upstream file.
        nova.flavors.list()

    def test_nova_connect_fail(self):
        """
        Tests to ensure that the improper credentials cannot connect.
        """
        from snaps.openstack.os_credentials import OSCreds

        # Same endpoint, project and proxy as the valid credentials; only
        # the username/password pair is bogus.
        nova = nova_utils.nova_client(
            OSCreds(username='user', password='pass',
                    auth_url=self.os_creds.auth_url,
                    project_name=self.os_creds.project_name,
                    proxy_settings=self.os_creds.proxy_settings))

        # This should throw an exception
        # NOTE(review): body of the 'with' block was missing from the
        # extracted source; restored to mirror the success test - confirm.
        with self.assertRaises(Exception):
            nova.flavors.list()
class NovaUtilsKeypairTests(OSComponentTestCase):
    """
    Test basic nova keypair functionality
    """

    def setUp(self):
        """
        Generates the RSA keys, the keypair name and the key file paths
        used by each test.
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.priv_key_file_path = 'tmp/' + guid
        self.pub_key_file_path = self.priv_key_file_path + '.pub'

        self.nova = nova_utils.nova_client(self.os_creds)
        self.keys = nova_utils.create_keys()
        self.public_key = nova_utils.public_key_openssh(self.keys)
        self.keypair_name = guid

        # Populated by the tests; tearDown() only deletes what was created.
        self.keypair = None

    def tearDown(self):
        """
        Deletes the keypair from OpenStack and removes any key files
        written to disk. Cleanup is best-effort and never fails a test.
        """
        if self.keypair:
            try:
                nova_utils.delete_keypair(self.nova, self.keypair)
            except Exception as e:
                # The keypair may already have been removed by the test
                logger.warning('Unable to delete keypair %s - %s',
                               self.keypair_name, e)

        for key_file_path in (self.priv_key_file_path,
                              self.pub_key_file_path):
            try:
                # Make the file writable first so that remove() succeeds
                os.chmod(key_file_path, 0o777)
                os.remove(key_file_path)
            except OSError:
                # Only test_create_key_from_file writes these files
                pass

    def test_create_keypair(self):
        """
        Tests the creation of an OpenStack keypair that does not exist.
        """
        self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name,
                                                 self.public_key)
        result = nova_utils.keypair_exists(self.nova, self.keypair)
        self.assertEqual(self.keypair, result)
        keypair = nova_utils.get_keypair_by_name(self.nova, self.keypair_name)
        self.assertEqual(self.keypair, keypair)

    def test_create_delete_keypair(self):
        """
        Tests the creation then deletion of an OpenStack keypair.
        """
        self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name,
                                                 self.public_key)
        result = nova_utils.keypair_exists(self.nova, self.keypair)
        self.assertEqual(self.keypair, result)
        nova_utils.delete_keypair(self.nova, self.keypair)
        result2 = nova_utils.keypair_exists(self.nova, self.keypair)
        self.assertIsNone(result2)

    def test_create_key_from_file(self):
        """
        Tests that the generated RSA keys are properly saved to files and
        that the public key file can be uploaded as a keypair.
        """
        file_utils.save_keys_to_files(self.keys, self.pub_key_file_path,
                                      self.priv_key_file_path)
        self.keypair = nova_utils.upload_keypair_file(
            self.nova, self.keypair_name, self.pub_key_file_path)

        # Context manager guarantees the handle is closed even when the
        # read fails.
        with open(os.path.expanduser(self.pub_key_file_path)) as key_file:
            pub_key = key_file.read()

        self.assertEqual(self.keypair.public_key, pub_key)
class NovaUtilsFlavorTests(OSComponentTestCase):
    """
    Test basic nova flavor functionality
    """

    def setUp(self):
        """
        Builds the flavor configuration and the nova client used by each
        test.
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.flavor_settings = FlavorConfig(
            name=guid + '-name', flavor_id=guid + '-id', ram=1, disk=1,
            vcpus=1, ephemeral=1, swap=2, rxtx_factor=3.0, is_public=False)
        self.nova = nova_utils.nova_client(self.os_creds)

        # Populated by the tests; tearDown() only deletes what was created.
        self.flavor = None

    def tearDown(self):
        """
        Deletes the flavor created by the test, if any. Cleanup is
        best-effort and never fails a test.
        """
        if self.flavor:
            try:
                nova_utils.delete_flavor(self.nova, self.flavor)
            except Exception as e:
                # The flavor may already have been removed by the test
                logger.warning('Unable to delete flavor %s - %s',
                               self.flavor_settings.name, e)

    def test_create_flavor(self):
        """
        Tests the creation of an OpenStack flavor that does not exist.
        """
        self.flavor = nova_utils.create_flavor(self.nova, self.flavor_settings)
        self.validate_flavor()

    def test_create_delete_flavor(self):
        """
        Tests the creation then deletion of an OpenStack flavor.
        """
        self.flavor = nova_utils.create_flavor(self.nova, self.flavor_settings)
        self.validate_flavor()
        nova_utils.delete_flavor(self.nova, self.flavor)
        flavor = nova_utils.get_flavor_by_name(self.nova,
                                               self.flavor_settings.name)
        self.assertIsNone(flavor)

    def validate_flavor(self):
        """
        Validates the flavor_settings against the OpenStack flavor object
        """
        self.assertIsNotNone(self.flavor)
        self.assertEqual(self.flavor_settings.name, self.flavor.name)
        self.assertEqual(self.flavor_settings.flavor_id, self.flavor.id)
        self.assertEqual(self.flavor_settings.ram, self.flavor.ram)
        self.assertEqual(self.flavor_settings.disk, self.flavor.disk)
        self.assertEqual(self.flavor_settings.vcpus, self.flavor.vcpus)
        self.assertEqual(self.flavor_settings.ephemeral, self.flavor.ephemeral)

        # A configured swap of 0 is expected to come back as an empty
        # string (presumably how nova reports "no swap" - verify).
        if self.flavor_settings.swap == 0:
            self.assertEqual('', self.flavor.swap)
        else:
            self.assertEqual(self.flavor_settings.swap, self.flavor.swap)

        self.assertEqual(self.flavor_settings.rxtx_factor,
                         self.flavor.rxtx_factor)
        self.assertEqual(self.flavor_settings.is_public, self.flavor.is_public)
class NovaUtilsInstanceTests(OSComponentTestCase):
    """
    Tests the creation of VM instances via nova_utils.py
    """

    def setUp(self):
        """
        Setup objects required by VM instances
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())

        self.nova = nova_utils.nova_client(self.os_creds)
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.glance = glance_utils.glance_client(self.os_creds)

        # Initialised up front so tearDown() is safe after a partial setUp()
        self.image_creator = None
        self.network_creator = None
        self.flavor_creator = None
        self.port = None
        self.vm_inst = None

        try:
            image_settings = openstack_tests.cirros_image_settings(
                name=guid + '-image', image_metadata=self.image_metadata)
            self.image_creator = OpenStackImage(
                self.os_creds, image_settings=image_settings)
            self.image_creator.create()

            network_settings = openstack_tests.get_priv_net_config(
                guid + '-net', guid + '-subnet').network_settings
            self.network_creator = OpenStackNetwork(
                self.os_creds, network_settings)
            self.network_creator.create()

            # NOTE(review): the leading arguments of this call were missing
            # from the extracted source; self.os_creds is used to match the
            # sibling creators above - confirm.
            self.flavor_creator = OpenStackFlavor(
                self.os_creds,
                FlavorConfig(
                    name=guid + '-flavor-name', ram=256, disk=10, vcpus=1))
            self.flavor_creator.create()

            port_settings = PortSettings(name=guid + '-port',
                                         network_name=network_settings.name)
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds, port_settings)

            self.instance_settings = VmInstanceSettings(
                name=guid + '-vm_inst',
                flavor=self.flavor_creator.flavor_settings.name,
                port_settings=[port_settings])
        except Exception:
            # unittest does not call tearDown() when setUp() raises, so
            # release whatever was created before propagating the error.
            self.tearDown()
            raise

    def tearDown(self):
        """
        Cleanup deployed resources
        """
        if self.vm_inst:
            try:
                nova_utils.delete_vm_instance(self.nova, self.vm_inst)
            except Exception as e:
                logger.warning('Unable to delete VM instance - %s', e)

        if self.port:
            try:
                neutron_utils.delete_port(self.neutron, self.port)
            except Exception as e:
                logger.warning('Unable to delete port - %s', e)

        # Cleaned in the same order as the original: flavor, network, image
        for creator in (self.flavor_creator, self.network_creator,
                        self.image_creator):
            if creator:
                try:
                    creator.clean()
                except Exception as e:
                    logger.warning('Unable to clean %s - %s',
                                   creator.__class__.__name__, e)

    def test_create_instance(self):
        """
        Tests the nova_utils.create_server() method
        """
        self.vm_inst = nova_utils.create_server(
            self.nova, self.neutron, self.glance, self.instance_settings,
            self.image_creator.image_settings)

        self.assertIsNotNone(self.vm_inst)

        # Wait until instance is ACTIVE
        # NOTE(review): the loop header, timeout and poll interval were
        # missing from the extracted source; the values below are
        # assumptions - confirm against the upstream file.
        active = False
        timeout = 180
        poll_interval = 3
        while timeout > 0:
            if create_instance.STATUS_ACTIVE == nova_utils.get_server_status(
                    self.nova, self.vm_inst):
                active = True
                break

            time.sleep(poll_interval)
            timeout -= poll_interval

        self.assertTrue(active)
        vm_inst = nova_utils.get_latest_server_object(self.nova, self.vm_inst)

        self.assertEqual(self.vm_inst.name, vm_inst.name)
        self.assertEqual(self.vm_inst.id, vm_inst.id)
class NovaUtilsInstanceVolumeTests(OSComponentTestCase):
    """
    Tests attaching and detaching volumes to VM instances via nova_utils.py
    """

    def setUp(self):
        """
        Setup objects required by VM instances
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())

        self.nova = nova_utils.nova_client(self.os_creds)
        self.cinder = cinder_utils.cinder_client(self.os_creds)

        # Initialised up front so tearDown() is safe after a partial setUp()
        self.image_creator = None
        self.network_creator = None
        self.flavor_creator = None
        self.volume_creator = None
        self.instance_creator = None

        try:
            image_settings = openstack_tests.cirros_image_settings(
                name=guid + '-image', image_metadata=self.image_metadata)
            self.image_creator = OpenStackImage(
                self.os_creds, image_settings=image_settings)
            self.image_creator.create()

            network_settings = openstack_tests.get_priv_net_config(
                guid + '-net', guid + '-subnet').network_settings
            self.network_creator = OpenStackNetwork(
                self.os_creds, network_settings)
            self.network_creator.create()

            # NOTE(review): the leading arguments of this call were missing
            # from the extracted source; self.os_creds is used to match the
            # sibling creators above - confirm.
            self.flavor_creator = OpenStackFlavor(
                self.os_creds,
                FlavorConfig(
                    name=guid + '-flavor-name', ram=256, disk=10, vcpus=1))
            self.flavor_creator.create()

            # Create Volume
            volume_settings = VolumeSettings(
                name=self.__class__.__name__ + '-' + str(guid))
            self.volume_creator = OpenStackVolume(
                self.os_creds, volume_settings)
            self.volume_creator.create(block=True)

            port_settings = PortSettings(
                name=guid + '-port', network_name=network_settings.name)
            instance_settings = VmInstanceSettings(
                name=guid + '-vm_inst',
                flavor=self.flavor_creator.flavor_settings.name,
                port_settings=[port_settings])
            self.instance_creator = OpenStackVmInstance(
                self.os_creds, instance_settings, image_settings)
            self.instance_creator.create(block=True)
        except Exception:
            # unittest does not call tearDown() when setUp() raises, so
            # release whatever was created before propagating the error.
            self.tearDown()
            raise

    def tearDown(self):
        """
        Cleanup deployed resources
        """
        # Cleaned in the same order as the original: the instance first,
        # then the volume, flavor, network and image.
        for creator in (self.instance_creator, self.volume_creator,
                        self.flavor_creator, self.network_creator,
                        self.image_creator):
            if creator:
                try:
                    creator.clean()
                except Exception as e:
                    logger.warning('Unable to clean %s - %s',
                                   creator.__class__.__name__, e)

    def test_add_remove_volume(self):
        """
        Tests the nova_utils.attach_volume() and nova_utils.detach_volume()
        methods
        """
        # NOTE(review): the pauses after attach and detach were missing
        # from the extracted source; the delay value is an assumption -
        # confirm against the upstream file.
        settle_secs = 10

        self.assertIsNotNone(self.volume_creator.get_volume())
        self.assertEqual(0, len(self.volume_creator.get_volume().attachments))

        # Attach volume to VM
        nova_utils.attach_volume(
            self.nova, self.instance_creator.get_vm_inst(),
            self.volume_creator.get_volume())

        time.sleep(settle_secs)

        vol_attach = cinder_utils.get_volume_by_id(
            self.cinder, self.volume_creator.get_volume().id)
        vm_attach = nova_utils.get_server_object_by_id(
            self.nova, self.instance_creator.get_vm_inst().id)

        # Detach volume from VM
        nova_utils.detach_volume(
            self.nova, self.instance_creator.get_vm_inst(),
            self.volume_creator.get_volume())

        time.sleep(settle_secs)

        vol_detach = cinder_utils.get_volume_by_id(
            self.cinder, self.volume_creator.get_volume().id)
        vm_detach = nova_utils.get_server_object_by_id(
            self.nova, self.instance_creator.get_vm_inst().id)

        # Validate Attachment
        self.assertIsNotNone(vol_attach)
        self.assertEqual(self.volume_creator.get_volume().id, vol_attach.id)
        self.assertEqual(1, len(vol_attach.attachments))
        self.assertEqual(vm_attach.volume_ids[0]['id'],
                         vol_attach.attachments[0]['volume_id'])

        # Validate Detachment
        self.assertIsNotNone(vol_detach)
        self.assertEqual(self.volume_creator.get_volume().id, vol_detach.id)
        self.assertEqual(0, len(vol_detach.attachments))
        self.assertEqual(0, len(vm_detach.volume_ids))