1 # Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
2 # and others. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
import logging
import os
import time
import uuid

from snaps import file_utils
from snaps.openstack import create_instance
from snaps.openstack.create_flavor import FlavorSettings, OpenStackFlavor
from snaps.openstack.create_image import OpenStackImage
from snaps.openstack.create_instance import (
    VmInstanceSettings, OpenStackVmInstance)
from snaps.openstack.create_network import OpenStackNetwork, PortSettings
from snaps.openstack.create_volume import OpenStackVolume, VolumeSettings
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import (
    nova_utils, neutron_utils, glance_utils, cinder_utils)
__author__ = 'spisarski'

# Module-level logger shared by every test class in this file.
logger = logging.getLogger('nova_utils_tests')
class NovaSmokeTests(OSComponentTestCase):
    """
    Tests to ensure that the nova client can communicate with the cloud
    """

    def test_nova_connect_success(self):
        """
        Tests to ensure that the proper credentials can connect.
        """
        nova = nova_utils.nova_client(self.os_creds)

        # This should not throw an exception
        # NOTE(review): the request issued against the client was not
        # visible in the reviewed text; listing flavors is used here as an
        # inexpensive authenticated call - confirm it is the intended one.
        nova.flavors.list()

    def test_nova_connect_fail(self):
        """
        Tests to ensure that the improper credentials cannot connect.
        """
        # Imported inside the test, as in the original code.
        from snaps.openstack.os_credentials import OSCreds

        # Same endpoint/project/proxy as the real credentials, but with a
        # username and password that are not expected to be valid.
        bad_creds = OSCreds(
            username='user', password='pass',
            auth_url=self.os_creds.auth_url,
            project_name=self.os_creds.project_name,
            proxy_settings=self.os_creds.proxy_settings)
        nova = nova_utils.nova_client(bad_creds)

        # This should throw an exception
        with self.assertRaises(Exception):
            # NOTE(review): see test_nova_connect_success - the exact call
            # was not visible; confirm.
            nova.flavors.list()
