from snaps.openstack.create_network import OpenStackNetwork, PortSettings
from snaps.openstack.create_router import OpenStackRouter
from snaps.openstack.create_image import OpenStackImage, ImageSettings
-from snaps.openstack.create_security_group import SecurityGroupSettings, OpenStackSecurityGroup
+from snaps.openstack.create_security_group import SecurityGroupSettings, OpenStackSecurityGroup, \
+ SecurityGroupRuleSettings, Direction, Protocol
from snaps.openstack.tests import openstack_tests, validation_utils
from snaps.openstack.utils import nova_utils
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase, OSComponentTestCase
self.assertTrue(self.inst_creator.vm_active(block=True))
- found = False
- timeout = 160
- start_time = time.time()
-
- logger.info("Looking for IP %s in the console log" % ip)
- full_log = ''
- while timeout > time.time() - start_time:
- output = vm.get_console_output()
- full_log = full_log + output
- if re.search(ip, output):
- logger.info('DHCP lease obtained logged in console')
- found = True
- break
-
- if not found:
- logger.error('Full console output -\n' + full_log)
- else:
- logger.debug('Full console output -\n' + full_log)
-
- self.assertTrue(found)
+ self.assertTrue(check_dhcp_lease(vm, ip))
class CreateInstanceSimpleTests(OSIntegrationTestCase):
self.router_creator = None
self.flavor_creator = None
self.keypair_creator = None
+ self.sec_grp_creator = None
self.inst_creators = list()
self.pub_net_config = openstack_tests.get_pub_net_config(
name=self.keypair_name, public_filepath=self.keypair_pub_filepath,
private_filepath=self.keypair_priv_filepath))
self.keypair_creator.create()
+
+ sec_grp_name = guid + '-sec-grp'
+ rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.icmp)
+ rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.tcp, port_range_min=22, port_range_max=22)
+ self.sec_grp_creator = OpenStackSecurityGroup(
+ self.os_creds,
+ SecurityGroupSettings(name=sec_grp_name, rule_settings=[rule1, rule2]))
+ self.sec_grp_creator.create()
except Exception as e:
self.tearDown()
raise e
except Exception as e:
logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
+ if self.sec_grp_creator:
+ try:
+ self.sec_grp_creator.clean()
+ except Exception as e:
+ logger.error('Unexpected exception cleaning security group with message - ' + str(e))
+
if self.router_creator:
try:
self.router_creator.clean()
self.assertIsNotNone(vm_inst)
self.assertTrue(inst_creator.vm_active(block=True))
+
+ ip = inst_creator.get_port_ip(port_settings.name)
+ self.assertTrue(check_dhcp_lease(vm_inst, ip))
+
+ inst_creator.add_security_group(self.sec_grp_creator.get_security_group())
self.assertEqual(vm_inst, inst_creator.get_vm_inst())
self.assertTrue(validate_ssh_client(inst_creator))
self.assertIsNotNone(vm_inst)
self.assertTrue(inst_creator.vm_active(block=True))
+
+ ip = inst_creator.get_port_ip(port_settings.name)
+ self.assertTrue(check_dhcp_lease(vm_inst, ip))
+
+ inst_creator.add_security_group(self.sec_grp_creator.get_security_group())
self.assertEqual(vm_inst, inst_creator.get_vm_inst())
self.assertTrue(validate_ssh_client(inst_creator))
self.router_creators = list()
self.flavor_creator = None
self.keypair_creator = None
+ self.sec_grp_creator = None
self.inst_creator = None
self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
name=self.keypair_name, public_filepath=self.keypair_pub_filepath,
private_filepath=self.keypair_priv_filepath))
self.keypair_creator.create()
+
+ sec_grp_name = self.guid + '-sec-grp'
+ rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.icmp)
+ rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.tcp, port_range_min=22, port_range_max=22)
+ self.sec_grp_creator = OpenStackSecurityGroup(
+ self.os_creds,
+ SecurityGroupSettings(name=sec_grp_name, rule_settings=[rule1, rule2]))
+ self.sec_grp_creator.create()
except Exception as e:
self.tearDown()
raise Exception(str(e))
except Exception as e:
logger.error('Unexpected exception cleaning network with message - ' + str(e))
+ if self.sec_grp_creator:
+ try:
+ self.sec_grp_creator.clean()
+ except Exception as e:
+ logger.error('Unexpected exception cleaning security group with message - ' + str(e))
+
if self.image_creator and not self.image_creator.image_settings.exists:
try:
self.image_creator.clean()
# Effectively blocks until VM has been properly activated
self.assertTrue(self.inst_creator.vm_active(block=True))
+ ip = self.inst_creator.get_port_ip(ports_settings[0].name)
+ self.assertTrue(check_dhcp_lease(vm_inst, ip))
+
+ # Add security group to VM
+ self.inst_creator.add_security_group(self.sec_grp_creator.get_security_group())
+
# Effectively blocks until VM's ssh port has been opened
self.assertTrue(self.inst_creator.vm_ssh_active(block=True))
self.inst_creator.create()
self.assertTrue(self.inst_creator.vm_active(block=True))
+
+
+def check_dhcp_lease(vm, ip, timeout=160):
+ """
+ Returns true if the expected DHCP lease has been acquired
+ :param vm:
+ :param ip:
+ :return:
+ """
+ found = False
+ start_time = time.time()
+
+ logger.info("Looking for IP %s in the console log" % ip)
+ full_log = ''
+ while timeout > time.time() - start_time:
+ output = vm.get_console_output()
+ full_log = full_log + output
+ if re.search(ip, output):
+ logger.info('DHCP lease obtained logged in console')
+ found = True
+ break
+
+ if not found:
+ logger.error('Full console output -\n' + full_log)
+ else:
+ logger.debug('Full console output -\n' + full_log)
+
+ return found
import os
import uuid
from scp import SCPClient
+from snaps.openstack.create_security_group import SecurityGroupRuleSettings, Direction, Protocol, \
+ OpenStackSecurityGroup, SecurityGroupSettings
from snaps.openstack import create_flavor
from snaps.openstack import create_instance
from snaps.openstack import create_keypairs
from snaps.openstack import create_network
from snaps.openstack import create_router
-from snaps.openstack.tests import openstack_tests
+from snaps.openstack.tests import openstack_tests, create_instance_tests
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
from snaps.provisioning import ansible_utils
# Setup members to cleanup just in case they don't get created
self.inst_creator = None
self.keypair_creator = None
+ self.sec_grp_creator = None
self.flavor_creator = None
self.router_creator = None
self.network_creator = None
private_filepath=self.keypair_priv_filepath))
self.keypair_creator.create()
+ # Create Security Group
+ sec_grp_name = guid + '-sec-grp'
+ rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.icmp)
+ rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.tcp, port_range_min=22, port_range_max=22)
+ self.sec_grp_creator = OpenStackSecurityGroup(
+ self.os_creds,
+ SecurityGroupSettings(name=sec_grp_name, rule_settings=[rule1, rule2]))
+ self.sec_grp_creator.create()
+
# Create instance
ports_settings = list()
ports_settings.append(
2. Set the following environment variable in your executing shell: ANSIBLE_HOST_KEY_CHECKING=False
Should this not be performed, the creation of the host ssh key will cause your ansible calls to fail.
"""
- self.inst_creator.create(block=True)
+ vm = self.inst_creator.create(block=True)
# Block until VM's ssh port has been opened
self.assertTrue(self.inst_creator.vm_ssh_active(block=True))
+ priv_ip = self.inst_creator.get_port_ip(self.port_1_name)
+ self.assertTrue(create_instance_tests.check_dhcp_lease(vm, priv_ip))
+
+ # Apply Security Group
+ self.inst_creator.add_security_group(self.sec_grp_creator.get_security_group())
+
ssh_client = self.inst_creator.ssh_client()
self.assertIsNotNone(ssh_client)
out = ssh_client.exec_command('pwd')[1].channel.in_buffer.read(1024)
2. Set the following environment variable in your executing shell: ANSIBLE_HOST_KEY_CHECKING=False
Should this not be performed, the creation of the host ssh key will cause your ansible calls to fail.
"""
- self.inst_creator.create(block=True)
+ vm = self.inst_creator.create(block=True)
# Block until VM's ssh port has been opened
self.assertTrue(self.inst_creator.vm_ssh_active(block=True))
+ priv_ip = self.inst_creator.get_port_ip(self.port_1_name)
+ self.assertTrue(create_instance_tests.check_dhcp_lease(vm, priv_ip))
+
+ # Apply Security Group
+ self.inst_creator.add_security_group(self.sec_grp_creator.get_security_group())
+
# Need to use the first floating IP as subsequent ones are currently broken with Apex CO
ip = self.inst_creator.get_floating_ip().ip
user = self.inst_creator.get_image_user()