self.assertIsNotNone(outputs)
self.assertEqual(0, len(outputs))
- # Wait until stack deployment has completed
- end_time = time.time() + create_stack.STACK_COMPLETE_TIMEOUT
- is_active = False
- while time.time() < end_time:
- status = heat_utils.get_stack_status(self.heat_client,
- self.stack1.id)
- if status == create_stack.STATUS_CREATE_COMPLETE:
- is_active = True
- break
- elif status == create_stack.STATUS_CREATE_FAILED:
- is_active = False
- break
-
- time.sleep(3)
-
- self.assertTrue(is_active)
+ self.assertTrue(stack_active(self.heat_client, self.stack1))
neutron = neutron_utils.neutron_client(self.os_creds)
networks = heat_utils.get_stack_networks(
self.stack1.id)
self.assertEqual(self.stack1, stack1_query_3)
- end_time = time.time() + create_stack.STACK_COMPLETE_TIMEOUT
- is_active = False
- while time.time() < end_time:
- status = heat_utils.get_stack_status(self.heat_client,
- self.stack1.id)
- if status == create_stack.STATUS_CREATE_COMPLETE:
- is_active = True
- break
- elif status == create_stack.STATUS_CREATE_FAILED:
- is_active = False
- break
-
- time.sleep(3)
-
- self.assertTrue(is_active)
+ self.assertTrue(stack_active(self.heat_client, self.stack1))
self.stack2 = heat_utils.create_stack(self.heat_client,
self.stack_settings2)
self.heat_client = heat_utils.heat_client(self.os_creds)
self.stack = heat_utils.create_stack(self.heat_client, stack_settings)
- # Wait until stack deployment has completed
- end_time = time.time() + create_stack.STACK_COMPLETE_TIMEOUT
- is_active = False
- while time.time() < end_time:
- status = heat_utils.get_stack_status(self.heat_client,
- self.stack.id)
- if status == create_stack.STATUS_CREATE_COMPLETE:
- is_active = True
- break
- elif status == create_stack.STATUS_CREATE_FAILED:
- is_active = False
- break
-
- time.sleep(3)
- self.assertTrue(is_active)
+ self.assertTrue(stack_active(self.heat_client, self.stack))
def tearDown(self):
"""
"""
self.stack = heat_utils.create_stack(
self.heat_client, self.stack_settings)
-
- # Wait until stack deployment has completed
- end_time = time.time() + create_stack.STACK_COMPLETE_TIMEOUT
- is_active = False
- while time.time() < end_time:
- status = heat_utils.get_stack_status(self.heat_client,
- self.stack.id)
- if status == create_stack.STATUS_CREATE_COMPLETE:
- is_active = True
- break
- elif status == create_stack.STATUS_CREATE_FAILED:
- is_active = False
- break
-
- time.sleep(3)
-
- self.assertTrue(is_active)
+ self.assertTrue(stack_active(self.heat_client, self.stack))
volumes = heat_utils.get_stack_volumes(
self.heat_client, self.cinder, self.stack)
"""
self.stack = heat_utils.create_stack(
self.heat_client, self.stack_settings)
-
- # Wait until stack deployment has completed
- end_time = time.time() + create_stack.STACK_COMPLETE_TIMEOUT
- is_active = False
- while time.time() < end_time:
- status = heat_utils.get_stack_status(self.heat_client,
- self.stack.id)
- if status == create_stack.STATUS_CREATE_COMPLETE:
- is_active = True
- break
- elif status == create_stack.STATUS_CREATE_FAILED:
- is_active = False
- break
-
- time.sleep(3)
-
- self.assertTrue(is_active)
+ self.assertTrue(stack_active(self.heat_client, self.stack))
volume_types = heat_utils.get_stack_volume_types(
self.heat_client, self.cinder, self.stack)
self.assertEqual(u'nova.volume.encryptors.luks.LuksEncryptor',
encryption.provider)
self.assertEqual(volume_type.id, encryption.volume_type_id)
+
+
+class HeatUtilsKeypairTests(OSComponentTestCase):
+ """
+ Test Heat volume functionality
+ """
+
+ def setUp(self):
+ """
+ Instantiates OpenStack instances that cannot be spawned by Heat
+ """
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ stack_name = guid + '-stack'
+ self.keypair_name = guid + '-kp'
+
+ env_values = {
+ 'keypair_name': self.keypair_name}
+
+ heat_tmplt_path = pkg_resources.resource_filename(
+ 'snaps.openstack.tests.heat', 'keypair_heat_template.yaml')
+ self.stack_settings = StackSettings(
+ name=stack_name, template_path=heat_tmplt_path,
+ env_values=env_values)
+ self.stack = None
+ self.heat_client = heat_utils.heat_client(self.os_creds)
+ self.nova = nova_utils.nova_client(self.os_creds)
+
+ def tearDown(self):
+ """
+ Cleans the image and downloaded image file
+ """
+ if self.stack:
+ try:
+ heat_utils.delete_stack(self.heat_client, self.stack)
+ except:
+ pass
+
+ def test_create_keypair_with_stack(self):
+ """
+ Tests the creation of an OpenStack keypair with Heat.
+ """
+ self.stack = heat_utils.create_stack(
+ self.heat_client, self.stack_settings)
+ self.assertTrue(stack_active(self.heat_client, self.stack))
+
+ keypairs = heat_utils.get_stack_keypairs(
+ self.heat_client, self.nova, self.stack)
+
+ self.assertEqual(1, len(keypairs))
+ keypair = keypairs[0]
+
+ self.assertEqual(self.keypair_name, keypair.name)
+
+ outputs = heat_utils.get_outputs(self.heat_client, self.stack)
+
+ for output in outputs:
+ if output.key == 'private_key':
+ self.assertTrue(output.value.startswith(
+ '-----BEGIN RSA PRIVATE KEY-----'))
+
+ keypair = nova_utils.get_keypair_by_id(self.nova, keypair.id)
+ self.assertIsNotNone(keypair)
+
+ self.assertEqual(self.keypair_name, keypair.name)
+
+
+def stack_active(heat_cli, stack):
+ """
+ Blocks until stack application has successfully completed or failed
+ :param heat_cli: the Heat client
+ :param stack: the Stack domain object
+ :return: T/F
+ """
+ # Wait until stack deployment has completed
+ end_time = time.time() + create_stack.STACK_COMPLETE_TIMEOUT
+ is_active = False
+ while time.time() < end_time:
+ status = heat_utils.get_stack_status(heat_cli, stack.id)
+ if status == create_stack.STATUS_CREATE_COMPLETE:
+ is_active = True
+ break
+ elif status == create_stack.STATUS_CREATE_FAILED:
+ is_active = False
+ break
+
+ time.sleep(3)
+
+ return is_active