class NovaUtilsKeypairTests(OSComponentTestCase):
    """
    Test basic nova keypair functionality
    """

    def setUp(self):
        """
        Creates the nova client, a fresh set of keys and the unique keypair
        name / key file paths used by each test
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.priv_key_file_path = 'tmp/' + guid
        self.pub_key_file_path = self.priv_key_file_path + '.pub'

        self.nova = nova_utils.nova_client(self.os_creds)
        self.keys = nova_utils.create_keys()
        self.public_key = nova_utils.public_key_openssh(self.keys)
        self.keypair_name = guid

        # Set by the individual tests once a keypair has been uploaded
        self.keypair = None

    def tearDown(self):
        """
        Deletes the uploaded keypair (if any) and removes any key files that
        were written to disk
        """
        if self.keypair:
            try:
                nova_utils.delete_keypair(self.nova, self.keypair)
            except Exception as e:
                # Best-effort cleanup: test_create_delete_keypair has already
                # removed the keypair by the time this runs.
                logger.warning('Unable to delete keypair %s - %s',
                               self.keypair_name, e)

        for key_file_path in (self.priv_key_file_path,
                              self.pub_key_file_path):
            try:
                os.chmod(key_file_path, 0o777)
                os.remove(key_file_path)
            except OSError:
                # Only test_create_key_from_file writes these files, so for
                # the other tests there is nothing to remove.
                pass

    def test_create_keypair(self):
        """
        Tests the creation of an OpenStack keypair that does not exist.
        """
        self.keypair = nova_utils.upload_keypair(
            self.nova, self.keypair_name, self.public_key)
        result = nova_utils.keypair_exists(self.nova, self.keypair)
        self.assertEqual(self.keypair, result)
        keypair = nova_utils.get_keypair_by_name(self.nova, self.keypair_name)
        self.assertEqual(self.keypair, keypair)

    def test_create_delete_keypair(self):
        """
        Tests the creation and subsequent deletion of an OpenStack keypair.
        """
        self.keypair = nova_utils.upload_keypair(
            self.nova, self.keypair_name, self.public_key)
        result = nova_utils.keypair_exists(self.nova, self.keypair)
        self.assertEqual(self.keypair, result)
        nova_utils.delete_keypair(self.nova, self.keypair)
        result2 = nova_utils.keypair_exists(self.nova, self.keypair)
        self.assertIsNone(result2)

    def test_create_key_from_file(self):
        """
        Tests that the generated RSA keys are properly saved to files and
        that the public key file can be uploaded as a keypair
        """
        file_utils.save_keys_to_files(self.keys, self.pub_key_file_path,
                                      self.priv_key_file_path)
        self.keypair = nova_utils.upload_keypair_file(
            self.nova, self.keypair_name, self.pub_key_file_path)

        # Context manager guarantees the handle is closed even if the read
        # fails.
        with open(os.path.expanduser(self.pub_key_file_path)) as pub_key_file:
            pub_key = pub_key_file.read()

        self.assertEqual(self.keypair.public_key, pub_key)
class NovaUtilsFlavorTests(OSComponentTestCase):
    """
    Test basic nova flavor functionality
    """

    def setUp(self):
        """
        Builds the flavor settings and nova client used by each test
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())

        # NOTE(review): the disk/vcpus/ephemeral/swap arguments were not
        # visible in the reviewed text and are reconstructed here; confirm
        # the values against the intended flavor definition.
        self.flavor_settings = FlavorSettings(
            name=guid + '-name', flavor_id=guid + '-id', ram=1,
            disk=1, vcpus=1,
            ephemeral=1, swap=2,
            rxtx_factor=3.0, is_public=False)
        self.nova = nova_utils.nova_client(self.os_creds)

        # Set by the individual tests once the flavor has been created
        self.flavor = None

    def tearDown(self):
        """
        Deletes the flavor created by the test, if any
        """
        if self.flavor:
            try:
                nova_utils.delete_flavor(self.nova, self.flavor)
            except Exception as e:
                # Best-effort cleanup: test_create_delete_flavor has already
                # removed the flavor by the time this runs.
                logger.warning('Unable to delete flavor %s - %s',
                               self.flavor_settings.name, e)

    def test_create_flavor(self):
        """
        Tests the creation of an OpenStack flavor that does not exist.
        """
        self.flavor = nova_utils.create_flavor(self.nova, self.flavor_settings)
        self.validate_flavor()

    def test_create_delete_flavor(self):
        """
        Tests the creation and subsequent deletion of an OpenStack flavor.
        """
        self.flavor = nova_utils.create_flavor(self.nova, self.flavor_settings)
        self.validate_flavor()
        nova_utils.delete_flavor(self.nova, self.flavor)
        flavor = nova_utils.get_flavor_by_name(self.nova,
                                               self.flavor_settings.name)
        self.assertIsNone(flavor)

    def validate_flavor(self):
        """
        Validates the flavor_settings against the OpenStack flavor object
        """
        settings = self.flavor_settings
        flavor = self.flavor

        self.assertIsNotNone(flavor)
        self.assertEqual(settings.name, flavor.name)
        self.assertEqual(settings.flavor_id, flavor.id)
        self.assertEqual(settings.ram, flavor.ram)
        self.assertEqual(settings.disk, flavor.disk)
        self.assertEqual(settings.vcpus, flavor.vcpus)
        self.assertEqual(settings.ephemeral, flavor.ephemeral)

        # A swap setting of 0 is expected to come back as an empty string
        # rather than as the integer 0.
        if settings.swap == 0:
            self.assertEqual('', flavor.swap)
        else:
            self.assertEqual(settings.swap, flavor.swap)

        self.assertEqual(settings.rxtx_factor, flavor.rxtx_factor)
        self.assertEqual(settings.is_public, flavor.is_public)
