X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=snaps%2Fopenstack%2Ftests%2Fcreate_security_group_tests.py;h=090d7361c8190f741157dee74209baab898836f0;hb=8d64362faf1998bceae353da3d29883883a6c351;hp=99ea53aa9ff74d43bdad0d199c0da79a0651c47e;hpb=27b8b1c246d63c14f112a99362873fe887c13c10;p=snaps.git diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py index 99ea53a..090d736 100644 --- a/snaps/openstack/tests/create_security_group_tests.py +++ b/snaps/openstack/tests/create_security_group_tests.py @@ -15,10 +15,13 @@ import unittest import uuid +from snaps.config.security_group import ( + SecurityGroupConfig, SecurityGroupRuleConfig, + SecurityGroupRuleConfigError, SecurityGroupConfigError) from snaps.openstack import create_security_group from snaps.openstack.create_security_group import ( SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype, - Protocol, SecurityGroupRuleSettingsError, SecurityGroupSettingsError) + Protocol) from snaps.openstack.tests import validation_utils from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase from snaps.openstack.utils import neutron_utils @@ -32,32 +35,64 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): """ def test_no_params(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings() def test_empty_config(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings(**dict()) def test_name_only(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings(sec_grp_name='foo') def test_config_with_name_only(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'}) def test_name_and_direction(self): settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) + self.assertEqual(Direction.ingress.value, settings.direction.value) def test_config_name_and_direction(self): settings = SecurityGroupRuleSettings( **{'sec_grp_name': 'foo', 'direction': 'ingress'}) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) + self.assertEqual(Direction.ingress.value, settings.direction.value) + + def test_proto_ah_str(self): + settings = SecurityGroupRuleSettings( + **{'sec_grp_name': 'foo', 'direction': 'ingress', + 'protocol': 'ah'}) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.ah.value, settings.protocol.value) + + def test_proto_ah_value(self): + settings = SecurityGroupRuleSettings( + **{'sec_grp_name': 'foo', 'direction': 'ingress', + 'protocol': 51}) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.ah.value, settings.protocol.value) + + def test_proto_any(self): + settings = SecurityGroupRuleSettings( + **{'sec_grp_name': 'foo', 'direction': 'ingress', + 'protocol': 'any'}) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.null.value, settings.protocol.value) + + def test_proto_null(self): + settings = SecurityGroupRuleSettings( + **{'sec_grp_name': 'foo', 'direction': 'ingress', + 'protocol': 'null'}) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.null.value, settings.protocol.value) def test_all(self): settings = SecurityGroupRuleSettings( @@ -68,10 +103,10 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): remote_ip_prefix='prfx') self.assertEqual('foo', settings.sec_grp_name) self.assertEqual('fubar', settings.description) - self.assertEqual(Direction.egress, settings.direction) + self.assertEqual(Direction.egress.value, settings.direction.value) self.assertEqual('rgi', settings.remote_group_id) - self.assertEqual(Protocol.icmp, settings.protocol) - self.assertEqual(Ethertype.IPv6, settings.ethertype) + self.assertEqual(Protocol.icmp.value, settings.protocol.value) + self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value) self.assertEqual(1, settings.port_range_min) self.assertEqual(2, settings.port_range_max) self.assertEqual('prfx', settings.remote_ip_prefix) @@ -89,10 +124,10 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): 'remote_ip_prefix': 'prfx'}) self.assertEqual('foo', settings.sec_grp_name) self.assertEqual('fubar', settings.description) - self.assertEqual(Direction.egress, settings.direction) + self.assertEqual(Direction.egress.value, settings.direction.value) self.assertEqual('rgi', settings.remote_group_id) - self.assertEqual(Protocol.tcp, settings.protocol) - self.assertEqual(Ethertype.IPv6, settings.ethertype) + self.assertEqual(Protocol.tcp.value, settings.protocol.value) + self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value) self.assertEqual(1, settings.port_range_min) self.assertEqual(2, settings.port_range_max) self.assertEqual('prfx', settings.remote_ip_prefix) @@ -104,11 +139,11 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): """ def test_no_params(self): - with self.assertRaises(SecurityGroupSettingsError): + with self.assertRaises(SecurityGroupConfigError): SecurityGroupSettings() def test_empty_config(self): - with self.assertRaises(SecurityGroupSettingsError): + with self.assertRaises(SecurityGroupConfigError): SecurityGroupSettings(**dict()) def test_name_only(self): @@ -123,7 +158,7 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): rule_setting = SecurityGroupRuleSettings( sec_grp_name='bar', direction=Direction.ingress, description='test_rule_1') - with self.assertRaises(SecurityGroupSettingsError): + with self.assertRaises(SecurityGroupConfigError): SecurityGroupSettings(name='foo', rule_settings=[rule_setting]) def test_all(self): @@ -157,8 +192,8 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): self.assertEqual('foo', settings.project_name) self.assertEqual(1, len(settings.rule_settings)) self.assertEqual('bar', settings.rule_settings[0].sec_grp_name) - self.assertEqual(Direction.ingress, - settings.rule_settings[0].direction) + self.assertEqual(Direction.ingress.value, + settings.rule_settings[0].direction.value) class CreateSecurityGroupTests(OSIntegrationTestCase): @@ -194,8 +229,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, - description='hello group') + sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name, + description='hello group') self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( self.os_creds, sec_grp_settings) self.sec_grp_creator.create() @@ -222,7 +257,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', project_name=self.admin_os_creds.project_name) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -251,7 +286,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', project_name=self.os_creds.project_name) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -280,8 +315,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, - description='hello group') + sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name, + description='hello group') self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( self.os_creds, sec_grp_settings) created_sec_grp = self.sec_grp_creator.create() @@ -307,10 +342,10 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -340,12 +375,12 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv4, port_range_min=10, port_range_max=20, description='test_rule_1')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -375,21 +410,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv6, description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv4, port_range_min=10, port_range_max=20, description='test_rule_3')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -419,10 +454,10 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -448,7 +483,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) - self.sec_grp_creator.add_rule(SecurityGroupRuleSettings( + self.sec_grp_creator.add_rule(SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_creator.sec_grp_settings.name, direction=Direction.egress, protocol=Protocol.icmp, description='test_rule_2')) @@ -464,21 +499,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv6, description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv4, port_range_min=10, port_range_max=20, description='test_rule_3')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -515,21 +550,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv6, description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv4, port_range_min=10, port_range_max=20, description='test_rule_3')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -583,7 +618,7 @@ def validate_sec_grp_rules(neutron, rule_settings, rules): this is the only means to tell if the rule is custom or defaulted by OpenStack :param neutron: the neutron client - :param rule_settings: collection of SecurityGroupRuleSettings objects + :param rule_settings: collection of SecurityGroupRuleConfig objects :param rules: a collection of SecurityGroupRule domain objects :return: T/F """ @@ -592,11 +627,6 @@ def validate_sec_grp_rules(neutron, rule_settings, rules): if rule_setting.description: match = False for rule in rules: - if rule_setting.protocol == Protocol.null: - setting_proto = None - else: - setting_proto = rule_setting.protocol.name - sec_grp = neutron_utils.get_security_group( neutron, sec_grp_name=rule_setting.sec_grp_name) @@ -607,15 +637,19 @@ def validate_sec_grp_rules(neutron, rule_settings, rules): if not sec_grp: return False + proto_str = 'null' + if rule.protocol: + proto_str = rule.protocol + if (rule.description == rule_setting.description and rule.direction == rule_setting.direction.name and rule.ethertype == setting_eth_type.name and rule.port_range_max == rule_setting.port_range_max and rule.port_range_min == rule_setting.port_range_min and - rule.protocol == setting_proto and + proto_str == str(rule_setting.protocol.value) and rule.remote_group_id == rule_setting.remote_group_id and rule.remote_ip_prefix == rule_setting.remote_ip_prefix and - rule.security_group_id == sec_grp.id): + rule.security_group_id == sec_grp.id): match = True break