| Test Name | Heat API | Description |
+=======================================+===============+===========================================================+
| test_create_keypair_with_stack | 1-3 | Tests ability of the function |
-| | | heat_utils.create_stack() to return the correct |
+| | | heat_utils.get_stack_keypairs() to return the correct |
| | | Keypair domain objects deployed with Heat |
+---------------------------------------+---------------+-----------------------------------------------------------+
+heat_utils_tests.py - HeatUtilsSecurityGroupTests
+-------------------------------------------------
+
++---------------------------------------+---------------+-----------------------------------------------------------+
+| Test Name | Heat API | Description |
++=======================================+===============+===========================================================+
+| test_create_security_group_with_stack | 1-3 | Tests ability of the function |
+| | | heat_utils.get_stack_security_groups() to return the |
+| | | correct SecurityGroup domain objects deployed with Heat |
++---------------------------------------+---------------+-----------------------------------------------------------+
+
heat_utils_tests.py - HeatUtilsFlavorTests
------------------------------------------
| | | deploying |
+---------------------------------------+---------------+-----------------------------------------------------------+
+create_stack_tests.py - CreateStackSecurityGroupTests
+-----------------------------------------------------
+
++---------------------------------------+---------------+-----------------------------------------------------------+
+| Test Name | Heat API | Description |
++=======================================+===============+===========================================================+
+| test_retrieve_security_group_creator | 1-3 | Ensures that an OpenStackHeatStack instance can return a |
+| | | OpenStackSecurityGroup instance that it was responsible |
+| | | for deploying |
++---------------------------------------+---------------+-----------------------------------------------------------+
+
create_stack_tests.py - CreateComplexStackTests
-----------------------------------------------
Constructor
:param name: the security group's name
:param id: the security group's id
+ :param description: the security group's description
+ :param project_id: the security group's project_id
+ :param rules: list of SecurityGroupRule objects associated to this
"""
self.name = kwargs.get('name')
self.id = kwargs.get('id')
self.description = kwargs.get('description')
self.project_id = kwargs.get('project_id', kwargs.get('tenant_id'))
+ self.rules = list()
+ if kwargs.get('rules') and isinstance(kwargs.get('rules'), list):
+ for rule in kwargs.get('rules'):
+ if isinstance(rule, SecurityGroupRule):
+ self.rules.append(rule)
+ else:
+ self.rules.append(SecurityGroupRule(**rule))
+
def __eq__(self, other):
- return (self.name == other.name and self.id == other.id and
- self.project_id == other.project_id)
+ return (self.name == other.name and
+ self.id == other.id and
+ self.description == other.description and
+ self.project_id == other.project_id and
+ self.rules == other.rules)
class SecurityGroupRule:
"""
Constructor
:param id: the security group rule's id
- :param sec_grp_id: the ID of the associated security group
+ :param security_group_id: the ID of the associated security group
:param description: the security group rule's description
:param direction: the security group rule's direction
:param ethertype: the security group rule's ethertype
def test_construction_proj_id_kwargs(self):
sec_grp = SecurityGroup(
**{'name': 'name', 'id': 'id', 'project_id': 'foo',
- 'description': 'test desc'})
+ 'description': 'test desc',
+ 'rules': [
+ {'id': 'id', 'security_group_id': 'grp_id',
+ 'description': 'desc', 'direction': 'dir',
+ 'ethertype': 'eType', 'port_range_min': '10.0.0.100',
+ 'port_range_max': '10.0.0.200', 'protocol': 'proto',
+ 'remote_group_id': 'group_id',
+ 'remote_ip_prefix': 'ip_prefix'}
+ ]})
self.assertEqual('name', sec_grp.name)
self.assertEqual('id', sec_grp.id)
self.assertEqual('test desc', sec_grp.description)
self.assertEqual('foo', sec_grp.project_id)
+ self.assertEqual(1, len(sec_grp.rules))
+ rule = sec_grp.rules[0]
+ self.assertEqual('id', rule.id)
+ self.assertEqual('grp_id', rule.security_group_id)
+ self.assertEqual('desc', rule.description)
+ self.assertEqual('dir', rule.direction)
+ self.assertEqual('eType', rule.ethertype)
+ self.assertEqual('10.0.0.100', rule.port_range_min)
+ self.assertEqual('10.0.0.200', rule.port_range_max)
+ self.assertEqual('proto', rule.protocol)
+ self.assertEqual('group_id', rule.remote_group_id)
+ self.assertEqual('ip_prefix', rule.remote_ip_prefix)
+
def test_construction_tenant_id_kwargs(self):
sec_grp = SecurityGroup(
**{'name': 'name', 'id': 'id',
self.assertEqual('id', sec_grp.id)
self.assertEqual('foo', sec_grp.project_id)
self.assertIsNone(sec_grp.description)
+ self.assertEqual(0, len(sec_grp.rules))
def test_construction_named(self):
+ rules = [SecurityGroupRule(
+ **{'id': 'id', 'security_group_id': 'grp_id',
+ 'description': 'desc', 'direction': 'dir',
+ 'ethertype': 'eType', 'port_range_min': '10.0.0.100',
+ 'port_range_max': '10.0.0.200', 'protocol': 'proto',
+ 'remote_group_id': 'group_id',
+ 'remote_ip_prefix': 'ip_prefix'}
+ )]
sec_grp = SecurityGroup(description='test desc', tenant_id='foo',
- id='id', name='name')
+ id='id', name='name', rules=rules)
self.assertEqual('name', sec_grp.name)
self.assertEqual('id', sec_grp.id)
self.assertEqual('test desc', sec_grp.description)
self.assertEqual('foo', sec_grp.project_id)
+ self.assertEqual(1, len(sec_grp.rules))
+ rule = sec_grp.rules[0]
+ self.assertEqual('id', rule.id)
+ self.assertEqual('grp_id', rule.security_group_id)
+ self.assertEqual('desc', rule.description)
+ self.assertEqual('dir', rule.direction)
+ self.assertEqual('eType', rule.ethertype)
+ self.assertEqual('10.0.0.100', rule.port_range_min)
+ self.assertEqual('10.0.0.200', rule.port_range_max)
+ self.assertEqual('proto', rule.protocol)
+ self.assertEqual('group_id', rule.remote_group_id)
+ self.assertEqual('ip_prefix', rule.remote_ip_prefix)
+
class SecurityGroupRuleDomainObjectTests(unittest.TestCase):
"""
rule_setting = self.__get_setting_from_rule(existing_rule)
self.__rules[rule_setting] = existing_rule
+ self.__security_group = neutron_utils.get_security_group_by_id(
+ self._neutron, self.__security_group.id)
+
return self.__security_group
def create(self):
from snaps.openstack.create_flavor import OpenStackFlavor
from snaps.openstack.create_instance import OpenStackVmInstance
from snaps.openstack.create_keypairs import OpenStackKeypair
+from snaps.openstack.create_security_group import OpenStackSecurityGroup
from snaps.openstack.create_router import OpenStackRouter
from snaps.openstack.create_volume import OpenStackVolume
from snaps.openstack.create_volume_type import OpenStackVolumeType
return out
+ def get_security_group_creators(self):
+ """
+ Returns a list of security group creator objects as configured by the
+ heat template
+ :return: list() of OpenStackNetwork objects
+ """
+
+ neutron = neutron_utils.neutron_client(self._os_creds)
+
+ out = list()
+ stack_security_groups = heat_utils.get_stack_security_groups(
+ self.__heat_cli, neutron, self.__stack)
+
+ for stack_security_group in stack_security_groups:
+ settings = settings_utils.create_security_group_settings(
+ neutron, stack_security_group)
+ creator = OpenStackSecurityGroup(self._os_creds, settings)
+ out.append(creator)
+ creator.initialize()
+
+ return out
+
def get_router_creators(self):
"""
Returns a list of router creator objects as configured by the heat
self.assertEqual(creator.get_keypair(), keypair)
+class CreateStackSecurityGroupTests(OSIntegrationTestCase):
+ """
+ Tests for the OpenStackHeatStack class to ensure it returns an
+ OpenStackSecurityGroup object
+ """
+
+ def setUp(self):
+ """
+ Instantiates the CreateStack object that is responsible for downloading
+ and creating an OS stack file within OpenStack
+ """
+ super(self.__class__, self).__start__()
+
+ self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+
+ self.heat_creds = self.admin_os_creds
+ self.heat_creds.project_name = self.admin_os_creds.project_name
+
+ self.heat_cli = heat_utils.heat_client(self.heat_creds)
+ self.nova = nova_utils.nova_client(self.heat_creds)
+ self.stack_creator = None
+
+ self.security_group_name = self.guid + '-sec-grp'
+
+ self.env_values = {
+ 'security_group_name': self.security_group_name}
+
+ self.heat_tmplt_path = pkg_resources.resource_filename(
+ 'snaps.openstack.tests.heat', 'security_group_heat_template.yaml')
+
+ stack_settings = StackSettings(
+ name=self.__class__.__name__ + '-' + str(self.guid) + '-stack',
+ template_path=self.heat_tmplt_path,
+ env_values=self.env_values)
+ self.stack_creator = create_stack.OpenStackHeatStack(
+ self.heat_creds, stack_settings)
+ self.created_stack = self.stack_creator.create()
+ self.assertIsNotNone(self.created_stack)
+
+ def tearDown(self):
+ """
+ Cleans the stack and downloaded stack file
+ """
+ if self.stack_creator:
+ try:
+ self.stack_creator.clean()
+ except:
+ pass
+
+ super(self.__class__, self).__clean__()
+
+ def test_retrieve_security_group_creator(self):
+ """
+ Tests the creation of an OpenStack stack from Heat template file and
+ the retrieval of an OpenStackSecurityGroup creator/state machine
+ instance
+ """
+ sec_grp_creators = self.stack_creator.get_security_group_creators()
+ self.assertEqual(1, len(sec_grp_creators))
+
+ creator = sec_grp_creators[0]
+ sec_grp = creator.get_security_group()
+
+ self.assertEqual(self.security_group_name, sec_grp.name)
+ self.assertEqual('Test description', sec_grp.description)
+ self.assertEqual(2, len(sec_grp.rules))
+
+ has_ssh_rule = False
+ has_icmp_rule = False
+
+ for rule in sec_grp.rules:
+ if (rule.security_group_id == sec_grp.id
+ and rule.direction == 'egress'
+ and rule.ethertype == 'IPv4'
+ and rule.port_range_min == 22
+ and rule.port_range_max == 22
+ and rule.protocol == 'tcp'
+ and rule.remote_group_id is None
+ and rule.remote_ip_prefix == '0.0.0.0/0'):
+ has_ssh_rule = True
+ if (rule.security_group_id == sec_grp.id
+ and rule.direction == 'ingress'
+ and rule.ethertype == 'IPv4'
+ and rule.port_range_min is None
+ and rule.port_range_max is None
+ and rule.protocol == 'icmp'
+ and rule.remote_group_id is None
+ and rule.remote_ip_prefix == '0.0.0.0/0'):
+ has_icmp_rule = True
+
+ self.assertTrue(has_ssh_rule)
+ self.assertTrue(has_icmp_rule)
+
+
class CreateStackNegativeTests(OSIntegrationTestCase):
"""
Negative test cases for the OpenStackHeatStack class with poor
--- /dev/null
+##############################################################################
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##############################################################################
+heat_template_version: 2015-04-30
+
+description: >
+ Sample template for creating a single SecurityGroup
+
+parameters:
+ security_group_name:
+ type: string
+ label: Security Group name
+ description: The name of the stack's security group
+ default: security_group_name
+
+resources:
+ server_security_group:
+ type: OS::Neutron::SecurityGroup
+ properties:
+ description: Test description
+ name: { get_param: security_group_name }
+ rules:
+ - direction: egress
+ ethertype: IPv4
+ port_range_min: 22
+ port_range_max: 22
+ protocol: tcp
+ remote_ip_prefix: 0.0.0.0/0
+ - direction: ingress
+ ethertype: IPv4
+ protocol: icmp
+ remote_ip_prefix: 0.0.0.0/0
return out
+def get_stack_security_groups(heat_cli, neutron, stack):
+ """
+ Returns a list of SecurityGroup domain objects deployed by this stack
+ :param heat_cli: the OpenStack heat client object
+ :param neutron: the OpenStack neutron client object
+ :param stack: the SNAPS-OO Stack domain object
+ :return: a list of SecurityGroup objects
+ """
+
+ out = list()
+ resources = get_resources(heat_cli, stack, 'OS::Neutron::SecurityGroup')
+ for resource in resources:
+ security_group = neutron_utils.get_security_group_by_id(
+ neutron, resource.id)
+ if security_group:
+ out.append(security_group)
+
+ return out
+
+
def get_stack_servers(heat_cli, nova, stack):
"""
Returns a list of VMInst domain objects associated with a Stack
sec_grp_settings.name)
os_group = neutron.create_security_group(
sec_grp_settings.dict_for_neutron(keystone))
- return SecurityGroup(**os_group['security_group'])
+ return __map_os_security_group(neutron, os_group['security_group'])
def delete_security_group(neutron, sec_grp):
groups = neutron.list_security_groups(**sec_grp_filter)
for group in groups['security_groups']:
- return SecurityGroup(**group)
+ return __map_os_security_group(neutron, group)
+
+
+def __map_os_security_group(neutron, os_sec_grp):
+ """
+ Creates a SecurityGroup SNAPS domain object from an OpenStack Security
+ Group dict
+ :param neutron: the neutron client for performing rule lookups
+ :param os_sec_grp: the OpenStack Security Group dict object
+ :return: a SecurityGroup object
+ """
+ os_sec_grp['rules'] = get_rules_by_security_group_id(
+ neutron, os_sec_grp['id'])
+ return SecurityGroup(**os_sec_grp)
def get_security_group_by_id(neutron, sec_grp_id):
groups = neutron.list_security_groups(**{'id': sec_grp_id})
for group in groups['security_groups']:
if group['id'] == sec_grp_id:
- return SecurityGroup(**group)
+ return __map_os_security_group(neutron, group)
return None
:param neutron: the client
:param sec_grp: a list of SNAPS SecurityGroupRule domain objects
"""
+ return get_rules_by_security_group_id(neutron, sec_grp.id)
+
+
+def get_rules_by_security_group_id(neutron, sec_grp_id):
+ """
+ Retrieves all of the rules for a given security group
+ :param neutron: the client
+ :param sec_grp_id: the ID of the associated security group
+ """
logger.info('Retrieving security group rules associate with the '
- 'security group - %s', sec_grp.name)
+ 'security group with ID - %s', sec_grp_id)
out = list()
rules = neutron.list_security_group_rules(
- **{'security_group_id': sec_grp.id})
+ **{'security_group_id': sec_grp_id})
for rule in rules['security_group_rules']:
- if rule['security_group_id'] == sec_grp.id:
+ if rule['security_group_id'] == sec_grp_id:
out.append(SecurityGroupRule(**rule))
return out
from snaps.openstack.create_keypairs import KeypairSettings
from snaps.openstack.create_network import (
PortSettings, SubnetSettings, NetworkSettings)
+from snaps.openstack.create_security_group import (
+ SecurityGroupSettings, SecurityGroupRuleSettings)
from snaps.openstack.create_router import RouterSettings
from snaps.openstack.create_volume import VolumeSettings
from snaps.openstack.create_volume_type import (
subnet_settings=create_subnet_settings(neutron, network))
+def create_security_group_settings(neutron, security_group):
+ """
+ Returns a NetworkSettings object
+ :param neutron: the neutron client
+ :param security_group: a SNAPS-OO SecurityGroup domain object
+ :return:
+ """
+ rules = neutron_utils.get_rules_by_security_group(neutron, security_group)
+
+ rule_settings = list()
+ for rule in rules:
+ rule_settings.append(SecurityGroupRuleSettings(
+ sec_grp_name=security_group.name, description=rule.description,
+ direction=rule.direction, ethertype=rule.ethertype,
+ port_range_min=rule.port_range_min,
+ port_range_max=rule.port_range_max, protocol=rule.protocol,
+ remote_group_id=rule.remote_group_id,
+ remote_ip_prefix=rule.remote_ip_prefix))
+
+ return SecurityGroupSettings(
+ name=security_group.name, description=security_group.description,
+ rule_settings=rule_settings)
+
+
def create_subnet_settings(neutron, network):
"""
Returns a list of SubnetSettings objects for a given network
def tearDown(self):
"""
- Cleans the image and downloaded image file
+ Cleans the stack and image
"""
if self.stack1:
try:
def tearDown(self):
"""
- Cleans the image and downloaded image file
+ Cleans the stack and image
"""
if self.stack:
try:
def tearDown(self):
"""
- Cleans the image and downloaded image file
+ Cleans the stack
"""
if self.stack:
try:
def tearDown(self):
"""
- Cleans the image and downloaded image file
+ Cleans the stack
"""
if self.stack:
try:
stack_name = guid + '-stack'
self.keypair_name = guid + '-kp'
- env_values = {
- 'keypair_name': self.keypair_name}
+ env_values = {'keypair_name': self.keypair_name}
heat_tmplt_path = pkg_resources.resource_filename(
'snaps.openstack.tests.heat', 'keypair_heat_template.yaml')
def tearDown(self):
"""
- Cleans the image and downloaded image file
+ Cleans the stack
"""
if self.stack:
try:
self.assertEqual(self.keypair_name, keypair.name)
+class HeatUtilsSecurityGroupTests(OSComponentTestCase):
+ """
+ Test Heat volume functionality
+ """
+
+ def setUp(self):
+ """
+ Instantiates OpenStack instances that cannot be spawned by Heat
+ """
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ stack_name = guid + '-stack'
+ self.sec_grp_name = guid + '-sec-grp'
+
+ env_values = {'security_group_name': self.sec_grp_name}
+
+ heat_tmplt_path = pkg_resources.resource_filename(
+ 'snaps.openstack.tests.heat', 'security_group_heat_template.yaml')
+ self.stack_settings = StackSettings(
+ name=stack_name, template_path=heat_tmplt_path,
+ env_values=env_values)
+ self.stack = None
+ self.heat_client = heat_utils.heat_client(self.os_creds)
+ self.neutron = neutron_utils.neutron_client(self.os_creds)
+
+ def tearDown(self):
+ """
+ Cleans the stack
+ """
+ if self.stack:
+ try:
+ heat_utils.delete_stack(self.heat_client, self.stack)
+ except:
+ pass
+
+ def test_create_security_group_with_stack(self):
+ """
+ Tests the creation of an OpenStack SecurityGroup with Heat.
+ """
+ self.stack = heat_utils.create_stack(
+ self.heat_client, self.stack_settings)
+ self.assertTrue(stack_active(self.heat_client, self.stack))
+
+ sec_grp = heat_utils.get_stack_security_groups(
+ self.heat_client, self.neutron, self.stack)[0]
+
+ self.assertEqual(self.sec_grp_name, sec_grp.name)
+ self.assertEqual('Test description', sec_grp.description)
+ self.assertEqual(2, len(sec_grp.rules))
+
+ has_ssh_rule = False
+ has_icmp_rule = False
+
+ for rule in sec_grp.rules:
+ if (rule.security_group_id == sec_grp.id
+ and rule.direction == 'egress'
+ and rule.ethertype == 'IPv4'
+ and rule.port_range_min == 22
+ and rule.port_range_max == 22
+ and rule.protocol == 'tcp'
+ and rule.remote_group_id is None
+ and rule.remote_ip_prefix == '0.0.0.0/0'):
+ has_ssh_rule = True
+ if (rule.security_group_id == sec_grp.id
+ and rule.direction == 'ingress'
+ and rule.ethertype == 'IPv4'
+ and rule.port_range_min is None
+ and rule.port_range_max is None
+ and rule.protocol == 'icmp'
+ and rule.remote_group_id is None
+ and rule.remote_ip_prefix == '0.0.0.0/0'):
+ has_icmp_rule = True
+
+ self.assertTrue(has_ssh_rule)
+ self.assertTrue(has_icmp_rule)
+
+
def stack_active(heat_cli, stack):
"""
Blocks until stack application has successfully completed or failed
from snaps.openstack.tests.create_stack_tests import (
StackSettingsUnitTests, CreateStackSuccessTests, CreateStackNegativeTests,
CreateStackFlavorTests, CreateStackFloatingIpTests,
- CreateStackKeypairTests, CreateStackVolumeTests)
+ CreateStackKeypairTests, CreateStackVolumeTests,
+ CreateStackSecurityGroupTests)
from snaps.openstack.tests.create_user_tests import (
UserSettingsUnitTests, CreateUserSuccessTests)
from snaps.openstack.tests.create_volume_tests import (
from snaps.openstack.utils.tests.heat_utils_tests import (
HeatSmokeTests, HeatUtilsCreateSimpleStackTests,
HeatUtilsCreateComplexStackTests, HeatUtilsFlavorTests,
- HeatUtilsKeypairTests, HeatUtilsVolumeTests)
+ HeatUtilsKeypairTests, HeatUtilsVolumeTests, HeatUtilsSecurityGroupTests)
from snaps.openstack.utils.tests.keystone_utils_tests import (
KeystoneSmokeTests, KeystoneUtilsTests)
from snaps.openstack.utils.tests.neutron_utils_tests import (
HeatUtilsKeypairTests, os_creds=os_creds,
ext_net_name=ext_net_name, log_level=log_level,
image_metadata=image_metadata))
+ suite.addTest(OSComponentTestCase.parameterize(
+ HeatUtilsSecurityGroupTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, log_level=log_level,
+ image_metadata=image_metadata))
suite.addTest(OSComponentTestCase.parameterize(
CinderUtilsQoSTests, os_creds=os_creds,
ext_net_name=ext_net_name, log_level=log_level,
use_keystone=use_keystone,
flavor_metadata=flavor_metadata, image_metadata=image_metadata,
log_level=log_level))
+ suite.addTest(OSIntegrationTestCase.parameterize(
+ CreateStackSecurityGroupTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
CreateStackNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name,
use_keystone=use_keystone,