class NovaUtilsInstanceTests(OSComponentTestCase):
    """
    Tests the creation of VM instances via nova_utils.py
    """

    def setUp(self):
        """
        Setup objects required by VM instances: an image, a network, a
        flavor and a port, plus the settings of the instance to create
        :return:
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())

        self.nova = nova_utils.nova_client(self.os_creds)
        self.neutron = neutron_utils.neutron_client(self.os_creds)
        self.glance = glance_utils.glance_client(self.os_creds)

        # Initialised up front so tearDown() can run safely even when setup
        # fails part-way through.
        self.image_creator = None
        self.network_creator = None
        self.flavor_creator = None
        self.port = None
        self.vm_inst = None

        try:
            image_settings = openstack_tests.cirros_image_settings(
                name=guid + '-image', image_metadata=self.image_metadata)
            self.image_creator = OpenStackImage(
                self.os_creds, image_settings=image_settings)
            self.image_creator.create()

            network_settings = openstack_tests.get_priv_net_config(
                guid + '-net', guid + '-subnet').network_settings
            self.network_creator = OpenStackNetwork(
                self.os_creds, network_settings)
            self.network_creator.create()

            self.flavor_creator = OpenStackFlavor(
                self.os_creds,
                FlavorSettings(
                    name=guid + '-flavor-name', ram=256, disk=10, vcpus=1))
            self.flavor_creator.create()

            port_settings = PortSettings(name=guid + '-port',
                                         network_name=network_settings.name)
            self.port = neutron_utils.create_port(
                self.neutron, self.os_creds, port_settings)

            self.instance_settings = VmInstanceSettings(
                name=guid + '-vm_inst',
                flavor=self.flavor_creator.flavor_settings.name,
                port_settings=[port_settings])
        except BaseException:
            # unittest does not call tearDown() when setUp() raises, so
            # release whatever was created before propagating the error.
            self.tearDown()
            raise

    def tearDown(self):
        """
        Cleanup deployed resources
        :return:
        """
        if self.vm_inst:
            try:
                nova_utils.delete_vm_instance(self.nova, self.vm_inst)
            except Exception as e:
                logger.warning('Unable to delete VM instance - %s', e)

        if self.port:
            try:
                neutron_utils.delete_port(self.neutron, self.port)
            except Exception as e:
                logger.warning('Unable to delete port - %s', e)

        # Same order as the original cleanup: flavor, network, then image.
        for creator in (self.flavor_creator, self.network_creator,
                        self.image_creator):
            if creator:
                try:
                    creator.clean()
                except Exception as e:
                    logger.warning('Unable to clean %s - %s', creator, e)

    def test_create_instance(self):
        """
        Tests the nova_utils.create_server() method
        :return:
        """
        self.vm_inst = nova_utils.create_server(
            self.nova, self.neutron, self.glance, self.instance_settings,
            self.image_creator.image_settings)

        self.assertIsNotNone(self.vm_inst)

        # Wait until instance is ACTIVE
        # NOTE(review): the retry count and sleep interval were not visible
        # in the reviewed text and are reconstructed; confirm the intended
        # timeout.
        active = False
        for _ in range(60):
            if create_instance.STATUS_ACTIVE == nova_utils.get_server_status(
                    self.nova, self.vm_inst):
                active = True
                break

            time.sleep(3)

        self.assertTrue(active)
        vm_inst = nova_utils.get_latest_server_object(self.nova, self.vm_inst)

        self.assertEqual(self.vm_inst.name, vm_inst.name)
        self.assertEqual(self.vm_inst.id, vm_inst.id)
class NovaUtilsInstanceVolumeTests(OSComponentTestCase):
    """
    Tests the attachment and detachment of volumes to VM instances via
    nova_utils.py
    """

    def setUp(self):
        """
        Setup objects required by the test: an image, a network, a flavor,
        a volume and a VM instance
        :return:
        """
        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())

        self.nova = nova_utils.nova_client(self.os_creds)
        self.cinder = cinder_utils.cinder_client(self.os_creds)

        # Initialised up front so tearDown() can run safely even when setup
        # fails part-way through.
        self.image_creator = None
        self.network_creator = None
        self.flavor_creator = None
        self.volume_creator = None
        self.instance_creator = None

        try:
            image_settings = openstack_tests.cirros_image_settings(
                name=guid + '-image', image_metadata=self.image_metadata)
            self.image_creator = OpenStackImage(
                self.os_creds, image_settings=image_settings)
            self.image_creator.create()

            network_settings = openstack_tests.get_priv_net_config(
                guid + '-net', guid + '-subnet').network_settings
            self.network_creator = OpenStackNetwork(
                self.os_creds, network_settings)
            self.network_creator.create()

            self.flavor_creator = OpenStackFlavor(
                self.os_creds,
                FlavorSettings(
                    name=guid + '-flavor-name', ram=256, disk=10, vcpus=1))
            self.flavor_creator.create()

            # Create Volume
            # guid is already a string, so no str() conversion is needed;
            # the resulting name is unchanged.
            volume_settings = VolumeSettings(
                name=self.__class__.__name__ + '-' + guid)
            self.volume_creator = OpenStackVolume(
                self.os_creds, volume_settings)
            self.volume_creator.create(block=True)

            port_settings = PortSettings(
                name=guid + '-port', network_name=network_settings.name)
            instance_settings = VmInstanceSettings(
                name=guid + '-vm_inst',
                flavor=self.flavor_creator.flavor_settings.name,
                port_settings=[port_settings])
            self.instance_creator = OpenStackVmInstance(
                self.os_creds, instance_settings, image_settings)
            self.instance_creator.create(block=True)
        except BaseException:
            # unittest does not call tearDown() when setUp() raises, so
            # release whatever was created before propagating the error.
            self.tearDown()
            raise

    def tearDown(self):
        """
        Cleanup deployed resources
        :return:
        """
        # Same order as the original cleanup: instance, volume, flavor,
        # network, then image.
        for creator in (self.instance_creator, self.volume_creator,
                        self.flavor_creator, self.network_creator,
                        self.image_creator):
            if creator:
                try:
                    creator.clean()
                except Exception as e:
                    logger.warning('Unable to clean %s - %s', creator, e)

    def test_add_remove_volume(self):
        """
        Tests the nova_utils.attach_volume() and nova_utils.detach_volume()
        functions against the VM instance and volume created in setUp()
        :return:
        """
        self.assertIsNotNone(self.volume_creator.get_volume())
        self.assertEqual(0, len(self.volume_creator.get_volume().attachments))

        # Attach volume to VM
        nova_utils.attach_volume(
            self.nova, self.instance_creator.get_vm_inst(),
            self.volume_creator.get_volume())

        # NOTE(review): the pause between the request and the lookups was
        # not visible in the reviewed text; the duration is reconstructed -
        # confirm.
        time.sleep(10)

        vol_attach = cinder_utils.get_volume_by_id(
            self.cinder, self.volume_creator.get_volume().id)
        vm_attach = nova_utils.get_server_object_by_id(
            self.nova, self.instance_creator.get_vm_inst().id)

        # Detach volume from VM
        nova_utils.detach_volume(
            self.nova, self.instance_creator.get_vm_inst(),
            self.volume_creator.get_volume())

        # NOTE(review): duration reconstructed, as above - confirm.
        time.sleep(10)

        vol_detach = cinder_utils.get_volume_by_id(
            self.cinder, self.volume_creator.get_volume().id)
        vm_detach = nova_utils.get_server_object_by_id(
            self.nova, self.instance_creator.get_vm_inst().id)

        # Validate Attachment
        self.assertIsNotNone(vol_attach)
        self.assertEqual(self.volume_creator.get_volume().id, vol_attach.id)
        self.assertEqual(1, len(vol_attach.attachments))
        self.assertEqual(vm_attach.volume_ids[0]['id'],
                         vol_attach.attachments[0]['volume_id'])

        # Validate Detachment
        self.assertIsNotNone(vol_detach)
        self.assertEqual(self.volume_creator.get_volume().id, vol_detach.id)
        self.assertEqual(0, len(vol_detach.attachments))
        self.assertEqual(0, len(vm_detach.volume_ids))