import unittest
import uuid
+from snaps.config.security_group import (
+ SecurityGroupConfig, SecurityGroupRuleConfig,
+ SecurityGroupRuleConfigError, SecurityGroupConfigError)
from snaps.openstack import create_security_group
from snaps.openstack.create_security_group import (
SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
- Protocol, SecurityGroupRuleSettingsError, SecurityGroupSettingsError)
+ Protocol, OpenStackSecurityGroup)
from snaps.openstack.tests import validation_utils
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
from snaps.openstack.utils import neutron_utils
"""
def test_no_params(self):
- with self.assertRaises(SecurityGroupRuleSettingsError):
+ with self.assertRaises(SecurityGroupRuleConfigError):
SecurityGroupRuleSettings()
def test_empty_config(self):
- with self.assertRaises(SecurityGroupRuleSettingsError):
+ with self.assertRaises(SecurityGroupRuleConfigError):
SecurityGroupRuleSettings(**dict())
def test_name_only(self):
- with self.assertRaises(SecurityGroupRuleSettingsError):
+ with self.assertRaises(SecurityGroupRuleConfigError):
SecurityGroupRuleSettings(sec_grp_name='foo')
def test_config_with_name_only(self):
- with self.assertRaises(SecurityGroupRuleSettingsError):
+ with self.assertRaises(SecurityGroupRuleConfigError):
SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})
def test_name_and_direction(self):
settings = SecurityGroupRuleSettings(sec_grp_name='foo',
direction=Direction.ingress)
self.assertEqual('foo', settings.sec_grp_name)
- self.assertEqual(Direction.ingress, settings.direction)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
def test_config_name_and_direction(self):
settings = SecurityGroupRuleSettings(
**{'sec_grp_name': 'foo', 'direction': 'ingress'})
self.assertEqual('foo', settings.sec_grp_name)
- self.assertEqual(Direction.ingress, settings.direction)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
+
+ def test_proto_ah_str(self):
+ settings = SecurityGroupRuleSettings(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 'ah'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
+ self.assertEqual(Protocol.ah.value, settings.protocol.value)
+
+ def test_proto_ah_value(self):
+ settings = SecurityGroupRuleSettings(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 51})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
+ self.assertEqual(Protocol.ah.value, settings.protocol.value)
+
+ def test_proto_any(self):
+ settings = SecurityGroupRuleSettings(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 'any'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
+ self.assertEqual(Protocol.null.value, settings.protocol.value)
+
+ def test_proto_null(self):
+ settings = SecurityGroupRuleSettings(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 'null'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
+ self.assertEqual(Protocol.null.value, settings.protocol.value)
def test_all(self):
settings = SecurityGroupRuleSettings(
remote_ip_prefix='prfx')
self.assertEqual('foo', settings.sec_grp_name)
self.assertEqual('fubar', settings.description)
- self.assertEqual(Direction.egress, settings.direction)
+ self.assertEqual(Direction.egress.value, settings.direction.value)
self.assertEqual('rgi', settings.remote_group_id)
- self.assertEqual(Protocol.icmp, settings.protocol)
- self.assertEqual(Ethertype.IPv6, settings.ethertype)
+ self.assertEqual(Protocol.icmp.value, settings.protocol.value)
+ self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value)
self.assertEqual(1, settings.port_range_min)
self.assertEqual(2, settings.port_range_max)
self.assertEqual('prfx', settings.remote_ip_prefix)
'remote_ip_prefix': 'prfx'})
self.assertEqual('foo', settings.sec_grp_name)
self.assertEqual('fubar', settings.description)
- self.assertEqual(Direction.egress, settings.direction)
+ self.assertEqual(Direction.egress.value, settings.direction.value)
self.assertEqual('rgi', settings.remote_group_id)
- self.assertEqual(Protocol.tcp, settings.protocol)
- self.assertEqual(Ethertype.IPv6, settings.ethertype)
+ self.assertEqual(Protocol.tcp.value, settings.protocol.value)
+ self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value)
self.assertEqual(1, settings.port_range_min)
self.assertEqual(2, settings.port_range_max)
self.assertEqual('prfx', settings.remote_ip_prefix)
"""
def test_no_params(self):
- with self.assertRaises(SecurityGroupSettingsError):
+ with self.assertRaises(SecurityGroupConfigError):
SecurityGroupSettings()
def test_empty_config(self):
- with self.assertRaises(SecurityGroupSettingsError):
+ with self.assertRaises(SecurityGroupConfigError):
SecurityGroupSettings(**dict())
def test_name_only(self):
self.assertEqual('foo', settings.name)
def test_invalid_rule(self):
- rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar',
- direction=Direction.ingress)
- with self.assertRaises(SecurityGroupSettingsError):
+ rule_setting = SecurityGroupRuleSettings(
+ sec_grp_name='bar', direction=Direction.ingress,
+ description='test_rule_1')
+ with self.assertRaises(SecurityGroupConfigError):
SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
def test_all(self):
rule_settings = list()
rule_settings.append(SecurityGroupRuleSettings(
- sec_grp_name='bar', direction=Direction.egress))
+ sec_grp_name='bar', direction=Direction.egress,
+ description='test_rule_1'))
rule_settings.append(SecurityGroupRuleSettings(
- sec_grp_name='bar', direction=Direction.ingress))
+ sec_grp_name='bar', direction=Direction.ingress,
+ description='test_rule_2'))
settings = SecurityGroupSettings(
name='bar', description='fubar', project_name='foo',
rule_settings=rule_settings)
self.assertEqual('foo', settings.project_name)
self.assertEqual(1, len(settings.rule_settings))
self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
- self.assertEqual(Direction.ingress,
- settings.rule_settings[0].direction)
+ self.assertEqual(Direction.ingress.value,
+ settings.rule_settings[0].direction.value)
class CreateSecurityGroupTests(OSIntegrationTestCase):
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
self.sec_grp_name = guid + 'name'
- self.neutron = neutron_utils.neutron_client(self.os_creds)
+ self.neutron = neutron_utils.neutron_client(
+ self.os_creds, self.os_session)
# Initialize for cleanup
self.sec_grp_creator = None
"""
Tests the creation of an OpenStack Security Group without custom rules.
"""
- # Create Image
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
- description='hello group')
+ # Create Security Group
+ sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
+ description='hello group')
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp)
validation_utils.objects_equivalent(
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group()))
+
def test_create_group_admin_user_to_new_project(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
"""
- # Create Image
- sec_grp_settings = SecurityGroupSettings(
+ # Create Security Group
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
- project_name=self.admin_os_creds.project_name)
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
- self.os_creds, sec_grp_settings)
+ project_name=self.os_creds.project_name)
+ self.sec_grp_creator = OpenStackSecurityGroup(
+ self.admin_os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp)
validation_utils.objects_equivalent(
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
+ self.assertEqual(self.sec_grp_creator.get_security_group().id,
+ sec_grp.id)
+
+ proj_creator = OpenStackSecurityGroup(
+ self.os_creds, SecurityGroupConfig(name=self.sec_grp_name))
+ proj_creator.create()
+
+ self.assertEqual(self.sec_grp_creator.get_security_group().id,
+ proj_creator.get_security_group().id)
+
def test_create_group_new_user_to_admin_project(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
"""
- # Create Image
- sec_grp_settings = SecurityGroupSettings(
+ # Create Security Group
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
project_name=self.os_creds.project_name)
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
self.admin_os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp)
validation_utils.objects_equivalent(
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_create_delete_group(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
"""
- # Create Image
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
- description='hello group')
+ # Create Security Group
+ sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
+ description='hello group')
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
self.os_creds, sec_grp_settings)
created_sec_grp = self.sec_grp_creator.create()
self.assertIsNotNone(created_sec_grp)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group()))
+
neutron_utils.delete_security_group(self.neutron, created_sec_grp)
self.assertIsNone(neutron_utils.get_security_group(
- self.neutron, self.sec_grp_creator.sec_grp_settings.name))
+ self.neutron, self.keystone,
+ sec_grp_settings=self.sec_grp_creator.sec_grp_settings))
self.sec_grp_creator.clean()
Tests the creation of an OpenStack Security Group with one simple
custom rule.
"""
- # Create Image
+ # Create Security Group
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
- sec_grp_settings = SecurityGroupSettings(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
+ def test_create_group_with_one_complex_rule(self):
+ """
+ Tests the creation of an OpenStack Security Group with one simple
+ custom rule.
+ """
+ # Create Security Group
+ sec_grp_rule_settings = list()
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_1'))
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
+ self.sec_grp_creator.create()
+
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_create_group_with_several_rules(self):
"""
Tests the creation of an OpenStack Security Group with one simple
custom rule.
"""
- # Create Image
+ # Create Security Group
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
- sec_grp_settings = SecurityGroupSettings(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_add_rule(self):
"""
Tests the creation of an OpenStack Security Group with one simple
custom rule then adds one after creation.
"""
- # Create Image
+ # Create Security Group
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
- sec_grp_settings = SecurityGroupSettings(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
+
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
rules = neutron_utils.get_rules_by_security_group(
self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
- self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
+ self.sec_grp_creator.add_rule(SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
- direction=Direction.egress, protocol=Protocol.icmp))
+ direction=Direction.egress, protocol=Protocol.icmp,
+ description='test_rule_2'))
rules2 = neutron_utils.get_rules_by_security_group(
self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(rules) + 1, len(rules2))
Tests the creation of an OpenStack Security Group with two simple
custom rules then removes one by the rule ID.
"""
- # Create Image
+ # Create Security Group
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
- sec_grp_settings = SecurityGroupSettings(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
self.sec_grp_creator.remove_rule(
rule_id=rules[0].id)
rules_after_del = neutron_utils.get_rules_by_security_group(
Tests the creation of an OpenStack Security Group with two simple
custom rules then removes one by the rule setting object
"""
- # Create Image
+ # Create Security Group
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
- sec_grp_settings = SecurityGroupSettings(
+ SecurityGroupRuleConfig(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
+
rules = neutron_utils.get_rules_by_security_group(
self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
rules_after_del = neutron_utils.get_rules_by_security_group(
self.neutron,
self.sec_grp_creator.get_security_group())
self.assertEqual(len(rules) - 1, len(rules_after_del))
-# TODO - Add more tests with different rules. Rule creation parameters can be
-# somewhat complex
+
+def validate_sec_grp(neutron, keystone, sec_grp_settings, sec_grp,
+ rules=list()):
+ """
+ Returns True is the settings on a security group are properly contained
+ on the SNAPS SecurityGroup domain object
+ :param neutron: the neutron client
+ :param keystone: the keystone client
+ :param sec_grp_settings: the security group configuration
+ :param sec_grp: the SNAPS-OO security group object
+ :param rules: collection of SNAPS-OO security group rule objects
+ :return: T/F
+ """
+ return (sec_grp.description == sec_grp_settings.description and
+ sec_grp.name == sec_grp_settings.name and
+ validate_sec_grp_rules(
+ neutron, keystone, sec_grp_settings.rule_settings, rules))
+
+
+def validate_sec_grp_rules(neutron, keystone, rule_settings, rules):
+ """
+ Returns True is the settings on a security group rule are properly
+ contained on the SNAPS SecurityGroupRule domain object.
+ This function will only operate on rules that contain a description as
+ this is the only means to tell if the rule is custom or defaulted by
+ OpenStack
+ :param neutron: the neutron client
+ :param keystone: the keystone client
+ :param rule_settings: collection of SecurityGroupRuleConfig objects
+ :param rules: a collection of SecurityGroupRule domain objects
+ :return: T/F
+ """
+
+ for rule_setting in rule_settings:
+ if rule_setting.description:
+ match = False
+ for rule in rules:
+ sec_grp = neutron_utils.get_security_group(
+ neutron, keystone, sec_grp_name=rule_setting.sec_grp_name)
+
+ setting_eth_type = create_security_group.Ethertype.IPv4
+ if rule_setting.ethertype:
+ setting_eth_type = rule_setting.ethertype
+
+ if not sec_grp:
+ return False
+
+ proto_str = 'null'
+ if rule.protocol:
+ proto_str = rule.protocol
+
+ if (rule.description == rule_setting.description and
+ rule.direction == rule_setting.direction.name and
+ rule.ethertype == setting_eth_type.name and
+ rule.port_range_max == rule_setting.port_range_max and
+ rule.port_range_min == rule_setting.port_range_min and
+ proto_str == str(rule_setting.protocol.value) and
+ rule.remote_group_id == rule_setting.remote_group_id and
+ rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
+ rule.security_group_id == sec_grp.id):
+ match = True
+ break
+
+ if not match:
+ return False
+
+ return True
+
+
+class CreateMultipleSecurityGroupTests(OSIntegrationTestCase):
+ """
+ Test for the CreateSecurityGroup class and how it interacts with security
+ groups within other projects with the same name
+ """
+
+ def setUp(self):
+ """
+ Instantiates the CreateSecurityGroup object that is responsible for
+ downloading and creating an OS image file within OpenStack
+ """
+ super(self.__class__, self).__start__()
+
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ self.sec_grp_name = guid + 'name'
+ self.neutron = neutron_utils.neutron_client(
+ self.os_creds, self.os_session)
+
+ # Initialize for cleanup
+ self.admin_sec_grp_config = SecurityGroupConfig(
+ name=self.sec_grp_name, description='hello group')
+ self.sec_grp_creator_admin = OpenStackSecurityGroup(
+ self.admin_os_creds, self.admin_sec_grp_config)
+ self.sec_grp_creator_admin.create()
+ self.sec_grp_creator_proj = None
+
+ def tearDown(self):
+ """
+ Cleans the image and downloaded image file
+ """
+ if self.sec_grp_creator_admin:
+ self.sec_grp_creator_admin.clean()
+ if self.sec_grp_creator_proj:
+ self.sec_grp_creator_proj.clean()
+
+ super(self.__class__, self).__clean__()
+
+ def test_sec_grp_same_name_diff_proj(self):
+ """
+ Tests the creation of an OpenStack Security Group with the same name
+ within a different project/tenant.
+ """
+ # Create Security Group
+ sec_grp_config = SecurityGroupConfig(
+ name=self.sec_grp_name, description='hello group')
+ self.sec_grp_creator_proj = OpenStackSecurityGroup(
+ self.os_creds, sec_grp_config)
+ self.sec_grp_creator_proj.create()
+
+ self.assertNotEqual(
+ self.sec_grp_creator_admin.get_security_group().id,
+ self.sec_grp_creator_proj.get_security_group().id)
+
+ admin_sec_grp_creator = OpenStackSecurityGroup(
+ self.admin_os_creds, self.admin_sec_grp_config)
+ admin_sec_grp_creator.create()
+ self.assertEqual(self.sec_grp_creator_admin.get_security_group().id,
+ admin_sec_grp_creator.get_security_group().id)
+
+ proj_sec_grp_creator = OpenStackSecurityGroup(
+ self.os_creds, sec_grp_config)
+ proj_sec_grp_creator.create()
+ self.assertEqual(self.sec_grp_creator_proj.get_security_group().id,
+ proj_sec_grp_creator.get_security_group().id)